From e28c44656930536e43823919095001110d8a7768 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Y=C3=BBki=20VACHOT?=
Date: Thu, 11 Jan 2024 14:01:57 +0100
Subject: [PATCH] Add env python for Spark

---
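Note: init.py and spark_check.py below read SPARK_HOME, HADOOP_HOME and
JAVA_HOME from a .env file via python-dotenv, but the .env file itself is
not tracked and does not appear in this patch. A minimal sketch of what it
could contain, assuming the install locations hinted at by the comments in
init.py (all three paths are hypothetical):

    # .env at the repository root; load_dotenv() picks it up from the
    # current working directory
    SPARK_HOME=C:\SPARK\spark-3.5.0-bin-hadoop3
    # per the init.py comment, HADOOP_HOME reuses the Spark directory,
    # with winutils.exe and hadoop.dll dropped into its bin\ folder
    HADOOP_HOME=C:\SPARK\spark-3.5.0-bin-hadoop3
    JAVA_HOME=C:\Program Files\Zulu\zulu-17

By default load_dotenv() does not override variables already defined in
the environment, so an existing system-wide SPARK_HOME would win over the
.env value.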
 .idea/misc.xml                        |  2 +-
 .pylintrc                             |  3 ++
 README.md                             | 43 ++++++++++++-------
 init.py                               | 19 ++++----
 requirements.txt                      |  6 +++
 spark_check.py                        | 27 ++++++++++++
 .../compute_output_dataset_1.py       |  9 ++--
 src/test_pyspark_training/conftest.py |  2 +-
 8 files changed, 79 insertions(+), 32 deletions(-)
 create mode 100644 .pylintrc
 create mode 100644 requirements.txt
 create mode 100644 spark_check.py

diff --git a/.idea/misc.xml b/.idea/misc.xml
index 8c1bc1c..6883c26 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -3,5 +3,5 @@
-
+
\ No newline at end of file
diff --git a/.pylintrc b/.pylintrc
new file mode 100644
index 0000000..90c4c58
--- /dev/null
+++ b/.pylintrc
@@ -0,0 +1,3 @@
+[BASIC]
+# Good variable names which should always be accepted, separated by a comma.
+good-names=i, j, k, df
\ No newline at end of file
diff --git a/README.md b/README.md
index b4ddf25..a1e33a6 100644
--- a/README.md
+++ b/README.md
@@ -1,20 +1,31 @@
-# Python PySpark Training Repository
+Python PySpark Training Repository
+==============
+**Author:** *Yûki VACHOT*
 
-## Installation
- - [Python 3.10](https://www.python.org/downloads/)
- - pyspark=3.1.1
- - findspark
- - pyspark-test
- - [Spark 3.1.1](https://spark.apache.org/downloads.html)
- - [Hadoop 3.3.6](https://hadoop.apache.org/releases.html)
- - [Java JDK 11](https://www.oracle.com/java/technologies/downloads/#java11)
- - (not mandatory) [Anaconda for conda](https://www.anaconda.com/download/)
+**Updated:** **10/01/24**
 
+# TABLE OF CONTENTS
 
-## Run Python Test
- - path from src/test_pyspark_training
- - `pytest -k test_`
-## Run pylint for code check
- 
+---
+# Installation
 
-## Run Python doc with Sphinx
\ No newline at end of file
+`python -m venv `
+
+- [Python 3.11.7](https://www.python.org/downloads/)
+- [Spark 3.5.0 with Hadoop 3.0.0](https://spark.apache.org/downloads.html)
+- [winutils.exe, .pdb and hadoop.dll](https://github.com/steveloughran/winutils/tree/master/hadoop-3.0.0/bin)
+- [Java JDK 17](https://www.azul.com/downloads/?version=java-17-lts&package=jdk#zulu)
+---
+# Run Python PySpark
+- `python init.py`
+---
+# Run Python Test
+- run from src/test_pyspark_training
+- `pytest -k test_`
+---
+# Run pylint for code check
+
+---
+# Run Python doc with Sphinx
+
+---
\ No newline at end of file
diff --git a/init.py b/init.py
index a6afcb9..48144e1 100644
--- a/init.py
+++ b/init.py
@@ -1,24 +1,21 @@
 import os
-import findspark
+import sys
+from dotenv import load_dotenv
 from pyspark.sql import SparkSession
 
 from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import compute_output_dataset_1
 
 
-def init_env():
-    os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-11"
-    os.environ["SPARK_HOME"] = "C:\\SPARK\\spark-3.1.1-bin-hadoop3.2"
-    os.environ["HADOOP_HOME"] = "C:\\SPARK\\hadoop"
-
-    findspark.init()
-
-
 def init_spark():
     return SparkSession.builder.master("local[*]").getOrCreate()
 
 
 def main():
-    print("hey there")
-    init_env()
+    load_dotenv()
+    print(os.environ["SPARK_HOME"])   # spark-3.5.0-bin-hadoop3
+    print(os.environ["HADOOP_HOME"])  # spark-3.5.0-bin-hadoop3, + winutils and hadoop 3.0 dll
+    print(os.environ["JAVA_HOME"])    # local Java 8 (Zulu)
+    print("EXEC:")
+    print(sys.executable)
     spark_session = init_spark()
     compute_output_dataset_1(spark_session)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..442c80a
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+pyspark
+pyspark-test
+python-dotenv
+pytest
+pylint
+sphinx
\ No newline at end of file
diff --git a/spark_check.py b/spark_check.py
new file mode 100644
index 0000000..ab5d617
--- /dev/null
+++ b/spark_check.py
@@ -0,0 +1,27 @@
+from dotenv import load_dotenv
+import sys
+import os
+from pyspark.sql import SparkSession
+
+load_dotenv()
+
+print(os.environ["SPARK_HOME"])
+print(os.environ["HADOOP_HOME"])
+print(os.environ["JAVA_HOME"])
+print("EXEC:")
+print(sys.executable)
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.createDataFrame(
+    [
+        (1, "val1"),
+        (2, "val2"),
+        (3, "val3"),
+        (4, "val4"),
+    ]
+)
+df.show()
+
+df.coalesce(1).write.mode("overwrite").csv("output/testoutput")
+spark.stop()
+print("Done\n")
diff --git a/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py b/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
index bd31066..5faf01b 100644
--- a/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
+++ b/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
@@ -1,11 +1,14 @@
+"""
+    This module is used to compute the dataset output_dataset_1.
+"""
 from pyspark.sql import SparkSession
 from pyspark.sql.types import StructType, StructField, IntegerType, StringType
-
 import src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 as C
 import src.pyspark_training.output_dataset_1.processing_output_dataset_1 as P
 
+
 INPUT_DATASET_1_PATH = './assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv'
-OUTPUT_DATASET_1_PATH = './assets/output_dataset_1/output/OUTPUT_output_dataset_1.csv'
+OUTPUT_DATASET_1_PATH = '.\\assets\\output_dataset_1\\output\\OUTPUT_output_dataset_1.csv'
 
 
 def compute_output_dataset_1(spark_session: SparkSession):
@@ -27,4 +30,4 @@ def compute_output_dataset_1(spark_session: SparkSession):
     df = P.add_life_stage(cleaned_df)
 
     df.show()
-    df.write.mode('overwrite').csv(OUTPUT_DATASET_1_PATH)
+    df.coalesce(1).write.mode('overwrite').save(OUTPUT_DATASET_1_PATH, format='csv')
diff --git a/src/test_pyspark_training/conftest.py b/src/test_pyspark_training/conftest.py
index e0a5155..b30b0a5 100644
--- a/src/test_pyspark_training/conftest.py
+++ b/src/test_pyspark_training/conftest.py
@@ -16,7 +16,7 @@ def spark_session(request):
     os.environ['PYSPARK_PYTHON'] = sys.executable
     os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
     os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk-11"
-    os.environ["SPARK_HOME"] = "C:\\SPARK\\spark-3.1.1-bin-hadoop3.2"
+    os.environ["SPARK_HOME"] = "C:\\SPARK\\spark-3.1.1-bin-hadoop3.2\\bin"
    os.environ["HADOOP_HOME"] = "C:\\SPARK\\hadoop"

    findspark.init()
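
A quick way to sanity-check the setup once the patch is applied (the file
name pattern below is Spark's standard part-file naming, nothing this repo
configures):

    python spark_check.py
    # Spark's CSV writer produces a directory, not a single file;
    # coalesce(1) collapses the DataFrame to one partition before the
    # write, so output/testoutput/ ends up holding a single part-*.csv
    # plus a _SUCCESS marker.

The same applies to compute_output_dataset_1: OUTPUT_DATASET_1_PATH names
a directory that will contain one part file after the coalesce(1) write.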