diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..13566b8
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,8 @@
+# Default ignored files
+/shelf/
+/workspace.xml
+# Editor-based HTTP Client requests
+/httpRequests/
+# Datasource local storage ignored files
+/dataSources/
+/dataSources.local.xml
diff --git a/.idea/.name b/.idea/.name
new file mode 100644
index 0000000..2f49d84
--- /dev/null
+++ b/.idea/.name
@@ -0,0 +1 @@
+PySpark Training Repository
\ No newline at end of file
diff --git a/.idea/PySpark Training Repository.iml b/.idea/PySpark Training Repository.iml
new file mode 100644
index 0000000..d1bcb34
--- /dev/null
+++ b/.idea/PySpark Training Repository.iml
@@ -0,0 +1,10 @@
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..8c1bc1c
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..e15ec35
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/README.md b/README.md
index 217e34f..b4ddf25 100644
--- a/README.md
+++ b/README.md
@@ -12,4 +12,9 @@
 
 ## Run Python Test
 - path from src/test_pyspark_training
-  - `pytest -k test_`
\ No newline at end of file
+  - `pytest -k test_`
+
+## Run pylint for code check
+
+
+## Run Python doc with Sphinx
\ No newline at end of file
diff --git a/assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv b/assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv
new file mode 100644
index 0000000..d677c10
--- /dev/null
+++ b/assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv
@@ -0,0 +1,7 @@
+name,age
+Alice G.,13
+John B.,20
+Jack W.,19
+Bob T.,35
+John D.,9
+Eve A.,12
\ No newline at end of file
diff --git a/init.py b/init.py
index 6413406..a6afcb9 100644
--- a/init.py
+++ b/init.py
@@ -1,6 +1,7 @@
 import os
 import findspark
 from pyspark.sql import SparkSession
+from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import compute_output_dataset_1
 
 
 def init_env():
@@ -12,17 +13,15 @@
 
 
 def init_spark():
-    spark = SparkSession.builder.master("local[*]").getOrCreate()
-    df = spark.createDataFrame([
-        {'name': 'OUI OUI', 'age': 30},
-    ])
-    df.show()
+    return SparkSession.builder.master("local[*]").getOrCreate()
 
 
 def main():
     print("hey there")
     init_env()
-    init_spark()
+    spark_session = init_spark()
+
+    compute_output_dataset_1(spark_session)
 
 
 if __name__ == "__main__":
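Note on the `init.py` change above: `init_spark()` now just returns the `SparkSession` instead of building a throwaway demo DataFrame, and `main()` threads that session into `compute_output_dataset_1`. A minimal self-contained sketch of the same pattern; it assumes `findspark` can locate a local Spark install, which is presumably what the (not shown) body of `init_env()` sets up:

```python
import findspark

findspark.init()  # locate SPARK_HOME before pyspark is first used

from pyspark.sql import SparkSession


def init_spark() -> SparkSession:
    # local[*] runs Spark in-process using every available core
    return SparkSession.builder.master("local[*]").getOrCreate()


if __name__ == "__main__":
    spark = init_spark()
    print(spark.version)  # smoke test that the session is alive
    spark.stop()
```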
diff --git a/src/Makefile b/src/Makefile
new file mode 100644
index 0000000..d4bb2cb
--- /dev/null
+++ b/src/Makefile
@@ -0,0 +1,20 @@
+# Minimal makefile for Sphinx documentation
+#
+
+# You can set these variables from the command line, and also
+# from the environment for the first two.
+SPHINXOPTS    ?=
+SPHINXBUILD   ?= sphinx-build
+SOURCEDIR     = .
+BUILDDIR      = _build
+
+# Put it first so that "make" without argument is like "make help".
+help:
+	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
+
+.PHONY: help Makefile
+
+# Catch-all target: route all unknown targets to Sphinx using the new
+# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
+%: Makefile
+	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/src/conf.py b/src/conf.py
new file mode 100644
index 0000000..0bec54c
--- /dev/null
+++ b/src/conf.py
@@ -0,0 +1,56 @@
+# Configuration file for the Sphinx documentation builder.
+#
+# This file only contains a selection of the most common options. For a full
+# list see the documentation:
+# https://www.sphinx-doc.org/en/master/usage/configuration.html
+
+# -- Path setup --------------------------------------------------------------
+
+# If extensions (or modules to document with autodoc) are in another directory,
+# add these directories to sys.path here. If the directory is relative to the
+# documentation root, use os.path.abspath to make it absolute, like shown here.
+#
+# import os
+# import sys
+# sys.path.insert(0, os.path.abspath('.'))
+
+
+# -- Project information -----------------------------------------------------
+
+project = 'PySpark Training Repository'
+copyright = '2024, Yûki VACHOT'
+author = 'Yûki VACHOT'
+
+# The full version, including alpha/beta/rc tags
+release = '0.0.1'
+
+
+# -- General configuration ---------------------------------------------------
+
+# Add any Sphinx extension module names here, as strings. They can be
+# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
+# ones.
+extensions = [
+    'sphinx.ext.autodoc',
+]
+
+# Add any paths that contain templates here, relative to this directory.
+templates_path = ['_templates']
+
+# List of patterns, relative to source directory, that match files and
+# directories to ignore when looking for source files.
+# This pattern also affects html_static_path and html_extra_path.
+exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
+
+
+# -- Options for HTML output -------------------------------------------------
+
+# The theme to use for HTML and HTML Help pages.  See the documentation for
+# a list of builtin themes.
+#
+html_theme = 'alabaster'
+
+# Add any paths that contain custom static files (such as style sheets) here,
+# relative to this directory. They are copied after the builtin static files,
+# so a file named "default.css" will overwrite the builtin "default.css".
+html_static_path = ['_static']
\ No newline at end of file
diff --git a/src/index.rst b/src/index.rst
new file mode 100644
index 0000000..573dce3
--- /dev/null
+++ b/src/index.rst
@@ -0,0 +1,20 @@
+.. PySpark Training Repository documentation master file, created by
+   sphinx-quickstart on Tue Jan  9 09:55:22 2024.
+   You can adapt this file completely to your liking, but it should at least
+   contain the root `toctree` directive.
+
+Welcome to PySpark Training Repository's documentation!
+=======================================================
+
+.. toctree::
+   :maxdepth: 2
+   :caption: Contents:
+
+
+
+Indices and tables
+==================
+
+* :ref:`genindex`
+* :ref:`modindex`
+* :ref:`search`
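One caveat on the Sphinx setup above: `conf.py` enables `sphinx.ext.autodoc` but leaves the path-setup block commented out, and `index.rst` has an empty toctree, so the generated docs stay bare until modules are wired in. For autodoc to import the `src.pyspark_training` packages, the project root has to be on `sys.path`. A sketch of the uncommented block; the `'..'` is an assumption based on `conf.py` living in `src/`:

```python
# in conf.py: make the project importable for sphinx.ext.autodoc
import os
import sys

sys.path.insert(0, os.path.abspath('..'))  # assumption: repo root is one level above src/
```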
diff --git a/src/make.bat b/src/make.bat
new file mode 100644
index 0000000..954237b
--- /dev/null
+++ b/src/make.bat
@@ -0,0 +1,35 @@
+@ECHO OFF
+
+pushd %~dp0
+
+REM Command file for Sphinx documentation
+
+if "%SPHINXBUILD%" == "" (
+	set SPHINXBUILD=sphinx-build
+)
+set SOURCEDIR=.
+set BUILDDIR=_build
+
+%SPHINXBUILD% >NUL 2>NUL
+if errorlevel 9009 (
+	echo.
+	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
+	echo.installed, then set the SPHINXBUILD environment variable to point
+	echo.to the full path of the 'sphinx-build' executable. Alternatively you
+	echo.may add the Sphinx directory to PATH.
+	echo.
+	echo.If you don't have Sphinx installed, grab it from
+	echo.https://www.sphinx-doc.org/
+	exit /b 1
+)
+
+if "%1" == "" goto help
+
+%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+goto end
+
+:help
+%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
+
+:end
+popd
diff --git a/src/pyspark_training/output_dataset_1/clean_output_dataset_1.py b/src/pyspark_training/output_dataset_1/clean_output_dataset_1.py
deleted file mode 100644
index e0addb0..0000000
--- a/src/pyspark_training/output_dataset_1/clean_output_dataset_1.py
+++ /dev/null
@@ -1,18 +0,0 @@
-import pyspark.sql.functions as F
-from pyspark.sql import DataFrame
-
-
-def clean_output_dataset_1(df: DataFrame) -> DataFrame:
-    """
-
-    :param df:
-    :return:
-    """
-    df = remove_extra_spaces(df, 'name')
-
-    return df
-
-
-def remove_extra_spaces(df, column_name):
-    df_transformed = df.withColumn(column_name, F.regexp_replace(F.col(column_name), "\\s+", " "))
-    return df_transformed
diff --git a/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py b/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py
new file mode 100644
index 0000000..cb30ea3
--- /dev/null
+++ b/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py
@@ -0,0 +1,16 @@
+import pyspark.sql.functions as F
+from pyspark.sql import DataFrame
+
+
+def remove_extra_spaces(df: DataFrame, column_name: str) -> DataFrame:
+    """
+    Replace any run of whitespace in column_name with a single space
+    :param df: input DataFrame
+    :param column_name: name of the string column to clean
+    :return: DataFrame with the cleaned column
+    """
+    df_transformed = df.withColumn(
+        column_name,
+        F.regexp_replace(F.col(column_name), "\\s+", " ")
+    )
+    return df_transformed
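`remove_extra_spaces` collapses every whitespace run (spaces, tabs, newlines) in the given column into a single space via `regexp_replace`. Note it does not strip leading or trailing whitespace outright; a leading run becomes a single leading space, so chaining `F.trim` would be a reasonable follow-up if that matters. A hypothetical usage sketch:

```python
from pyspark.sql import SparkSession

from src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 import remove_extra_spaces

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([("Alice    G.",), ("John \t B.",)], ["name"])
remove_extra_spaces(df, "name").show()
# expected rows: "Alice G." and "John B."
```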
diff --git a/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py b/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
index 11002ca..bd31066 100644
--- a/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
+++ b/src/pyspark_training/output_dataset_1/compute_output_dataset_1.py
@@ -1,31 +1,30 @@
-import pyspark.sql.functions as F
-from pyspark.sql import DataFrame
-from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import clean_output_dataset_1
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType
+
+import src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 as C
+import src.pyspark_training.output_dataset_1.processing_output_dataset_1 as P
+
+INPUT_DATASET_1_PATH = './assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv'
+OUTPUT_DATASET_1_PATH = './assets/output_dataset_1/output/OUTPUT_output_dataset_1.csv'
 
 
-def compute_output_dataset_1(df: DataFrame) -> DataFrame:
-
-    df = clean_output_dataset_1(df)
-
-    df = add_life_stage(df)
-
-    return df
-
-
-def add_life_stage(df: DataFrame) -> DataFrame:
+def compute_output_dataset_1(spark_session: SparkSession):
     """
-    Add life stage
-    child if age < 13
-    teenager if age >= 13 and <= 19
-    adult for age>20
-    :param df:
+    Read the raw CSV, clean it, add the life_stage column and write output_dataset_1
+    :param spark_session: active SparkSession used to read and write the dataset
     :return:
     """
-    df = df.withColumn(
-        'life_stage',
-        F.when(F.col('age') < 13, F.lit('child'))
-        .when(F.col('age').between(13, 19), F.lit('teenager'))
-        .otherwise(F.lit('adult'))
-    )
+    df_schema = StructType([
+        StructField('name', StringType(), False),
+        StructField('age', IntegerType(), False)
+    ])
+    df = spark_session.read.csv(INPUT_DATASET_1_PATH, header=True, schema=df_schema)
 
-    return df
+    # Cleaning
+    cleaned_df = C.remove_extra_spaces(df, 'name')
+
+    # Processing
+    df = P.add_life_stage(cleaned_df)
+
+    df.show()
+    df.write.mode('overwrite').csv(OUTPUT_DATASET_1_PATH)
diff --git a/src/pyspark_training/output_dataset_1/processing_output_dataset_1.py b/src/pyspark_training/output_dataset_1/processing_output_dataset_1.py
new file mode 100644
index 0000000..4281218
--- /dev/null
+++ b/src/pyspark_training/output_dataset_1/processing_output_dataset_1.py
@@ -0,0 +1,39 @@
+import pyspark.sql.functions as F
+from pyspark.sql import DataFrame
+
+
+def add_life_stage(df: DataFrame) -> DataFrame:
+    """
+    Add a life_stage column:
+    child if age < 13
+    teenager if age >= 13 and age <= 19
+    adult if age >= 20
+    (a null age falls through every branch and yields a null life_stage)
+    :param df: input DataFrame with an integer age column
+    :return: DataFrame with the extra life_stage column
+    """
+    df = df.withColumn(
+        'life_stage',
+        F.when(F.col('age') < 13, F.lit('child'))
+        .when(F.col('age').between(13, 19), F.lit('teenager'))
+        .when(F.col('age') >= 20, F.lit('adult'))
+    )
+
+    return df
+
+
+def join_with_broadcast(big_df: DataFrame, smaller_df: DataFrame, on) -> DataFrame:
+    """
+    Join a big dataset with a smaller dataset, broadcasting the smaller one
+    so the big side is joined without a shuffle
+    :param big_df: large DataFrame
+    :param smaller_df: small DataFrame (must fit in executor memory)
+    :param on: join key column name(s)
+    :return: joined DataFrame
+    """
+    df = big_df.join(
+        F.broadcast(smaller_df),
+        on=on
+    )
+
+    return df
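A few behavioral notes on the two files above. In `compute_output_dataset_1`, `df.write.csv(...)` produces a directory of part files at `OUTPUT_DATASET_1_PATH` despite the `.csv` suffix, and no header row is written unless `header=True` is passed. In `add_life_stage`, the last branch is an explicit `.when(age >= 20)` rather than `.otherwise(...)`, so a null age yields a null `life_stage`, which is exactly what the updated test below asserts with the 'Eve B.' row. `join_with_broadcast` originally called `join` with no key, which would have produced a cross join; it now takes the key via `on`. A hypothetical usage sketch (the column names are invented for illustration):

```python
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

from src.pyspark_training.output_dataset_1.processing_output_dataset_1 import join_with_broadcast

spark = SparkSession.builder.master("local[*]").getOrCreate()
big = spark.createDataFrame([("Alice G.", "FR"), ("Bob T.", "DE")], ["name", "country_code"])
small = spark.createDataFrame([("FR", "France"), ("DE", "Germany")], ["country_code", "country"])

# the broadcast hint ships `small` to every executor, so `big` is joined without a shuffle
joined = join_with_broadcast(big, small, on="country_code")
joined.show()
```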
diff --git a/src/test_pyspark_training/test_output_dataset_1/test_clean_output_dataset_1/__init__.py b/src/test_pyspark_training/test_output_dataset_1/test_cleaning_output_dataset_1/__init__.py
similarity index 100%
rename from src/test_pyspark_training/test_output_dataset_1/test_clean_output_dataset_1/__init__.py
rename to src/test_pyspark_training/test_output_dataset_1/test_cleaning_output_dataset_1/__init__.py
diff --git a/src/test_pyspark_training/test_output_dataset_1/test_clean_output_dataset_1/test_remove_extra_spaces.py b/src/test_pyspark_training/test_output_dataset_1/test_cleaning_output_dataset_1/test_remove_extra_spaces.py
similarity index 91%
rename from src/test_pyspark_training/test_output_dataset_1/test_clean_output_dataset_1/test_remove_extra_spaces.py
rename to src/test_pyspark_training/test_output_dataset_1/test_cleaning_output_dataset_1/test_remove_extra_spaces.py
index 302cfb5..76ae077 100644
--- a/src/test_pyspark_training/test_output_dataset_1/test_clean_output_dataset_1/test_remove_extra_spaces.py
+++ b/src/test_pyspark_training/test_output_dataset_1/test_cleaning_output_dataset_1/test_remove_extra_spaces.py
@@ -1,6 +1,6 @@
 from pyspark.sql import types as T
 from src.test_pyspark_training.lib_test_utils import assert_df_equal
-from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import remove_extra_spaces
+from src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 import remove_extra_spaces
 
 
 def test_remove_extra_spaces(spark_session):
diff --git a/src/test_pyspark_training/test_output_dataset_1/test_compute_output_dataset_1/__init__.py b/src/test_pyspark_training/test_output_dataset_1/test_processing_output_dataset_1/__init__.py
similarity index 100%
rename from src/test_pyspark_training/test_output_dataset_1/test_compute_output_dataset_1/__init__.py
rename to src/test_pyspark_training/test_output_dataset_1/test_processing_output_dataset_1/__init__.py
diff --git a/src/test_pyspark_training/test_output_dataset_1/test_compute_output_dataset_1/test_add_life_stage.py b/src/test_pyspark_training/test_output_dataset_1/test_processing_output_dataset_1/test_add_life_stage.py
similarity index 75%
rename from src/test_pyspark_training/test_output_dataset_1/test_compute_output_dataset_1/test_add_life_stage.py
rename to src/test_pyspark_training/test_output_dataset_1/test_processing_output_dataset_1/test_add_life_stage.py
index de1be0d..9d708fb 100644
--- a/src/test_pyspark_training/test_output_dataset_1/test_compute_output_dataset_1/test_add_life_stage.py
+++ b/src/test_pyspark_training/test_output_dataset_1/test_processing_output_dataset_1/test_add_life_stage.py
@@ -1,6 +1,6 @@
 from pyspark.sql import types as T
 from src.test_pyspark_training.lib_test_utils import assert_df_equal
-from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import add_life_stage
+from src.pyspark_training.output_dataset_1.processing_output_dataset_1 import add_life_stage
 
 
 def test_add_life_stage(spark_session):
@@ -8,32 +8,34 @@
     input_schema = T.StructType(
         [
             T.StructField('name', T.StringType(), False),
-            T.StructField('age', T.IntegerType(), False),
+            T.StructField('age', T.IntegerType(), True),
         ]
     )
     input_data = [
         ('Alice G.', 13),
         ('John B.', 20),
         ('Jack W.', 19),
         ('Bob T.', 35),
         ('John D.', 9),
         ('Eve A.', 12),
+        ('Eve B.', None),
     ]
     input_df = spark_session.createDataFrame(input_data, input_schema)
 
     expected_schema = T.StructType(
         [
             T.StructField('name', T.StringType(), False),
-            T.StructField('age', T.IntegerType(), False),
-            T.StructField('life_stage', T.StringType(), False),
+            T.StructField('age', T.IntegerType(), True),
+            T.StructField('life_stage', T.StringType(), True),
         ]
     )
     expected_data = [
         ('Alice G.', 13, 'teenager'),
         ('John B.', 20, 'adult'),
         ('Jack W.', 19, 'teenager'),
         ('Bob T.', 35, 'adult'),
         ('John D.', 9, 'child'),
         ('Eve A.', 12, 'child'),
+        ('Eve B.', None, None),
     ]
     expected_df = spark_session.createDataFrame(expected_data, expected_schema)
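Both test modules receive a `spark_session` pytest fixture that this diff does not define, so it presumably lives in a shared `conftest.py`. For reference, a typical sketch of such a fixture; this is an assumption about the repo, not something shown in the diff:

```python
# conftest.py (hypothetical; the real fixture is defined elsewhere in the repo)
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # one shared local session keeps the pytest run fast; stopped when the run ends
    spark = SparkSession.builder.master("local[2]").appName("pyspark-training-tests").getOrCreate()
    yield spark
    spark.stop()
```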