PipeGraph Decorator
This commit is contained in:
parent
e28c446569
commit
8f6699ecc6
14 changed files with 357 additions and 136 deletions
190
assets/example_test/graph_example.py
Normal file
190
assets/example_test/graph_example.py
Normal file
|
|
@ -0,0 +1,190 @@
|
|||
import os
|
||||
import sys
|
||||
from dotenv import load_dotenv
|
||||
from pyspark.sql import SparkSession, DataFrame
|
||||
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
|
||||
import inspect
|
||||
from assets.pipegraph.pipegraph import PipeGraph
|
||||
|
||||
|
||||
###############################################################
|
||||
@PipeGraph
def compute_dataset_1(df1: DataFrame, df2: DataFrame) -> DataFrame:
    """
    Build dataset_1 from two input dataframes.

    Each input goes through its own cleaning function, the cleaned frames are
    left-joined on the ``id`` column, and the joined frame is passed through
    ``add_letter_column`` followed by ``add_calculated_column``.

    :param df1: left-hand input of the join
    :param df2: right-hand input of the join
    :return: the dataframe returned by ``add_calculated_column``
    """
    left = clean_df1(df1)
    right = clean_df2(df2)

    joined = join_df1_df2(left, right, 'id', how='left')

    return add_calculated_column(add_letter_column(joined))
|
||||
|
||||
|
||||
def clean_df1(df1: DataFrame) -> DataFrame:
    """
    Run every cleaning step defined for the first dataframe.

    At the moment the only step is ``clean_df1_space``.

    :param df1: dataframe to clean
    :return: the dataframe returned by ``clean_df1_space``
    """
    return clean_df1_space(df1)
|
||||
|
||||
|
||||
@PipeGraph
def clean_df1_space(df1: DataFrame) -> DataFrame:
    """
    Whitespace-cleaning step for the first dataframe.

    Placeholder: no transformation is implemented, the input is handed back
    as received.

    :param df1: dataframe to clean
    :return: ``df1`` itself
    """
    # Placeholder for the whitespace clean-up logic.
    return df1
|
||||
|
||||
|
||||
def clean_df2(df2: DataFrame) -> DataFrame:
    """
    Run every cleaning step defined for the second dataframe.

    ``clean_df2_space`` is applied first and its result is fed to
    ``clean_df2_letter``.

    :param df2: dataframe to clean
    :return: the dataframe returned by ``clean_df2_letter``
    """
    return clean_df2_letter(clean_df2_space(df2))
|
||||
|
||||
|
||||
@PipeGraph
def clean_df2_space(df2: DataFrame) -> DataFrame:
    """
    Whitespace-cleaning step for the second dataframe.

    Placeholder: no transformation is implemented, the input is handed back
    as received.

    :param df2: dataframe to clean
    :return: ``df2`` itself
    """
    # Placeholder for the whitespace clean-up logic.
    return df2
|
||||
|
||||
|
||||
@PipeGraph
def clean_df2_letter(df2: DataFrame) -> DataFrame:
    """
    Letter-cleaning step for the second dataframe.

    Placeholder: no transformation is implemented, the input is handed back
    as received.

    :param df2: dataframe to clean
    :return: ``df2`` itself
    """
    # Placeholder for the letter clean-up logic.
    return df2
|
||||
|
||||
|
||||
@PipeGraph
def add_letter_column(df: DataFrame) -> DataFrame:
    """
    Step meant to append a letter column.

    Placeholder: no column is added yet, the input is handed back as received.

    :param df: dataframe to extend
    :return: ``df`` itself
    """
    # Placeholder for the letter column logic.
    return df
|
||||
|
||||
|
||||
@PipeGraph
def add_calculated_column(df: DataFrame) -> DataFrame:
    """
    Step meant to append a calculated column.

    Placeholder: no column is added yet, the input is handed back as received.

    :param df: dataframe to extend
    :return: ``df`` itself
    """
    # Placeholder for the calculated column logic.
    return df
|
||||
|
||||
|
||||
###############################################################
|
||||
@PipeGraph
def compute_dataset_2(df2: DataFrame) -> DataFrame:
    """
    Build dataset_2 from the second input dataframe.

    The input is cleaned with ``clean_df2`` and then passed through
    ``add_letter_column`` followed by ``add_complex_calculated_column``.

    :param df2: input dataframe
    :return: the dataframe returned by ``add_complex_calculated_column``
    """
    cleaned = clean_df2(df2)

    with_letter = add_letter_column(cleaned)
    return add_complex_calculated_column(with_letter)
