from pyspark.sql import DataFrame
from typing import Callable, Dict, List, Union
class GenericPipe:
    """
    Sequentially apply a series of functions to a PySpark DataFrame.

    The pipeline is configured once with the functions to run and is then
    executed on a DataFrame by calling :meth:`apply_pipe`.

    Parameters
    ----------
    params : Dict[Callable, Union[List, None]]
        A dictionary where keys are functions and values are lists of extra
        positional arguments, or None when the function takes no extra
        arguments. Each function is called with the current DataFrame as its
        first argument and must return the value handed to the next function.

    Attributes
    ----------
    params : Dict[Callable, Union[List, None]]
        The configured steps, exactly as passed to the constructor.
    df : pyspark.sql.dataframe.DataFrame or None
        The result of the most recent `apply_pipe` call, or None if the
        pipeline has not been run yet.

    Methods
    -------
    apply_pipe(df)
        Applies the series of functions to `df` as specified by `params`.

    Notes
    -----
    Steps run in the iteration order of `params`, which for a plain dict is
    insertion order. Because functions are dictionary keys, a given function
    object can appear only once; to apply the same function several times,
    wrap each use in a distinct callable (e.g. ``functools.partial`` or a
    lambda).
    """

    def __init__(self, params: Dict[Callable, Union[List, None]]):
        """
        Initialize the GenericPipe with the set of functions to apply.

        Parameters
        ----------
        params : Dict[Callable, Union[List, None]]
            A dictionary of functions to apply and their respective extra
            positional arguments (None for no extra arguments).
        """
        self.params = params
        # Create the documented attribute up front so that reading `df`
        # before the first run yields None instead of raising AttributeError.
        self.df = None

    def apply_pipe(self, df: "DataFrame") -> "DataFrame":
        """
        Apply the configured functions to `df` in the order defined by `params`.

        Parameters
        ----------
        df : pyspark.sql.dataframe.DataFrame
            The input DataFrame passed to the first function.

        Returns
        -------
        pyspark.sql.dataframe.DataFrame
            The transformed DataFrame after applying all functions. If
            `params` is empty, `df` is returned unchanged.
        """
        # `self.df` is rebound after every step, so if a step raises, the
        # attribute holds the result of the last step that succeeded.
        self.df = df
        for func, args in self.params.items():
            if args is None:
                self.df = func(self.df)
            else:
                # Extra arguments are unpacked positionally after the DataFrame.
                self.df = func(self.df, *args)
        return self.df