Source code for ketl.etl.Pipeline

import pandas as pd

from typing import Dict, List, Callable, Optional, Union, Type
from itertools import product
from pathlib import Path
from collections import defaultdict

from ketl.extractor.Extractor import BaseExtractor
from ketl.transformer.Transformer import BaseTransformer
from ketl.loader.Loader import BaseLoader


[docs]class InvalidPipelineError(Exception): pass
[docs]class ETLPipeline: def __init__(self, extractors: List[BaseExtractor] = None, transformers: List[BaseTransformer] = None, loaders: List[BaseLoader] = None, fanout: Dict[Union[BaseExtractor, BaseTransformer, BaseLoader], Union[List[BaseTransformer], List[BaseLoader]]] = None): """ Initialize the ETL pipeline. If a fanout is provided, checks the fanout for validity and ignores any other parameters. If a fanout is not provided, construct one from the provided extractors, transformers, and loaders where each extractor feeds each transformer and each transformer feeds each loader. :param extractors: A list of Extractors. :param transformers: A list of Transformers. :param loaders: A list of Loaders. :param fanout: A fanout. Other parameters ignored if fanout is provided. """ extractors = extractors or [] transformers = transformers or [] loaders = loaders or [] if not fanout: self.fanout: Dict[Union[BaseExtractor, BaseTransformer, BaseLoader], Union[List[BaseTransformer], List[BaseLoader]]] = defaultdict(list) for extractor, transformer in product(extractors, transformers): self.fanout[extractor].append(transformer) for transformer, loader in product(transformers, loaders): self.fanout[transformer].append(loader) else: for k, v in fanout.items(): if isinstance(k, BaseExtractor) and not all(map(lambda item: isinstance(item, BaseTransformer), v)): raise InvalidPipelineError(f'An Extractor {k} is attached to something other than a Transformer: {v}.') elif isinstance(k, BaseTransformer) and not all(map(lambda item: isinstance(item, BaseLoader), v)): raise InvalidPipelineError(f'A Transformer {k} is attached to something other than a Loader: {v}.') self.fanout = fanout
[docs] def execute(self): """ Run the pipeline. Fires the extractors, then feeds the results into the transformers. :return: None """ extraction_results = self._fire_extractors() self._fire_transformers(extraction_results)
def _fire_extractors(self) -> Dict[BaseExtractor, List[Path]]: """ Fire the extractors. :return: A dictionary whose keys are extractors and whose values are lists of paths produced by the extractors. """ results = {op: op.extract() for op in self.fanout if isinstance(op, BaseExtractor)} return results def _fire_transformers(self, extraction_results: Dict[BaseExtractor, List[Path]]): """ Fire the transformers. Each transformer result feeds the associated loaders. :param extraction_results: :return: None """ for op in self.fanout: if isinstance(op, BaseExtractor): result = extraction_results[op] transformers = self.fanout.get(op, []) # TODO: parallelize this using joblib or something like that for transformer in transformers: loaders = self.fanout.get(transformer, []) for df in transformer.transform(result): # type: pd.DataFrame self._fire_loaders(df, loaders) for loader in loaders: loader.finalize() @staticmethod def _fire_loaders(df, loaders): # we consume a data frame as expected if isinstance(df, pd.DataFrame): if not df.empty: for loader in loaders: # type: BaseLoader loader.load(df) else: # we'll do our best to handle this and trust # that you configured the loader correctly # but we'll warn you from warnings import warn for loader in loaders: warn(f'Loading data of type {type(df)} that is not a DataFrame with {type(loader)}') loader.load(df)