|
||||
|
||||
|
||||
@PipeGraph
def add_complex_calculated_column(df: DataFrame) -> DataFrame:
    """
    Step meant to append the complex calculated column.

    Placeholder: no column is added yet, the input is handed back as received.

    :param df: dataframe to extend
    :return: ``df`` itself
    """
    # Placeholder for the complex calculated column logic.
    return df
|
||||
|
||||
|
||||
@PipeGraph
def join_df1_df2(df1: DataFrame, df2: DataFrame, on: str, how='left') -> DataFrame:
    """
    Join ``df1`` with ``df2``.

    Thin wrapper that delegates to ``df1.join``.

    :param df1: dataframe the join is called on
    :param df2: dataframe passed as the other side of the join
    :param on: join key forwarded to ``join``
    :param how: join type forwarded to ``join`` (defaults to ``'left'``)
    :return: the result of ``df1.join``
    """
    joined = df1.join(df2, on=on, how=how)
    return joined
|
||||
|
||||
|
||||
###############################################################
|
||||
def init_spark():
    """
    Obtain the Spark session used by this example.

    :return: the session returned by ``getOrCreate()`` on a builder whose
        master is set to ``local[*]``
    """
    local_builder = SparkSession.builder.master("local[*]")
    return local_builder.getOrCreate()
|
||||
|
||||
|
||||
def main():
    """
    Run the PipeGraph example end to end.

    Loads the ``.env`` file, prints the Spark-related environment variables
    and the interpreter path, builds two small dataframes, computes both
    example datasets and shows them, then prints the collected graph.

    :raises KeyError: when ``SPARK_HOME``, ``HADOOP_HOME`` or ``JAVA_HOME``
        is not set in the environment
    """
    load_dotenv()
    print(os.environ["SPARK_HOME"])  # spark-3.5.0-bin-hadoop3
    print(os.environ["HADOOP_HOME"])  # spark-3.5.0-bin-hadoop3, plus winutils and the Hadoop 3.0 DLLs
    print(os.environ["JAVA_HOME"])  # local Java 8 (Zulu)
    print("EXEC:")
    print(sys.executable)
    spark_session = init_spark()

    # Everything that needs the session runs inside try/finally so that the
    # session is stopped even when a computation or ``show()`` raises.
    try:
        PipeGraph.json()

        df1 = spark_session.createDataFrame(
            [(1, 'name 1'), (2, 'name 2'), (3, 'name 3')],
            StructType([
                StructField('id', IntegerType()),
                StructField('name', StringType()),
            ])
        )
        df2 = spark_session.createDataFrame(
            [(1, 'adult'), (2, 'child'), (3, 'teenager')],
            StructType([
                StructField('id', IntegerType()),
                StructField('life_stage', StringType()),
            ])
        )

        output_dataset_1 = compute_dataset_1(df1, df2)
        output_dataset_2 = compute_dataset_2(df2)

        output_dataset_1.show()
        output_dataset_2.show()
    finally:
        spark_session.stop()

    print(f'PipeGraph JSON id:{PipeGraph.get_node_id()}')
    PipeGraph.json()


if __name__ == "__main__":
    main()
|
||||
5
assets/graphviz/index.html
Normal file
5
assets/graphviz/index.html
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>my_graph</title>
</head>
<body>
<!-- NOTE(review): usemap points at a map named "my_graph", but this page defines no <map>; confirm where it is meant to come from. -->
<img src="my_graph.svg" usemap="#my_graph" alt="my_graph"/>
</body>
</html>
|
||||
16
assets/graphviz/pygraphviz_example.py
Normal file
16
assets/graphviz/pygraphviz_example.py
Normal file
|
|
@ -0,0 +1,16 @@
|
|||
from pathlib import Path

import pygraphviz as pgv

# Write the SVG into this script's own directory (assets/graphviz), where
# index.html looks for "my_graph.svg". The previous relative path
# "../graphviz/my_graph.svg" only resolved correctly from some working
# directories.
OUTPUT_SVG = Path(__file__).resolve().parent / "my_graph.svg"

