Add RAW data with new architecture

Yûki VACHOT 2024-01-09 17:11:23 +01:00
parent 87f4dfdb0b
commit 1e42b00ce7
21 changed files with 272 additions and 55 deletions

8
.idea/.gitignore generated vendored Normal file

@@ -0,0 +1,8 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

1
.idea/.name generated Normal file

@@ -0,0 +1 @@
PySpark Training Repository

10
.idea/PySpark Training Repository.iml generated Normal file

@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
  <component name="NewModuleRootManager">
    <content url="file://$MODULE_DIR$">
      <excludeFolder url="file://$MODULE_DIR$/venv" />
    </content>
    <orderEntry type="jdk" jdkName="PySpark_3_10" jdkType="Python SDK" />
    <orderEntry type="sourceFolder" forTests="false" />
  </component>
</module>

.idea/inspectionProfiles/profiles_settings.xml generated Normal file

@@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
  <settings>
    <option name="USE_PROJECT_PROFILE" value="false" />
    <version value="1.0" />
  </settings>
</component>

7
.idea/misc.xml generated Normal file

@@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="Black">
    <option name="sdkName" value="PySpark" />
  </component>
  <component name="ProjectRootManager" version="2" project-jdk-name="PySpark_3_10" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated Normal file

@@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ProjectModuleManager">
    <modules>
      <module fileurl="file://$PROJECT_DIR$/.idea/pythonProject.iml" filepath="$PROJECT_DIR$/.idea/pythonProject.iml" />
    </modules>
  </component>
</project>


@@ -13,3 +13,8 @@
 ## Run Python Test
 - path from src/test_pyspark_training
 - `pytest -k test_`
+## Run pylint for code check
+## Run Python doc with Sphinx

assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv Normal file

@@ -0,0 +1,7 @@
name,age
Alice G.,13
John B.,20
Jack W.,19
Bob T.,35
John D.,9
Eve A.,12

11
init.py

@@ -1,6 +1,7 @@
 import os
 import findspark
 from pyspark.sql import SparkSession
+from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import compute_output_dataset_1


 def init_env():
@@ -12,17 +13,15 @@ def init_env():

 def init_spark():
-    spark = SparkSession.builder.master("local[*]").getOrCreate()
-    df = spark.createDataFrame([
-        {'name': 'OUI OUI', 'age': 30},
-    ])
-    df.show()
+    return SparkSession.builder.master("local[*]").getOrCreate()


 def main():
     print("hey there")
     init_env()
-    init_spark()
+    spark_session = init_spark()
+    compute_output_dataset_1(spark_session)


 if __name__ == "__main__":
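
For reference, a minimal sketch of what the updated module amounts to once the two columns are read together (the body of init_env is elided in the hunk; the findspark.init() call below is an assumption, shown only as a hypothetical stub):

import os
import findspark
from pyspark.sql import SparkSession
from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import compute_output_dataset_1


def init_env():
    # Elided in the diff above; presumably points findspark at the local
    # Spark installation (hypothetical stub, not part of the commit).
    findspark.init()


def init_spark():
    # The session is now returned instead of being consumed in place.
    return SparkSession.builder.master("local[*]").getOrCreate()


def main():
    print("hey there")
    init_env()
    spark_session = init_spark()
    compute_output_dataset_1(spark_session)


if __name__ == "__main__":
    main()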

20
src/Makefile Normal file

@@ -0,0 +1,20 @@
# Minimal makefile for Sphinx documentation
#

# You can set these variables from the command line, and also
# from the environment for the first two.
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = .
BUILDDIR = _build

# Put it first so that "make" without argument is like "make help".
help:
	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

.PHONY: help Makefile

# Catch-all target: route all unknown targets to Sphinx using the new
# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS).
%: Makefile
	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)

56
src/conf.py Normal file

@@ -0,0 +1,56 @@
# Configuration file for the Sphinx documentation builder.
#
# This file only contains a selection of the most common options. For a full
# list see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
# -- Path setup --------------------------------------------------------------
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
# -- Project information -----------------------------------------------------
project = 'PySpark Training Repository'
copyright = '2024, Yûki VACHOT'
author = 'Yûki VACHOT'
# The full version, including alpha/beta/rc tags
release = '0.0.1'
# -- General configuration ---------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
    'sphinx.ext.autodoc',
]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# -- Options for HTML output -------------------------------------------------
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'alabaster'
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
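
Since sphinx.ext.autodoc is the only extension enabled, the commented-out path setup above becomes relevant as soon as project modules are documented. A minimal sketch, assuming conf.py stays in src/ and the code is imported as src.pyspark_training.* from the repository root:

import os
import sys
# Make the repository root importable so autodoc can resolve the
# src.pyspark_training.* modules (assumed layout, not part of the commit).
sys.path.insert(0, os.path.abspath('..'))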

20
src/index.rst Normal file

@@ -0,0 +1,20 @@
.. PySpark Training Repository documentation master file, created by
   sphinx-quickstart on Tue Jan 9 09:55:22 2024.
   You can adapt this file completely to your liking, but it should at least
   contain the root `toctree` directive.

Welcome to PySpark Training Repository's documentation!
=======================================================

.. toctree::
   :maxdepth: 2
   :caption: Contents:


Indices and tables
==================

* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`

35
src/make.bat Normal file

@@ -0,0 +1,35 @@
@ECHO OFF

pushd %~dp0

REM Command file for Sphinx documentation

if "%SPHINXBUILD%" == "" (
	set SPHINXBUILD=sphinx-build
)
set SOURCEDIR=.
set BUILDDIR=_build

%SPHINXBUILD% >NUL 2>NUL
if errorlevel 9009 (
	echo.
	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
	echo.installed, then set the SPHINXBUILD environment variable to point
	echo.to the full path of the 'sphinx-build' executable. Alternatively you
	echo.may add the Sphinx directory to PATH.
	echo.
	echo.If you don't have Sphinx installed, grab it from
	echo.https://www.sphinx-doc.org/
	exit /b 1
)

if "%1" == "" goto help

%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
goto end

:help
%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%

:end
popd

src/pyspark_training/output_dataset_1/clean_output_dataset_1.py (deleted)

@@ -1,18 +0,0 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def clean_output_dataset_1(df: DataFrame) -> DataFrame:
    """
    :param df:
    :return:
    """
    df = remove_extra_spaces(df, 'name')
    return df


def remove_extra_spaces(df, column_name):
    df_transformed = df.withColumn(column_name, F.regexp_replace(F.col(column_name), "\\s+", " "))
    return df_transformed

src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py Normal file

@@ -0,0 +1,17 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType


def remove_extra_spaces(df: DataFrame, column_name: str) -> DataFrame:
    """
    :param df:
    :param column_name:
    :return:
    """
    df_transformed = df.withColumn(
        column_name,
        F.regexp_replace(F.col(column_name), "\\s+", " ")
    )
    return df_transformed
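
A quick usage sketch of remove_extra_spaces (the SparkSession below is hypothetical, not part of the commit):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([('Alice    G.', 13)], ['name', 'age'])
remove_extra_spaces(df, 'name').show()
# +--------+---+
# |    name|age|
# +--------+---+
# |Alice G.| 13|
# +--------+---+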

src/pyspark_training/output_dataset_1/compute_output_dataset_1.py

@@ -1,31 +1,30 @@
-import pyspark.sql.functions as F
-from pyspark.sql import DataFrame
-from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import clean_output_dataset_1
+from pyspark.sql import SparkSession
+from pyspark.sql.types import StructType, StructField, IntegerType, StringType
+import src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 as C
+import src.pyspark_training.output_dataset_1.processing_output_dataset_1 as P
+
+INPUT_DATASET_1_PATH = './assets/output_dataset_1/raw/RAW_input_output_dataset_1.csv'
+OUTPUT_DATASET_1_PATH = './assets/output_dataset_1/output/OUTPUT_output_dataset_1.csv'


-def compute_output_dataset_1(df: DataFrame) -> DataFrame:
-    df = clean_output_dataset_1(df)
-    df = add_life_stage(df)
-    return df
-
-
-def add_life_stage(df: DataFrame) -> DataFrame:
+def compute_output_dataset_1(spark_session: SparkSession):
     """
-    Add life stage
-    child if age < 13
-    teenager if age >= 13 and <= 19
-    adult for age>20
-    :param df:
+    Compute the output of output_dataset_1
+    :param spark_session:
     :return:
     """
-    df = df.withColumn(
-        'life_stage',
-        F.when(F.col('age') < 13, F.lit('child'))
-        .when(F.col('age').between(13, 19), F.lit('teenager'))
-        .otherwise(F.lit('adult'))
-    )
-    return df
+    df_schema = StructType([
+        StructField('name', StringType(), False),
+        StructField('age', IntegerType(), False)
+    ])
+    df = spark_session.read.csv(INPUT_DATASET_1_PATH, header=True, schema=df_schema)
+
+    # Cleaning
+    cleaned_df = C.remove_extra_spaces(df, 'name')
+
+    # Processing
+    df = P.add_life_stage(cleaned_df)
+
+    df.show()
+    df.write.mode('overwrite').csv(OUTPUT_DATASET_1_PATH)
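
Note that DataFrame.write.csv produces a directory of part files and omits the header row by default. If a single, headered output file were wanted, a variant along these lines could be used (a sketch, not what this commit does):

# Collapse to one partition and emit a header row (still written as a directory).
df.coalesce(1).write.mode('overwrite').csv(OUTPUT_DATASET_1_PATH, header=True)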

src/pyspark_training/output_dataset_1/processing_output_dataset_1.py Normal file

@@ -0,0 +1,35 @@
import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def add_life_stage(df: DataFrame) -> DataFrame:
    """
    Add life stage
    child if age < 13
    teenager if age >= 13 and <= 19
    adult for age>20
    :param df:
    :return:
    """
    df = df.withColumn(
        'life_stage',
        F.when(F.col('age') < 13, F.lit('child'))
        .when(F.col('age').between(13, 19), F.lit('teenager'))
        .when(F.col('age') >= 20, F.lit('adult'))
    )
    return df


def join_with_broadcast(big_df: DataFrame, smaller_df: DataFrame) -> DataFrame:
    """
    Join big dataset and smaller dataset with broadcast
    :param big_df:
    :param smaller_df:
    :return:
    """
    df = big_df.join(
        F.broadcast(smaller_df)
    )
    return df
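
join_with_broadcast is committed without an `on` clause, so as written the call amounts to a Cartesian product of the two DataFrames. A usage sketch with an explicit join key, which is how a broadcast join is typically applied (the column names and the `spark` session here are made up for illustration):

big_df = spark.createDataFrame([('Alice G.', 'FR'), ('Bob T.', 'DE')], ['name', 'country_code'])
small_df = spark.createDataFrame([('FR', 'France'), ('DE', 'Germany')], ['country_code', 'country'])
# Ship the small lookup table to every executor and join on the shared key.
joined = big_df.join(F.broadcast(small_df), on='country_code', how='left')
joined.show()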


@@ -1,6 +1,6 @@
 from pyspark.sql import types as T
 from src.test_pyspark_training.lib_test_utils import assert_df_equal
-from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import remove_extra_spaces
+from src.pyspark_training.output_dataset_1.cleaning_output_dataset_1 import remove_extra_spaces


 def test_remove_extra_spaces(spark_session):


@@ -1,6 +1,6 @@
 from pyspark.sql import types as T
 from src.test_pyspark_training.lib_test_utils import assert_df_equal
-from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import add_life_stage
+from src.pyspark_training.output_dataset_1.processing_output_dataset_1 import add_life_stage


 def test_add_life_stage(spark_session):
@@ -8,7 +8,7 @@ def test_add_life_stage(spark_session):
     input_schema = T.StructType(
         [
             T.StructField('name', T.StringType(), False),
-            T.StructField('age', T.IntegerType(), False),
+            T.StructField('age', T.IntegerType(), True),
         ]
     )
     input_data = [
@@ -18,14 +18,15 @@ def test_add_life_stage(spark_session):
         ('Bob T.', 35),
         ('John D.', 9),
         ('Eve A.', 12),
+        ('Eve B.', None),
     ]
     input_df = spark_session.createDataFrame(input_data, input_schema)

     expected_schema = T.StructType(
         [
             T.StructField('name', T.StringType(), False),
-            T.StructField('age', T.IntegerType(), False),
-            T.StructField('life_stage', T.StringType(), False),
+            T.StructField('age', T.IntegerType(), True),
+            T.StructField('life_stage', T.StringType(), True),
         ]
     )
     expected_data = [
@@ -35,6 +36,7 @@ def test_add_life_stage(spark_session):
         ('Bob T.', 35, 'adult'),
         ('John D.', 9, 'child'),
         ('Eve A.', 12, 'child'),
+        ('Eve B.', None, None),
     ]
     expected_df = spark_session.createDataFrame(expected_data, expected_schema)
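
Both tests rely on a spark_session fixture and on assert_df_equal from lib_test_utils, whose implementations do not appear in this diff. A hypothetical minimal sketch of such a fixture, for context only (the repository's actual conftest may differ):

# conftest.py (sketch)
import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope='session')
def spark_session():
    # One local Spark session shared by the whole test run.
    spark = (
        SparkSession.builder
        .master('local[*]')
        .appName('pyspark-training-tests')
        .getOrCreate()
    )
    yield spark
    spark.stop()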