diff --git a/README.md b/README.md
index a1e33a6..27df861 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,9 @@ Python PySpark Training Repository
 - [Spark 3.5.0 with Hadoop 3.0.0](https://spark.apache.org/downloads.html)
 - [winutils.exe, .pdb and hadoop.dll](https://github.com/steveloughran/winutils/tree/master/hadoop-3.0.0/bin)
 - [Java JDK 17](https://www.azul.com/downloads/?version=java-17-lts&package=jdk#zulu)
+- [pygraphviz](https://pygraphviz.github.io/documentation/stable/install.html#windows), built against the x86 Graphviz install
+  - `pip install --global-option=build_ext --global-option="-IC:\Program Files (x86)\Graphviz\include" --global-option="-LC:\Program Files (x86)\Graphviz\lib" pygraphviz`
+  - note: recent pip releases deprecate `--global-option`; if the command is rejected, see the pygraphviz install docs for the `--config-settings` equivalent
 ---
 # Run Python PySpark
 - `python init.py`
diff --git a/assets/example_test/graph_example.py b/assets/example_test/graph_example.py
new file mode 100644
index 0000000..93c5314
--- /dev/null
+++ b/assets/example_test/graph_example.py
@@ -0,0 +1,189 @@
+import os
+import sys
+from dotenv import load_dotenv
+from pyspark.sql import SparkSession, DataFrame
+from pyspark.sql.types import StructField, StructType, StringType, IntegerType
+from assets.pipegraph.pipegraph import PipeGraph
+
+
+###############################################################
+@PipeGraph
+def compute_dataset_1(df1: DataFrame, df2: DataFrame) -> DataFrame:
+    """
+    Compute dataset_1
+    :param df1: first input dataframe
+    :param df2: second input dataframe
+    :return: the joined and enriched dataframe
+    """
+    cleaned_df1 = clean_df1(df1)
+    cleaned_df2 = clean_df2(df2)
+
+    df = join_df1_df2(cleaned_df1, cleaned_df2, 'id', how='left')
+
+    df = add_letter_column(df)
+    df = add_calculated_column(df)
+
+    return df
+
+
+def clean_df1(df1: DataFrame) -> DataFrame:
+    """
+    Clean the dataframe
+    :param df1: dataframe to clean
+    :return: the cleaned dataframe
+    """
+    df1 = clean_df1_space(df1)
+
+    return df1
+
+
+@PipeGraph
+def clean_df1_space(df1: DataFrame) -> DataFrame:
+    """
+    Clean spaces in the dataframe
+    :param df1: dataframe to clean
+    :return: the cleaned dataframe
+    """
+    # clean space
+    return df1
+
+
+def clean_df2(df2: DataFrame) -> DataFrame:
+    """
+    Clean the dataframe
+    :param df2: dataframe to clean
+    :return: the cleaned dataframe
+    """
+    df2 = clean_df2_space(df2)
+    df2 = clean_df2_letter(df2)
+    return df2
+
+
+@PipeGraph
+def clean_df2_space(df2: DataFrame) -> DataFrame:
+    """
+    Clean spaces in the dataframe
+    :param df2: dataframe to clean
+    :return: the cleaned dataframe
+    """
+    # clean space
+    return df2
+
+
+@PipeGraph
+def clean_df2_letter(df2: DataFrame) -> DataFrame:
+    """
+    Clean the letters in the dataframe
+    :param df2: dataframe to clean
+    :return: the cleaned dataframe
+    """
+    # clean letter
+    return df2
+
+
+@PipeGraph
+def add_letter_column(df: DataFrame) -> DataFrame:
+    """
+    Add a letter column to the dataframe
+    :param df: input dataframe
+    :return: the dataframe with the new column
+    """
+    # Add column letter
+    return df
+
+
+@PipeGraph
+def add_calculated_column(df: DataFrame) -> DataFrame:
+    """
+    Add a calculated column to the dataframe
+    :param df: input dataframe
+    :return: the dataframe with the new column
+    """
+    # Add calculated column
+    return df
+
+
+###############################################################
+@PipeGraph
+def compute_dataset_2(df2: DataFrame) -> DataFrame:
+    """
+    Compute dataset_2
+    :param df2: input dataframe
+    :return: the enriched dataframe
+    """
+    cleaned_df2 = clean_df2(df2)
+
+    df = add_letter_column(cleaned_df2)
+    df = add_complex_calculated_column(df)
+
+    return df
+
+
+@PipeGraph
+def add_complex_calculated_column(df: DataFrame) -> DataFrame:
+    """
+    Compute the complex_calculated_column
+    :param df: input dataframe
+    :return: the dataframe with the new column
+    """
+    # Add complex calculated column
+    return df
+
+
+@PipeGraph
+def join_df1_df2(df1: DataFrame, df2: DataFrame, on: str, how='left') -> DataFrame:
+    """
+    Join two dataframes
+    :param df1: left dataframe
+    :param df2: right dataframe
+    :param on: join column name
+    :param how: join type (default 'left')
+    :return: the joined dataframe
+    """
+    return df1.join(df2, on, how)
+
+
+###############################################################
+def init_spark():
+    return SparkSession.builder.master("local[*]").getOrCreate()
+
+
+def main():
+    load_dotenv()
+    print(os.environ["SPARK_HOME"])  # spark-3.5.0-bin-hadoop3
+    print(os.environ["HADOOP_HOME"])  # spark-3.5.0-bin-hadoop3, plus winutils and the hadoop 3.0 dll
+    print(os.environ["JAVA_HOME"])  # local Java 17 (Zulu), per the README prerequisites
+    print("EXEC:")
+    print(sys.executable)
+    spark_session = init_spark()
+
+    PipeGraph.json()
+
+    df1 = spark_session.createDataFrame(
+        [(1, 'name 1'), (2, 'name 2'), (3, 'name 3')],
+        StructType([
+            StructField('id', IntegerType()),
+            StructField('name', StringType()),
+        ])
+    )
+    df2 = spark_session.createDataFrame(
+        [(1, 'adult'), (2, 'child'), (3, 'teenager')],
+        StructType([
+            StructField('id', IntegerType()),
+            StructField('life_stage', StringType()),
+        ])
+    )
+
+    output_dataset_1 = compute_dataset_1(df1, df2)
+    output_dataset_2 = compute_dataset_2(df2)
+
+    output_dataset_1.show()
+    output_dataset_2.show()
+    spark_session.stop()
+
+    print(f'PipeGraph JSON id:{PipeGraph.get_node_id()}')
+    PipeGraph.json()
+
+
+if __name__ == "__main__":
+    main()
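Note on the pattern above: `PipeGraph` is a class-based decorator, so each `@PipeGraph` function becomes a `PipeGraph` instance whose `__call__` records the step before delegating to the wrapped function. A minimal standalone sketch of that mechanism (the names here are illustrative, not part of this diff):

```python
class TraceCalls:
    """Class-based decorator: wrapping a function replaces it with an instance."""
    calls = []  # class-level registry, shared by every decorated function

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        TraceCalls.calls.append(self.func.__name__)  # record the step
        return self.func(*args, **kwargs)            # then delegate


@TraceCalls
def add(a, b):
    return a + b


print(add(1, 2))         # 3
print(TraceCalls.calls)  # ['add']
```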
diff --git a/assets/graphviz/index.html b/assets/graphviz/index.html
new file mode 100644
index 0000000..bfd3c8c
--- /dev/null
+++ b/assets/graphviz/index.html
@@ -0,0 +1,5 @@
+
+
+
+
+
\ No newline at end of file
diff --git a/assets/graphviz/pygraphviz_example.py b/assets/graphviz/pygraphviz_example.py
new file mode 100644
index 0000000..14e8c21
--- /dev/null
+++ b/assets/graphviz/pygraphviz_example.py
@@ -0,0 +1,16 @@
+import pygraphviz as pgv
+
+
+my_graph = pgv.AGraph(id='my_graph', name='my_graph')
+my_graph.add_node(
+    'RAW_dataset_1',
+    label='RAW_dataset_1',
+    tooltip='tooltip text \r next line',
+    URL='https://google.be/',
+    target='_blank'
+)
+my_graph.add_node(
+    'node 1'
+)
+my_graph.layout(prog='dot')
+my_graph.draw(path="../graphviz/my_graph.svg", format="svg")
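The example above only adds isolated nodes; a pipeline rendering also needs edges. A short extension using the same pygraphviz API (the node names are hypothetical, echoing the PipeGraph example):

```python
import pygraphviz as pgv

g = pgv.AGraph(directed=True, name='pipeline')
g.add_edge('RAW_dataset_1', 'clean_df1_space')  # add_edge creates missing nodes
g.add_edge('clean_df1_space', 'join_df1_df2')
g.layout(prog='dot')
print(g.string())  # DOT source, handy for debugging before rendering
g.draw('pipeline.svg', format='svg')
```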
diff --git a/assets/pipegraph/pipegraph.py b/assets/pipegraph/pipegraph.py
new file mode 100644
index 0000000..a261a41
--- /dev/null
+++ b/assets/pipegraph/pipegraph.py
@@ -0,0 +1,104 @@
+import json
+import inspect
+
+
+def get_node_type_from_func(func) -> str:
+    """Derive a node type from the decorated function's name prefix."""
+    if func.__name__.startswith('clean_'):
+        node_type = 'Cleaning'
+    elif func.__name__.startswith('compute_'):
+        node_type = 'Computing'
+    else:
+        node_type = 'Processing'
+    return node_type
+
+
+def get_stack_frame_id(frame_str: str) -> str:
+    """Extract the value following an 'id:' marker in a string."""
+    return frame_str.split('id:')[1]
+
+
+class PipeGraph(object):
+    __json = {
+        'nodes': [],
+        'links': []
+    }
+    __node_id = 0
+    __link_id = 0
+
+    def __init__(self, func):
+        self.__func = func
+        if func is not None:
+            self.__name = func.__name__
+        else:
+            self.__name = None
+
+    def __call__(self, *args, **kwargs):
+        print(self.__name)
+        # inspect.stack()[1] is the caller's FrameInfo; .function is its name.
+        caller_name = inspect.stack()[1].function
+        node_id = self.add_node(doc=self.__func.__doc__)
+        self.add_link(caller_name, node_id)
+        return self.__func(*args, **kwargs)
+
+    @classmethod
+    def json(cls):
+        print(json.dumps(cls.__json, indent=4))
+
+    def add_node(self, **kwargs):
+        node_type = get_node_type_from_func(self.__func)
+        current_id = PipeGraph.__node_id
+        node = {'id': current_id, 'name': self.__name, 'type': node_type}
+        for k, v in kwargs.items():
+            node[k] = v
+        PipeGraph.__json['nodes'].append(node)
+        PipeGraph.__node_id += 1
+        return current_id
+
+    @classmethod
+    def add_link(cls, source_id, target_id):
+        current_id = cls.__link_id
+        cls.__json['links'].append({'id': current_id, 'source_id': source_id, 'target_id': target_id})
+        cls.__link_id += 1
+        return current_id
+
+    @classmethod
+    def remove_node(cls, node_id: int):
+        cls.__json['nodes'] = [n for n in cls.__json['nodes'] if n['id'] != node_id]
+
+    @classmethod
+    def remove_link(cls, link_id: int):
+        cls.__json['links'] = [l for l in cls.__json['links'] if l['id'] != link_id]
+
+    @classmethod
+    def get_node(cls, node_id: int):
+        return next((n for n in cls.__json['nodes'] if n['id'] == node_id), None)
+
+    @classmethod
+    def get_link(cls, link_id: int):
+        return next((l for l in cls.__json['links'] if l['id'] == link_id), None)
+
+    @classmethod
+    def get_nodes(cls):
+        return cls.__json['nodes']
+
+    @classmethod
+    def get_links(cls):
+        return cls.__json['links']
+
+    @classmethod
+    def get_node_id(cls):
+        return cls.__node_id
+
+    @classmethod
+    def get_link_id(cls):
+        return cls.__link_id
+
+    @classmethod
+    def reset(cls):
+        cls.__json = {
+            'nodes': [],
+            'links': []
+        }
+        cls.__node_id = 0
+        cls.__link_id = 0
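With the `__call__` fix above, the registry can be exercised without Spark. A quick sketch of the expected behaviour (sample data is made up; assumes running from the repo root so `assets.pipegraph` is importable):

```python
from assets.pipegraph.pipegraph import PipeGraph


@PipeGraph
def clean_names(rows):
    """Strip whitespace from each name."""
    return [r.strip() for r in rows]


def compute_report(rows):
    return clean_names(rows)


print(compute_report(['  a ', ' b']))  # ['a', 'b']
PipeGraph.json()  # one 'Cleaning' node, linked from its caller 'compute_report'
PipeGraph.reset()
```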
diff --git a/assets/pyspark_transforms/pyspark_transforms.py b/assets/pyspark_transforms/pyspark_transforms.py
new file mode 100644
index 0000000..cf3a4ed
--- /dev/null
+++ b/assets/pyspark_transforms/pyspark_transforms.py
@@ -0,0 +1,23 @@
+import functools
+
+
+def pyspark_transforms(*args, **kwargs):
+    """Decorator factory: captures the declared Inputs/Outputs and injects them into the transform."""
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(sc=None):
+            print(args)
+            print(kwargs)
+            return func(sc, **kwargs)
+        return wrapper
+    return decorator
+
+
+def Input(url: str) -> None:
+    """Placeholder for a dataset input reference."""
+    return None
+
+
+def Output(url: str) -> None:
+    """Placeholder for a dataset output reference."""
+    return None
diff --git a/assets/pyspark_transforms/pyspark_transforms_test.py b/assets/pyspark_transforms/pyspark_transforms_test.py
new file mode 100644
index 0000000..1f8e24d
--- /dev/null
+++ b/assets/pyspark_transforms/pyspark_transforms_test.py
@@ -0,0 +1,13 @@
+from assets.pyspark_transforms.pyspark_transforms import pyspark_transforms, Input, Output
+
+
+@pyspark_transforms(
+    output_df=Output('test'),
+    input_df1=Input('test'),
+    input_df2=Input('test'),
+)
+def pyspark_training_test(sc, output_df, input_df1, input_df2):
+    print('hey pyspark_training_test')
+
+
+pyspark_training_test()
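The corrected `pyspark_transforms` above follows the standard three-layer decorator-factory shape (factory → decorator → wrapper); the original single-layer version returned the wrapper itself, which made the decorated test a `None`-returning stub that crashed when called. The same pattern in isolation, with illustrative names:

```python
import functools


def with_params(**params):
    """Factory: capture keyword params, return the actual decorator."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **params, **kwargs)  # inject captured params
        return wrapper
    return decorator


@with_params(greeting='hello')
def greet(name, greeting):
    return f'{greeting}, {name}'


print(greet('pyspark'))  # hello, pyspark
```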
diff --git a/requirements.txt b/requirements.txt
index 442c80a..d1a0bc7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,4 +3,6 @@ pyspark-test
 python-dotenv
 pytest
 pylint
-sphinx
\ No newline at end of file
+sphinx
+sphinx-rtd-theme
+pygraphviz
\ No newline at end of file
diff --git a/src/Makefile b/src/Makefile
deleted file mode 100644
index d4bb2cb..0000000
--- a/src/Makefile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Minimal makefile for Sphinx documentation
-#
-
-# You can set these variables from the command line, and also
-# from the environment for the first two.
-SPHINXOPTS    ?=
-SPHINXBUILD   ?= sphinx-build
-SOURCEDIR     = .
-BUILDDIR      = _build
-
-# Put it first so that "make" without argument is like "make help".
-help:
-	@$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
-
-.PHONY: help Makefile
-
-# Catch-all target: route all unknown targets to Sphinx using the new
-# "make mode" option.  $(O) is meant as a shortcut for $(SPHINXOPTS).
-%: Makefile
-	@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
diff --git a/src/conf.py b/src/conf.py
deleted file mode 100644
index 0bec54c..0000000
--- a/src/conf.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Configuration file for the Sphinx documentation builder.
-#
-# This file only contains a selection of the most common options. For a full
-# list see the documentation:
-# https://www.sphinx-doc.org/en/master/usage/configuration.html
-
-# -- Path setup --------------------------------------------------------------
-
-# If extensions (or modules to document with autodoc) are in another directory,
-# add these directories to sys.path here. If the directory is relative to the
-# documentation root, use os.path.abspath to make it absolute, like shown here.
-#
-# import os
-# import sys
-# sys.path.insert(0, os.path.abspath('.'))
-
-
-# -- Project information -----------------------------------------------------
-
-project = 'PySpark Training Repository'
-copyright = '2024, Yûki VACHOT'
-author = 'Yûki VACHOT'
-
-# The full version, including alpha/beta/rc tags
-release = '0.0.1'
-
-
-# -- General configuration ---------------------------------------------------
-
-# Add any Sphinx extension module names here, as strings. They can be
-# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
-# ones.
-extensions = [
-    'sphinx.ext.autodoc',
-]
-
-# Add any paths that contain templates here, relative to this directory.
-templates_path = ['_templates']
-
-# List of patterns, relative to source directory, that match files and
-# directories to ignore when looking for source files.
-# This pattern also affects html_static_path and html_extra_path.
-exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
-
-
-# -- Options for HTML output -------------------------------------------------
-
-# The theme to use for HTML and HTML Help pages.  See the documentation for
-# a list of builtin themes.
-#
-html_theme = 'alabaster'
-
-# Add any paths that contain custom static files (such as style sheets) here,
-# relative to this directory. They are copied after the builtin static files,
-# so a file named "default.css" will overwrite the builtin "default.css".
-html_static_path = ['_static']
\ No newline at end of file
diff --git a/src/index.rst b/src/index.rst
deleted file mode 100644
index 573dce3..0000000
--- a/src/index.rst
+++ /dev/null
@@ -1,20 +0,0 @@
-.. PySpark Training Repository documentation master file, created by
-   sphinx-quickstart on Tue Jan  9 09:55:22 2024.
-   You can adapt this file completely to your liking, but it should at least
-   contain the root `toctree` directive.
-
-Welcome to PySpark Training Repository's documentation!
-=======================================================
-
-.. toctree::
-   :maxdepth: 2
-   :caption: Contents:
-
-
-
-Indices and tables
-==================
-
-* :ref:`genindex`
-* :ref:`modindex`
-* :ref:`search`
diff --git a/src/make.bat b/src/make.bat
deleted file mode 100644
index 954237b..0000000
--- a/src/make.bat
+++ /dev/null
@@ -1,35 +0,0 @@
-@ECHO OFF
-
-pushd %~dp0
-
-REM Command file for Sphinx documentation
-
-if "%SPHINXBUILD%" == "" (
-	set SPHINXBUILD=sphinx-build
-)
-set SOURCEDIR=.
-set BUILDDIR=_build
-
-%SPHINXBUILD% >NUL 2>NUL
-if errorlevel 9009 (
-	echo.
-	echo.The 'sphinx-build' command was not found. Make sure you have Sphinx
-	echo.installed, then set the SPHINXBUILD environment variable to point
-	echo.to the full path of the 'sphinx-build' executable. Alternatively you
-	echo.may add the Sphinx directory to PATH.
-	echo.
-	echo.If you don't have Sphinx installed, grab it from
-	echo.https://www.sphinx-doc.org/
-	exit /b 1
-)
-
-if "%1" == "" goto help
-
-%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
-goto end
-
-:help
-%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% %O%
-
-:end
-popd
diff --git a/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py b/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py
index cb30ea3..82c8b81 100644
--- a/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py
+++ b/src/pyspark_training/output_dataset_1/cleaning_output_dataset_1.py
@@ -1,6 +1,5 @@
 import pyspark.sql.functions as F
 from pyspark.sql import DataFrame
-from pyspark.sql.types import IntegerType
 
 
 def remove_extra_spaces(df: DataFrame, column_name: str) -> DataFrame:
diff --git a/example_test.py b/src/test_pyspark_training/example_test.py
similarity index 64%
rename from example_test.py
rename to src/test_pyspark_training/example_test.py
index 19b41b4..f133dcb 100644
--- a/example_test.py
+++ b/src/test_pyspark_training/example_test.py
@@ -1,4 +1,2 @@
-
-
 def test_example_test():
-    return 0
\ No newline at end of file
+    pass