# Graph with two nodes: one carrying label, tooltip and link attributes, and
# one plain node with default attributes.
my_graph = pgv.AGraph(id='my_graph', name='my_graph')
my_graph.add_node(
    'RAW_dataset_1',
    label='RAW_dataset_1',
    tooltip='tooltip text \r next line',
    URL='https://google.be/',
    target='_blank'
)
my_graph.add_node(
    'node 1'
)
my_graph.layout(prog='dot')
my_graph.draw(path=str(OUTPUT_SVG), format="svg")
|
||||
108
assets/pipegraph/pipegraph.py
Normal file
108
assets/pipegraph/pipegraph.py
Normal file
|
|
@ -0,0 +1,108 @@
|
|||
import json
|
||||
import inspect
|
||||
|
||||
|
||||
def get_node_type_from_func(func: object) -> str:
    """
    Derive the graph node type from the name of ``func``.

    :param func: any object exposing ``__name__``
    :return: ``'Cleaning'`` for names starting with ``clean_``,
        ``'Computing'`` for names starting with ``compute_``,
        ``'Processing'`` otherwise
    """
    func_name = func.__name__
    # Checked in order; the first matching prefix wins.
    prefix_to_type = (
        ('clean_', 'Cleaning'),
        ('compute_', 'Computing'),
    )
    for prefix, node_type in prefix_to_type:
        if func_name.startswith(prefix):
            return node_type
    return 'Processing'
|
||||
|
||||
|
||||
def get_stack_frame_id(frame_str: str):
    """
    Extract the text that follows the first ``id:`` marker in ``frame_str``.

    The string is split on every ``id:`` occurrence and the second piece is
    returned, so the result stops at the next ``id:`` marker if there is one.

    :param frame_str: text containing at least one ``id:`` marker
    :return: the piece located right after the first marker
    :raises IndexError: when ``frame_str`` contains no ``id:`` marker
    """
    pieces = frame_str.split('id:')
    return pieces[1]
|
||||
|
||||
|
||||
class PipeGraph(object):
    """
    Decorator that records every call of the wrapped function in a shared graph.

    The graph lives at class level, so all decorated functions feed the same
    structure: ``{'nodes': [...], 'links': [...]}``. Each call of a decorated
    function appends one node. When the call happens while another decorated
    function is still running, a link from the running (caller) node to the
    new (callee) node is appended as well.
    """

    # Shared graph, printed by ``json()``.
    __json = {
        'nodes': [],
        'links': []
    }
    # Next ids to hand out; they only grow until ``reset()`` is called.
    __node_id = 0
    __link_id = 0
    # Node ids of the decorated calls currently executing, innermost last.
    __call_stack = []

    def __init__(self, func):
        """
        Wrap ``func``.

        :param func: the function to decorate; ``None`` is accepted and leaves
            the wrapper without a name
        """
        self.__func = func
        if func is not None:
            self.__name = func.__name__
            # Expose the wrapped function's metadata on the wrapper so that
            # help() and introspection keep showing the original function.
            self.__name__ = func.__name__
            self.__doc__ = func.__doc__
            self.__wrapped__ = func
        else:
            self.__name = None

    def __call__(self, *args, **kwargs):
        """
        Register a node for this call, link it to the calling node, run the function.

        :return: whatever the wrapped function returns
        """
        node_id = self.add_node(doc=self.__func.__doc__)

        # Keep a reference to the list in use at entry, so that a ``reset()``
        # performed during the call cannot unbalance the push/pop pair.
        running = PipeGraph.__call_stack
        if running:
            PipeGraph.add_link(running[-1], node_id)

        running.append(node_id)
        try:
            return self.__func(*args, **kwargs)
        finally:
            running.pop()

    @staticmethod
    def __find(items, item_id):
        """Return the entry of ``items`` whose ``'id'`` equals ``item_id``, or ``None``."""
        return next((item for item in items if item['id'] == item_id), None)

    @classmethod
    def json(cls):
        """Print the graph as JSON indented by four spaces."""
        print(json.dumps(cls.__json, indent=4))

    def add_node(self, **kwargs):
        """
        Append a node describing the wrapped function.

        :param kwargs: extra entries copied into the node; they are applied
            last and therefore override ``id``, ``name`` and ``type`` when
            the same keys are given
        :return: the id assigned to the new node
        """
        node_type = get_node_type_from_func(self.__func)
        current_id = PipeGraph.__node_id
        node = {'id': current_id, 'name': self.__name, 'type': node_type}
        node.update(kwargs)
        PipeGraph.__json['nodes'].append(node)
        PipeGraph.__node_id += 1
        return current_id

    @classmethod
    def add_link(cls, source_id: int, target_id: int):
        """
        Append a link between two nodes.

        :param source_id: id of the node the link starts from
        :param target_id: id of the node the link points to
        :return: the id assigned to the new link
        """
        current_id = cls.__link_id
        cls.__json['links'].append({'id': current_id, 'source_id': source_id, 'target_id': target_id})
        cls.__link_id += 1
        return current_id

    @classmethod
    def remove_node(cls, node_id: int):
        """
        Remove the node whose id is ``node_id``.

        Links that reference the node are left untouched.

        :raises ValueError: when no node has that id
        """
        node = cls.__find(cls.__json['nodes'], node_id)
        if node is None:
            raise ValueError(f'no node with id {node_id}')
        cls.__json['nodes'].remove(node)

    @classmethod
    def remove_link(cls, link_id: int):
        """
        Remove the link whose id is ``link_id``.

        :raises ValueError: when no link has that id
        """
        link = cls.__find(cls.__json['links'], link_id)
        if link is None:
            raise ValueError(f'no link with id {link_id}')
        cls.__json['links'].remove(link)

    @classmethod
    def get_node(cls, node_id: int):
        """
        Return the node whose id is ``node_id``.

        The lookup goes by id rather than by list position, so it stays
        correct after nodes have been removed.

        :raises IndexError: when no node has that id
        """
        node = cls.__find(cls.__json['nodes'], node_id)
        if node is None:
            raise IndexError(f'no node with id {node_id}')
        return node

    @classmethod
    def get_link(cls, link_id: int):
        """
        Return the link whose id is ``link_id``.

        The lookup goes by id rather than by list position, so it stays
        correct after links have been removed.

        :raises IndexError: when no link has that id
        """
        link = cls.__find(cls.__json['links'], link_id)
        if link is None:
            raise IndexError(f'no link with id {link_id}')
        return link

    @classmethod
    def get_nodes(cls):
        """Return the list of nodes (the live list, not a copy)."""
        return cls.__json['nodes']

    @classmethod
    def get_links(cls):
        """Return the list of links (the live list, not a copy)."""
        return cls.__json['links']

    @classmethod
    def get_node_id(cls):
        """Return the id the next node will receive."""
        return cls.__node_id

    @classmethod
    def get_link_id(cls):
        """Return the id the next link will receive."""
        return cls.__link_id

    @classmethod
    def reset(cls):
        """Empty the graph and restart both id counters at zero."""
        cls.__json = {
            'nodes': [],
            'links': []
        }
        cls.__node_id = 0
        cls.__link_id = 0
        # Rebind instead of clearing: calls still in flight keep popping from
        # the list they started with.
        cls.__call_stack = []
