PySpark_Training_Repository/assets/pipegraph/pipegraph.py
2024-01-17 17:33:24 +01:00

108 lines
2.7 KiB
Python

import json
import inspect
import stack_data
import uuid
def get_node_type_from_func(func: object) -> str:
if func.__name__.startswith('clean_'):
node_type = 'Cleaning'
elif func.__name__.startswith('compute_'):
node_type = 'Computing'
else:
node_type = 'Processing'
return node_type
def get_stack_frame_id(frame_str: str):
return frame_str.split('id:')[1]
class PipeGraph(object):
__json = {
'nodes': [],
'links': []
}
__node_id = 0
__link_id = 0
def __init__(self, func):
self.__func = func
if func is not None:
self.__name = func.__name__
else:
self.__name = None
def __call__(self, *args, **kwargs):
print(self.__name)
print(self.__func.__doc__)
print("-----------")
print(self.add_node(
doc=self.__func.__doc__,
))
return self.__func(*args, **kwargs)
@classmethod
def json(cls):
print(json.dumps(cls.__json, indent=4))
def add_node(self, **kwargs):
node_type = get_node_type_from_func(self.__func)
current_id = PipeGraph.__node_id
frame = inspect.currentframe().f_back.f_back
current_co_name = stack_data.FrameInfo(frame).code.co_name
node = {'id': current_id, 'uuid': str(uuid.uuid4()), 'name': self.__name, 'type': node_type, 'before': current_co_name}
for k, v in kwargs.items():
node[k] = v
PipeGraph.__json['nodes'].append(node)
PipeGraph.__node_id += 1
return current_id
@classmethod
def add_link(cls, source_id: str, target_id: str):
current_id = cls.__link_id
cls.__json['links'].append({'id': current_id, 'source_id': source_id, 'target_id': target_id})
cls.__link_id += 1
return current_id
@classmethod
def remove_node(cls, node_id: int):
cls.__json['nodes'].remove(node_id)
@classmethod
def remove_link(cls, link_id: int):
cls.__json['links'].remove(link_id)
@classmethod
def get_node(cls, node_id: int):
return cls.__json['nodes'][node_id]
@classmethod
def get_link(cls, link_id: int):
return cls.__json['links'][link_id]
@classmethod
def get_nodes(cls):
return cls.__json['nodes']
@classmethod
def get_links(cls):
return cls.__json['links']
@classmethod
def get_node_id(cls):
return cls.__node_id
@classmethod
def get_link_id(cls):
return cls.__link_id
@classmethod
def reset(cls):
cls.__json = {
'nodes': [],
'links': []
}
cls.__node_id = 0
cls.__link_id = 0