Add life_cycle & architecture refactored
This commit is contained in:
parent
b22ebc40fe
commit
87f4dfdb0b
6 changed files with 87 additions and 1 deletions
|
|
@ -1,4 +1,16 @@
|
||||||
import pyspark.sql.functions as F
|
import pyspark.sql.functions as F
|
||||||
|
from pyspark.sql import DataFrame
|
||||||
|
|
||||||
|
|
||||||
|
def clean_output_dataset_1(df: DataFrame) -> DataFrame:
|
||||||
|
"""
|
||||||
|
|
||||||
|
:param df:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
df = remove_extra_spaces(df, 'name')
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
def remove_extra_spaces(df, column_name):
|
def remove_extra_spaces(df, column_name):
|
||||||
|
|
@ -0,0 +1,31 @@
|
||||||
|
import pyspark.sql.functions as F
|
||||||
|
from pyspark.sql import DataFrame
|
||||||
|
from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import clean_output_dataset_1
|
||||||
|
|
||||||
|
|
||||||
|
def compute_output_dataset_1(df: DataFrame) -> DataFrame:
|
||||||
|
|
||||||
|
df = clean_output_dataset_1(df)
|
||||||
|
|
||||||
|
df = add_life_stage(df)
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
||||||
|
|
||||||
|
def add_life_stage(df: DataFrame) -> DataFrame:
|
||||||
|
"""
|
||||||
|
Add life stage
|
||||||
|
child if age < 13
|
||||||
|
teenager if age >= 13 and <= 19
|
||||||
|
adult for age>20
|
||||||
|
:param df:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
|
df = df.withColumn(
|
||||||
|
'life_stage',
|
||||||
|
F.when(F.col('age') < 13, F.lit('child'))
|
||||||
|
.when(F.col('age').between(13, 19), F.lit('teenager'))
|
||||||
|
.otherwise(F.lit('adult'))
|
||||||
|
)
|
||||||
|
|
||||||
|
return df
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
from pyspark.sql import types as T
|
from pyspark.sql import types as T
|
||||||
from src.test_pyspark_training.lib_test_utils import assert_df_equal
|
from src.test_pyspark_training.lib_test_utils import assert_df_equal
|
||||||
from src.pyspark_training.output_dataset_1.remove_extra_spaces import remove_extra_spaces
|
from src.pyspark_training.output_dataset_1.clean_output_dataset_1 import remove_extra_spaces
|
||||||
|
|
||||||
|
|
||||||
def test_remove_extra_spaces(spark_session):
|
def test_remove_extra_spaces(spark_session):
|
||||||
|
|
@ -0,0 +1,43 @@
|
||||||
|
from pyspark.sql import types as T
|
||||||
|
from src.test_pyspark_training.lib_test_utils import assert_df_equal
|
||||||
|
from src.pyspark_training.output_dataset_1.compute_output_dataset_1 import add_life_stage
|
||||||
|
|
||||||
|
|
||||||
|
def test_add_life_stage(spark_session):
|
||||||
|
|
||||||
|
input_schema = T.StructType(
|
||||||
|
[
|
||||||
|
T.StructField('name', T.StringType(), False),
|
||||||
|
T.StructField('age', T.IntegerType(), False),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
input_data = [
|
||||||
|
('Alice G.', 13),
|
||||||
|
('John B.', 20),
|
||||||
|
('Jack W.', 19),
|
||||||
|
('Bob T.', 35),
|
||||||
|
('John D.', 9),
|
||||||
|
('Eve A.', 12),
|
||||||
|
]
|
||||||
|
input_df = spark_session.createDataFrame(input_data, input_schema)
|
||||||
|
|
||||||
|
expected_schema = T.StructType(
|
||||||
|
[
|
||||||
|
T.StructField('name', T.StringType(), False),
|
||||||
|
T.StructField('age', T.IntegerType(), False),
|
||||||
|
T.StructField('life_stage', T.StringType(), False),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
expected_data = [
|
||||||
|
('Alice G.', 13, 'teenager'),
|
||||||
|
('John B.', 20, 'adult'),
|
||||||
|
('Jack W.', 19, 'teenager'),
|
||||||
|
('Bob T.', 35, 'adult'),
|
||||||
|
('John D.', 9, 'child'),
|
||||||
|
('Eve A.', 12, 'child'),
|
||||||
|
]
|
||||||
|
expected_df = spark_session.createDataFrame(expected_data, expected_schema)
|
||||||
|
|
||||||
|
df = add_life_stage(input_df)
|
||||||
|
|
||||||
|
assert_df_equal(df, expected_df)
|
||||||
Loading…
Add table
Add a link
Reference in a new issue