Usage

kETL is a library of components that execute the different stages of an ETL pipeline: extractors, transformers, and loaders. Additionally, there is a Pipeline class which provides convenience logic for chaining complicated ETL configurations together.

Philosophy

The idea behind kETL is to turn the ETL process into a configurable set of building blocks that can be flexibly reused as needed. Rather than writing custom functions for every sort of extraction and transformation logic under the sun, kETL strictly enforces a separation of concerns, demanding that you subclass and override different functions provided to achieve the desired goal.

In particular, kETL’s data model emphasizes not repeating data fetches if the existing data has not changed. To do that, kETL keeps track of the hashes of the files it has downloaded and will only redownload them if forced to or if the statistics of the file (e.g. size) have changed. kETL can also redownload a file if a certain configurable amount of time has passed, under the assumption that the contents of that file have changed.
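The refetch rule described above can be sketched in a few lines. This is only an illustration of the decision logic, not kETL's actual implementation: the function names, the parameters (recorded_size, remote_size, downloaded_at, refresh_interval), and the choice of SHA-256 are all assumptions made for the example.

```python
import hashlib
import os
import time


def file_sha256(path):
    """Return the SHA-256 hex digest of a file, read in 64 KiB blocks."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for block in iter(lambda: handle.read(65536), b""):
            digest.update(block)
    return digest.hexdigest()


def should_download(local_path, recorded_size, remote_size, downloaded_at,
                    refresh_interval=None, force=False, now=None):
    """Decide whether a file should be fetched again (hypothetical helper).

    recorded_size    -- size stored when the file was last downloaded
    remote_size      -- size currently reported by the remote location
    downloaded_at    -- timestamp (seconds) of the last download
    refresh_interval -- optional maximum age in seconds
    """
    if force:
        return True
    if not os.path.exists(local_path):
        return True
    if remote_size != recorded_size:
        # The remote statistics changed, so the content is assumed to differ.
        return True
    if refresh_interval is not None:
        current = time.time() if now is None else now
        if current - downloaded_at >= refresh_interval:
            return True
    return False
```

After a successful download, a digest such as the one from file_sha256 could be stored alongside the size so that later runs have something to compare against.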

Example

Suppose we want to get a bunch of data from the Census Bureau. Here’s a worked example of how we might do that. First, we need to configure the API.

from ketl.db import models
from ketl.extractor import DefaultExtractor
from ketl.transformer import DelimitedTableTransformer
from ketl.loader import DelimitedFileLoader, DatabaseLoader
from ketl.utils.db_utils import get_or_create

class CensusAPI(models.API):

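    def setup(self):

        # Base location of the data and the local directory to download into.
        source, _ = get_or_create(models.Source,
            url='http://www2.census.gov/programs-surveys/bds/tables/time-series/',
            data_dir='downloads',
            api_config_id=self.id)

        # The file to fetch; its url is relative to the url of the source.
        cached_file, _ = get_or_create(models.CachedFile,
            url='bds2018.csv',
            expected_mode=models.ExpectedMode.self)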

This is the entirety of the setup to grab the economy-wide dataset from the business dynamics page of the Census Bureau. A few things merit mention:

  • get_or_create is a utility function which returns a database instance with the supplied parameters, or creates one if such an instance does not exist. It also returns whether or not the instance was created, which can be ignored unless you need to know this.

  • If the ExpectedFile is equivalent to the CachedFile which is to be downloaded, the expected_mode argument of the CachedFile constructor can be set to ExpectedMode.self, as in the example above. This will automatically create an ExpectedFile with identical information to the downloaded file.

  • Although we have here defined the configuration statically, we need not do so as long as we have some place to begin. For example, it would be just as correct to scrape the public-facing website of the Census Bureau and generate the CachedFile entries dynamically that way. In fact, that would be a paradigmatic usage of kETL.
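To make the first and last points concrete, here is a small, dependency-free sketch of generating entries dynamically from a scraped listing. Everything in it is a stand-in: get_or_create_row works on a plain list rather than a database and is not kETL's get_or_create, and the HTML listing and its file names are invented for the example.

```python
from html.parser import HTMLParser


class CsvLinkParser(HTMLParser):
    """Collect href values ending in .csv from anchor tags."""

    def __init__(self):
        super().__init__()
        self.links = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if href and href.lower().endswith(".csv"):
            self.links.append(href)


def get_or_create_row(table, **params):
    """Return (row, created): the matching row, or a new one if none matches."""
    for row in table:
        if all(row.get(key) == value for key, value in params.items()):
            return row, False
    row = dict(params)
    table.append(row)
    return row, True


# Hypothetical directory listing; a real run would fetch this page first.
LISTING = """
<html><body>
<a href="table_a.csv">table_a.csv</a>
<a href="table_b.csv">table_b.csv</a>
<a href="readme.txt">readme.txt</a>
</body></html>
"""

parser = CsvLinkParser()
parser.feed(LISTING)

cached_files = []
first_pass = [get_or_create_row(cached_files, url=link)[1] for link in parser.links]
# A second pass creates nothing, because every entry already exists.
second_pass = [get_or_create_row(cached_files, url=link)[1] for link in parser.links]
```

Because the second pass creates nothing, a setup function written this way can be run repeatedly without duplicating entries.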

Once the API is configured, we are off to the races:

api = models.API.get_instance(CensusAPI)
api.setup()
extractor = DefaultExtractor(api)
files = extractor.extract()

The extractor will now fetch all the entries defined by the setup function above and put them in the downloads directory (the data_dir configured on the Source), relative to the current working directory. Easy! Note the use of the static get_instance convenience function, which will return an instance of the CensusAPI, or create one if one does not exist in the database. API instances must have unique names; see the data model section below for details.

We can now transform the CSV data into something else. For the time being we will simply load it, with all the same data, into a database table.

transformer = DelimitedTableTransformer()
loader = DatabaseLoader('bds_economy')
for df in transformer.transform(files):
    loader.load(df)

This will incrementally process the data in the downloaded files and load it into whatever database you have configured. Now the data is yours to do with as you please.

The transformers and loaders all include additional options that can be read in the docstrings of the specific classes. Note that the DatabaseLoader is agnostic to the underlying data model of the passed data frame; it leaves it to Pandas to convert the data into the proper SQL.

Data model

At the heart of kETL is a data model consisting of APIs, Sources, CachedFiles, and ExpectedFiles. Their functions are described below.

API

The API class is the basic unit of configuration around which everything revolves. The API has one setup method that must be executed before it can be used; this method should be used to configure the rest of the data that is to be fetched. API must be subclassed and the setup method overridden by the user. The API may optionally be given a name, though if one is not given, the API will use the name of the class itself. Note that only one API of a given name may exist in a project.
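The naming and uniqueness rules can be illustrated with a toy registry. SketchAPI below is a hypothetical stand-in that keeps instances in a dictionary; kETL's API stores them in the database, and its get_instance may take different arguments than the optional name assumed here.

```python
class SketchAPI:
    """Toy configuration unit; not kETL's models.API."""

    _registry = {}  # maps API name -> the single instance with that name

    def __init__(self, name=None):
        # Fall back to the class name when no explicit name is given.
        self.name = name or type(self).__name__

    @classmethod
    def get_instance(cls, api_cls, name=None):
        """Return the instance registered under the name, creating it if needed."""
        key = name or api_cls.__name__
        if key not in cls._registry:
            cls._registry[key] = api_cls(name=key)
        return cls._registry[key]

    def setup(self):
        raise NotImplementedError("subclasses must override setup()")


class CensusSketchAPI(SketchAPI):
    def setup(self):
        # A real setup would register sources and files here.
        self.configured = True


first = SketchAPI.get_instance(CensusSketchAPI)
second = SketchAPI.get_instance(CensusSketchAPI)
first.setup()
```

Both calls to get_instance return the same object, which mirrors the rule that only one API of a given name may exist.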

Source

The Source class represents some actual location of data nested under an API. The purpose of the Source is twofold: to configure the base location of the data and to configure where on disk the data should be placed. The Source itself does not configure any files to be extracted; it merely provides a structure for their organization.

CachedFile

The CachedFile class represents an actual file to be downloaded from somewhere. The location of the file may be either an FTP server or any location that is accessible by smart_open. Note that the URL parameter of CachedFile should be specified relative to the URL of its source. In other words, if the Source has the base URL https://path/to/some, then to retrieve a file under this hierarchy we would create a CachedFile whose url is file; this will be joined with the URL of the source to produce the actual resource to be retrieved. Similarly, the path of CachedFile should be relative to the data_dir of the parent Source.
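How exactly kETL joins the two URLs is not spelled out here, so the sketch below only illustrates the general idea and one pitfall worth knowing. The join_url helper is hypothetical; urljoin is the standard library function, shown for contrast because it replaces the last path segment when the base has no trailing slash.

```python
import os
from urllib.parse import urljoin


def join_url(base_url, relative_url):
    """Join two URL parts with exactly one slash between them (hypothetical helper)."""
    return base_url.rstrip("/") + "/" + relative_url.lstrip("/")


base = "https://path/to/some"

joined = join_url(base, "file")                # https://path/to/some/file
std_no_slash = urljoin(base, "file")           # https://path/to/file
std_with_slash = urljoin(base + "/", "file")   # https://path/to/some/file

# The local path is built the same way, relative to the data directory.
local_path = os.path.join("downloads", "file")
```

If the library relies on standard URL resolution, ending the source URL with a slash, as the Census example does, avoids losing the last path segment.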

ExpectedFile

The ExpectedFile class reflects files that actually appear on disk. For example, the CachedFile may represent an archive that needs to be decompressed somewhere; the ExpectedFile would then represent a file that is actually present within the archive. It is the ExpectedFiles that represent the data that is actually processed by Transformers.
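The archive case can be pictured with the standard zipfile module. This is a sketch of the relationship only, with invented file names and a hypothetical extract_expected helper; it does not show how kETL itself unpacks archives.

```python
import io
import os
import tempfile
import zipfile


def extract_expected(archive_bytes, target_dir, expected_names):
    """Extract only the expected members and return their paths on disk."""
    paths = []
    with zipfile.ZipFile(io.BytesIO(archive_bytes)) as archive:
        for name in expected_names:
            paths.append(archive.extract(name, path=target_dir))
    return paths


# Build a small in-memory archive standing in for a downloaded CachedFile.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("data.csv", "year,firms\n2018,10\n")
    archive.writestr("notes.txt", "not needed downstream")

with tempfile.TemporaryDirectory() as tmp:
    expected_paths = extract_expected(buffer.getvalue(), tmp, ["data.csv"])
    extracted_names = sorted(os.listdir(tmp))
```

Here the archive plays the role of the CachedFile, and data.csv, the only member that ends up on disk, plays the role of the ExpectedFile.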

Functional Components

Extractors

The Extractor class is the direct link between the data model and the actual ETL operations. The job of the Extractor is to actually fetch the CachedFile entries from wherever they happen to reside. The initializer of the Extractor takes an API instance as an argument. Assuming setup has been called on the API, the Extractor can then be run with the extract function and will download the specified data.

Transformers

The job of the Transformer is to take the ExpectedFiles generated by the Extractor and transform them in some way. There are two default transformers that are part of kETL: the DelimitedTableTransformer and the JsonTableTransformer. All Transformer subclasses of the BaseTransformer parent class must implement the transform method, but they may also override any of the other methods as needed. The transformers must produce a Pandas data frame, which is then passed to the loader.
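The subclass-and-override contract might look roughly like the following. To stay dependency-free, this sketch yields batches of dictionaries where kETL's transformers produce Pandas data frames, and the class names, the batch_size parameter, and the use of open file objects as input are assumptions for illustration rather than the library's interface.

```python
import csv
import io
from abc import ABC, abstractmethod


class SketchTransformer(ABC):
    """Toy base class: subclasses must implement transform()."""

    @abstractmethod
    def transform(self, sources):
        """Yield the transformed data one batch at a time."""


class DelimitedSketchTransformer(SketchTransformer):
    def __init__(self, delimiter=",", batch_size=2):
        self.delimiter = delimiter
        self.batch_size = batch_size

    def transform(self, sources):
        for source in sources:
            batch = []
            for row in csv.DictReader(source, delimiter=self.delimiter):
                batch.append(row)
                if len(batch) == self.batch_size:
                    yield batch
                    batch = []
            if batch:
                # Emit the final, possibly shorter, batch of this source.
                yield batch


sample = io.StringIO("year,firms\n2016,8\n2017,9\n2018,10\n")
batches = list(DelimitedSketchTransformer().transform([sample]))
```

Writing transform as a generator is what allows a loader to consume the data incrementally instead of holding an entire file in memory.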

Loaders

The Loader class is responsible for the final stage of the pipeline: putting the data somewhere, either on disk or in a database. The DelimitedFileLoader writes a data frame to disk as a CSV file, and the DatabaseLoader loads the data into a database table. Any database that can be interfaced with via SQLAlchemy should work fine.
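As a rough picture of the load step, the sketch below appends batches of rows to a table using the standard sqlite3 module. It is a simplified stand-in: SqliteSketchLoader is a hypothetical class, whereas the DatabaseLoader described above works on data frames and leaves the SQL to Pandas and SQLAlchemy.

```python
import sqlite3


class SqliteSketchLoader:
    """Toy loader that appends batches of dict rows to one SQLite table."""

    def __init__(self, connection, table):
        self.connection = connection
        self.table = table

    def load(self, rows):
        if not rows:
            return
        columns = list(rows[0])
        quoted = ", ".join(f'"{column}"' for column in columns)
        placeholders = ", ".join("?" for _ in columns)
        # Create the table on first use, taking the columns from the data itself.
        self.connection.execute(
            f'CREATE TABLE IF NOT EXISTS "{self.table}" ({quoted})'
        )
        self.connection.executemany(
            f'INSERT INTO "{self.table}" ({quoted}) VALUES ({placeholders})',
            [tuple(row[column] for column in columns) for row in rows],
        )
        self.connection.commit()


connection = sqlite3.connect(":memory:")
loader = SqliteSketchLoader(connection, "bds_economy")
loader.load([{"year": "2017", "firms": "9"}])
loader.load([{"year": "2018", "firms": "10"}])
row_count = connection.execute('SELECT COUNT(*) FROM "bds_economy"').fetchone()[0]
```

The loader takes its columns from whatever it is handed, which is the same sense in which the DatabaseLoader is agnostic to the data model of the frame it receives.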