# Source code for ketl.transformer.Transformer

import json
from abc import abstractmethod
from io import StringIO
from itertools import chain
from pathlib import Path
from typing import Callable, Dict, Iterator, List, Optional, Union

import pandas as pd


class AdapterError(Exception):
    """Base exception type of this module; more specific adapter errors derive from it."""
class NoValidSourcesError(AdapterError):
    """
    An :class:`AdapterError` subtype.

    NOTE(review): going by the name, presumably raised when none of the candidate sources
    can be used; it is not raised anywhere in the code visible here -- confirm against callers.
    """
class BaseTransformer:
    """
    The base transformer class. Should not be instantiated directly.

    Subclasses are expected to override :meth:`transform` and :meth:`_build_data_frame`;
    the versions defined here only raise ``NotImplementedError``.
    """

    # NOTE: this class does not use ``ABCMeta``, so the ``@abstractmethod`` markers below
    # document intent but do not prevent instantiation.
    # TODO: init should take the configuration kwargs
    def __init__(self, transpose: bool = False,
                 concat_on_axis: Optional[Union[int, str]] = None,
                 columns: Optional[List[Union[str, int]]] = None,
                 skip_errors: bool = False,
                 rename: Optional[Union[Callable, Dict[str, str]]] = None,
                 **kwargs):
        """
        Initialize the transformer.

        :param transpose: whether to transpose the resulting matrix.
        :param concat_on_axis: whether to concatenate data along some axis.
        :param columns: column names.
        :param skip_errors: whether to skip input files if an error is encountered.
        :param rename: a dict or function suitable for passing to the Pandas rename function.
        :param kwargs: optional keyword arguments to pass to reader.
        """
        self.transpose = transpose
        self.concat_on_axis = concat_on_axis
        self.columns = columns
        self.skip_errors = skip_errors
        self.rename = rename
        # kept verbatim; subclasses merge these into their own reader defaults
        self.passed_kwargs = kwargs

    @abstractmethod
    def transform(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Run the actual transformation.

        :param source_files: the source files containing the data.
        :return: an iterator of data frames.
        :raises NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError

    @abstractmethod
    def _build_data_frame(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Construct data frames from the list of input files.

        :param source_files: the source files containing the data.
        :return: an iterator of data frames.
        :raises NotImplementedError: always, unless overridden by a subclass.
        """
        raise NotImplementedError
class DelimitedTableTransformer(BaseTransformer):
    """
    A transformer that changes the input data into a delimited table.

    Files are read with :func:`pandas.read_csv`. With the default reader settings every file
    is streamed in chunks of 50000 rows; when ``concat_on_axis`` is truthy all files are read
    completely and concatenated into a single frame instead.

    ``columns``, ``rename`` and ``skip_errors`` are stored by the base class but are not used
    by this transformer.
    """

    def __init__(self, transpose: bool = False,
                 concat_on_axis: Optional[Union[str, int]] = None,
                 columns: Optional[List[Union[str, int]]] = None,
                 skip_errors: bool = False,
                 rename: Optional[Union[Callable, Dict[str, str]]] = None,
                 **kwargs):
        """
        Initialize the transformer.

        :param transpose: whether to transpose the resulting data.
        :param concat_on_axis: whether to concatenate the data along an axis.
        :param columns: list of column names.
        :param skip_errors: whether to skip errors.
        :param rename: a dict or function suitable for passing to the Pandas rename function.
        :param kwargs: keyword arguments to be passed to the reader.
        """
        super().__init__(transpose, concat_on_axis, columns, skip_errors, rename, **kwargs)

        # defaults for pandas.read_csv; anything passed through kwargs overrides them
        self.reader_kwargs = {
            'comment': None,
            'names': None,
            'delimiter': None,
            'header': 'infer',
            'dtype': None,
            'index_col': None,
            'parse_dates': None,
            'skiprows': None,
            'iterator': True,
            'chunksize': 50000
        }
        self.reader_kwargs.update(self.passed_kwargs)

    def _iter_chunks(self, source_file: Path) -> Iterator[pd.DataFrame]:
        """
        Yield the contents of a single source file as one or more data frames.

        :param source_file: the file to read.
        :return: an iterator over the chunks of the file.
        """
        reader = pd.read_csv(source_file, **self.reader_kwargs)
        if isinstance(reader, pd.DataFrame):
            # chunking was switched off through the reader kwargs, so read_csv returned the
            # whole file; iterating a DataFrame would walk its column labels, not its rows
            yield reader
            return
        try:
            yield from reader
        finally:
            # release the file handle even if the consumer stops iterating early
            reader.close()

    def _build_data_frame(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Build data frames from a list of source files. All kwargs set at initialization
        are passed to the CSV reader.

        :param source_files: a list of source files to read data from.
        :return: an iterator of Pandas data frames: a single concatenated frame when
            ``concat_on_axis`` is truthy, otherwise the chunks of every file in order.
        """
        # NOTE: this is a truthiness test, so ``concat_on_axis=0`` (like ``None``) selects the
        # streaming branch below rather than an in-memory concatenation.
        if self.concat_on_axis:
            # for the special case where every file is a column. this assumes all data can fit into memory
            # TODO: replace this with dask stuff so that things can be lazily concatenated
            # pd.concat only accepts Series/DataFrame objects, so chunked reading is turned
            # off here to get a complete frame per file instead of a TextFileReader.
            whole_file_kwargs = {**self.reader_kwargs, 'iterator': False, 'chunksize': None}
            frames = [pd.read_csv(source_file, **whole_file_kwargs) for source_file in source_files]
            # ``transpose`` is not applied to the concatenated frame
            yield pd.concat(frames, axis=self.concat_on_axis)
            return

        # files are opened lazily, one at a time, as the consumer advances
        for chunk in chain.from_iterable(map(self._iter_chunks, source_files)):
            yield chunk.transpose() if self.transpose else chunk

    def transform(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Transform the data contained in the list of source files to something else.
        By default simply returns the data frames consisting of the raw data.

        :param source_files: a list of source files.
        :return: an iterator of Pandas data frames.
        """
        yield from self._build_data_frame(source_files)
class JsonTableTransformer(BaseTransformer):
    """
    A transformer that reads JSON source files into data frames with :func:`pandas.read_json`.

    When ``record_path`` is given, each file is first loaded with :mod:`json`, the value found
    under that key (or chain of keys) is extracted, and only that part is handed to the reader.
    """

    def __init__(self, record_path: Optional[Union[List[str], str]] = None,
                 transpose: bool = False,
                 concat_on_axis: Optional[Union[str, int]] = None,
                 columns: Optional[List[Union[str, int]]] = None,
                 skip_errors: bool = False,
                 rename: Optional[Union[Callable, Dict[str, str]]] = None,
                 **kwargs):
        """
        Initialize the transformer.

        :param record_path: a key, or list of nested keys, locating the data inside each file.
        :param transpose: whether to transpose each resulting data frame.
        :param concat_on_axis: stored by the base class; not used by this transformer.
        :param columns: columns to select from each data frame in :meth:`transform`.
        :param skip_errors: whether to skip a source file when reading it raises an error.
        :param rename: a dict or function suitable for passing to the Pandas rename function.
        :param kwargs: keyword arguments to be passed to the reader.
        """
        super().__init__(transpose, concat_on_axis, columns, skip_errors, rename, **kwargs)
        self.record_path = record_path

        # defaults for pandas.read_json; anything passed through kwargs overrides them
        self.reader_kwargs = {
            'orient': None,
            'typ': 'frame',
            'dtype': None,
            'convert_axes': None,
            'convert_dates': True,
            'keep_default_dates': True,
            'precise_float': False,
            'date_unit': None,
            'encoding': None,
            'lines': False,
            'chunksize': None,
            'compression': 'infer',
            'nrows': None,
            'storage_options': None
        }
        self.reader_kwargs.update(self.passed_kwargs)

    @staticmethod
    def _extract_data(filename: Union[Path, str], record_path: Union[List[str], str],
                      serialize: bool = True) -> Union[dict, list, str]:
        """
        Load a JSON file and return the value found at ``record_path``.

        :param filename: the JSON file to load.
        :param record_path: a single key, or a list of keys that is followed one level at a time.
        :param serialize: whether to return the value re-encoded as a JSON string.
        :return: the extracted value, as a JSON string when ``serialize`` is true.
        :raises TypeError: if ``record_path`` is neither a string nor a list.
        """
        if isinstance(record_path, str):
            keys = [record_path]
        elif isinstance(record_path, list):
            keys = record_path
        else:
            raise TypeError('record_path must be a list or a string')

        with open(filename, 'r') as f:
            data = json.load(f)
        for key in keys:
            data = data[key]
        return json.dumps(data) if serialize else data

    def _read_source(self, source_file: Path) -> pd.DataFrame:
        """
        Read a single source file into a data frame, applying ``record_path`` and ``transpose``.

        :param source_file: the file to read.
        :return: the data frame, tagged with the file it came from as ``_source_file``.
        """
        if not self.record_path:
            df = pd.read_json(source_file, **self.reader_kwargs)
        else:
            data = self._extract_data(source_file, self.record_path)
            # wrapped in StringIO because passing a literal JSON string to read_json is
            # deprecated in recent pandas versions
            df = pd.read_json(StringIO(data), **self.reader_kwargs)
        if self.transpose:
            df = df.transpose()
        # set after transposing: transpose returns a new object that would not carry the tag
        df._source_file = source_file
        return df

    def _build_data_frame(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Yield one data frame per source file.

        :param source_files: the source files containing the data.
        :return: an iterator of data frames; a file that fails to load yields an empty frame
            when ``skip_errors`` is set.
        :raises Exception: whatever reading a file raised, when ``skip_errors`` is not set.
        """
        # we're assuming any single json file can fit into memory here because we need to be able to
        # access its internals to extract data from it
        for source_file in source_files:
            # only the read is guarded, so errors coming from the consumer are never swallowed
            try:
                df = self._read_source(source_file)
            except Exception as ex:
                if not self.skip_errors:
                    raise
                print(f'skipping {source_file} due to error: {ex}')
                df = pd.DataFrame()
            yield df

    def transform(self, source_files: List[Path]) -> Iterator[pd.DataFrame]:
        """
        Read the source files and apply the configured renaming and column selection.

        Empty data frames (including the placeholders produced for skipped files) are dropped.

        :param source_files: a list of source files.
        :return: an iterator of Pandas data frames.
        """
        # TODO: move the renaming logic to the base and allow a mapper to be passed
        for df in self._build_data_frame(source_files):
            if df.empty:
                continue
            # rename and column selection return new objects, so remember the tag and re-apply it
            source_file = vars(df).get('_source_file')
            if self.rename:
                df = df.rename(self.rename, axis='columns')
            if self.columns:
                df = df[self.columns]
            if source_file is not None:
                df._source_file = source_file
            yield df