|
||||
19
assets/pyspark_transforms/pyspark_transforms.py
Normal file
19
assets/pyspark_transforms/pyspark_transforms.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
from pyspark.sql import DataFrame
|
||||
|
||||
|
||||
def pyspark_transforms(*args, **kwargs):
    """
    Decorator factory that pre-binds arguments to the decorated function.

    The arguments given to the factory are supplied on every call of the
    decorated function: positional ones come first, keyword ones act as
    defaults that call-time keyword arguments can override.

    Previously the factory returned a function that only printed its
    arguments and returned ``None``, which replaced every decorated function
    with ``None``.

    :param args: positional arguments placed before the call-time ones
    :param kwargs: keyword arguments merged under the call-time ones
    :return: a decorator that returns the wrapped function
    """
    # Local import so this block does not rely on a new file-level import.
    import functools

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*call_args, **call_kwargs):
            # Call-time keywords take precedence over the declared ones.
            merged_kwargs = {**kwargs, **call_kwargs}
            return func(*args, *call_args, **merged_kwargs)

        return wrapper

    return decorator
|
||||
|
||||
|
||||
def Input(url: str) -> None:
    """
    Placeholder for an input declaration.

    :param url: currently ignored
    :return: always ``None``
    """
    return
|
||||
|
||||
|
||||
def Output(url: str):
    """
    Placeholder for an output declaration.

    :param url: currently ignored
    :return: always ``None``
    """
    return None
|
||||
13
assets/pyspark_transforms/pyspark_transforms_test.py
Normal file
13
assets/pyspark_transforms/pyspark_transforms_test.py
Normal file
|
|
@ -0,0 +1,13 @@
|
|||
from assets.pyspark_transforms.pyspark_transforms import pyspark_transforms, Input, Output
|
||||
|
||||
|
||||
@pyspark_transforms(
    output_df=Output('test'),
    input_df1=Input('test'),
    input_df2=Input('test'),
)
def pyspark_training_test(sc, output_df, input_df1, input_df2):
    """
    Print a fixed greeting.

    None of the four parameters is used by the body.
    """
    print('hey pyspark_training_test')


# NOTE(review): called without arguments although four parameters are
# declared -- confirm that the decorator is expected to supply all of them,
# including ``sc``.
pyspark_training_test()
|
||||
Loading…
Add table
Add a link
Reference in a new issue