Usage
kETL is a library of components that execute the different stages of an ETL pipeline: extractors, transformers, and loaders. There is also a Pipeline class that provides convenience logic for chaining complicated ETL configurations together.
Philosophy
The idea behind kETL is to turn the ETL process into a configurable set of building blocks that can be flexibly reused as needed. Rather than having you write custom functions for every sort of extraction and transformation logic under the sun, kETL strictly enforces a separation of concerns: you subclass the provided classes and override their methods to achieve the desired goal.
In particular, kETL’s data model emphasizes not repeating data fetches if the existing data has not changed. To do that, kETL keeps track of the hashes of the files it has downloaded and will only redownload them if forced to or if the statistics of the file (e.g. size) have changed. kETL can also redownload a file if a certain configurable amount of time has passed, under the assumption that the contents of that file have changed.
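The refresh rules described above can be sketched in a few lines. This is a minimal illustration, not kETL's implementation: the CacheRecord type, the needs_refresh function, and all of their parameter names are hypothetical, and only the file size is used as the "statistic" being compared.

```python
import hashlib
import os
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class CacheRecord:
    """Hypothetical bookkeeping for one downloaded file (not kETL's schema)."""
    size: int          # size in bytes recorded at the last download
    sha1: str          # content hash recorded at the last download
    fetched_at: float  # UNIX timestamp of the last download


def file_sha1(path: str) -> str:
    """Hash a file in chunks so large downloads are never fully in memory."""
    digest = hashlib.sha1()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(65536), b""):
            digest.update(chunk)
    return digest.hexdigest()


def record_for(path: str, fetched_at: float) -> CacheRecord:
    """Build the bookkeeping record for a file that was just downloaded."""
    return CacheRecord(os.path.getsize(path), file_sha1(path), fetched_at)


def needs_refresh(record: Optional[CacheRecord], remote_size: int, *,
                  force: bool = False,
                  max_age_seconds: Optional[float] = None,
                  now: Optional[float] = None) -> bool:
    """Return True when the file should be downloaded again."""
    if force or record is None:
        return True  # forced, or never downloaded before
    if remote_size != record.size:
        return True  # the remote file's statistics have changed
    if max_age_seconds is not None:
        current = time.time() if now is None else now
        return current - record.fetched_at > max_age_seconds  # copy is stale
    return False
```

With a record of size 10 fetched at time 1000, needs_refresh(record, 10) is False, while a changed size, force=True, or an elapsed max_age_seconds each turn it True.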
Example
Suppose I want to get a bunch of data from the Census Bureau. Here’s a worked example of how I might do that. First, we need to configure the API.
from ketl.db import models
from ketl.extractor import DefaultExtractor
from ketl.transformer import DelimitedTableTransformer
from ketl.loader import DelimitedFileLoader, DatabaseLoader
from ketl.utils.db_utils import get_or_create


class CensusAPI(models.API):

    def setup(self):
        # Register the base location of the data and where to store it on disk.
        source, _ = get_or_create(models.Source,
                                  url='http://www2.census.gov/programs-surveys/bds/tables/time-series/',
                                  data_dir='downloads',
                                  api_config_id=self.id)
        # Register the file to download; its URL is relative to the source URL.
        cached_file, _ = get_or_create(models.CachedFile,
                                       url='bds2018.csv',
                                       expected_mode=models.ExpectedMode.self)
This is the entirety of the setup to grab the economy-wide dataset from the business dynamics page of the Census Bureau. A few things merit mention:
get_or_create is a utility function which returns a database instance with the supplied parameters, or creates one if such an instance does not exist. It also returns whether or not the instance was created; you can ignore this value unless you need it.
If the ExpectedFile is equivalent to the CachedFile which is to be downloaded, the expected_mode argument of the CachedFile constructor can be set to ExpectedMode.self, as in the example above. This will automatically create an ExpectedFile with identical information to the downloaded file.
Although we have here defined the configuration statically, we need not do so as long as we have some place to begin. For example, it would be just as correct to scrape the public-facing website of the Census Bureau and generate the CachedFile entries dynamically that way. In fact, that would be a paradigmatic usage of kETL.
Once the API is configured, we are off to the races:
api = models.API.get_instance(CensusAPI)
api.setup()
extractor = DefaultExtractor(api)
files = extractor.extract()
The extractor will now fetch all the entries defined by the setup function above and
put them in the downloads directory relative to the current working directory. Easy!
Note the use of the static get_instance convenience function, which will return an
instance of the CensusAPI, or create one if one does not exist in the database. API instances
must have unique names; see the data model section below for details.
We can now transform the CSV data into something else. For the time being we will simply load it into a database table with all the same data.
transformer = DelimitedTableTransformer()
loader = DatabaseLoader('bds_economy')
for df in transformer.transform(files):
    loader.load(df)
This will incrementally process the data in the downloaded files and load it into whatever database you have configured. Now the data is yours to do with as you please.
The transformers and loaders all include additional options that can be read in the docstrings
of the specific classes. Note that the DatabaseLoader is agnostic to the underlying data
model of the passed data frame; it leaves it to Pandas to convert the data into the proper
SQL.
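The following sketch shows what "leaving it to Pandas" can look like in practice, using only pandas and the standard library rather than kETL's classes. The CSV content and the chunk size are made up for illustration, and whether DatabaseLoader works exactly this way internally is an assumption.

```python
import io
import sqlite3

import pandas as pd

# Tiny in-memory CSV standing in for a downloaded file (made-up values).
csv_text = "year,firms\n2016,10\n2017,12\n2018,15\n"

connection = sqlite3.connect(":memory:")

# chunksize makes read_csv yield one data frame per chunk instead of a
# single large frame, so the file is processed incrementally.
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    # pandas derives the table schema and INSERT statements from the frame;
    # if_exists="append" adds each chunk to the same table.
    chunk.to_sql("bds_economy", connection, if_exists="append", index=False)

row_count = connection.execute("SELECT COUNT(*) FROM bds_economy").fetchone()[0]
```

No table definition is written by hand here: the column names and types come from the data frame itself.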
Data model
At the heart of kETL is a data model consisting of APIs, Sources, CachedFiles, and ExpectedFiles. Their functions are described below.
API
The API class is the basic unit of configuration around which everything
revolves. The API has one setup method that must be executed before it can be used; this
method should be used to configure the rest of the data that is to be fetched. API must
be subclassed and the setup method overridden by the user. The API may optionally be
given a name, though if one is not given, the API will use the name of the class itself. Note
that only one API of a given name may exist in a project.
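The naming rules can be illustrated with a stand-in base class. This is a sketch under stated assumptions, not kETL's code: SketchAPI, its in-memory registry, and the classmethod form of get_instance are hypothetical, and a real implementation would keep the instances in the database.

```python
from typing import Dict, Optional


class SketchAPI:
    """Hypothetical stand-in for an API base class; not kETL's implementation."""

    _registry: Dict[str, "SketchAPI"] = {}

    def __init__(self, name: Optional[str] = None) -> None:
        # Fall back to the class name when no explicit name is supplied.
        self.name = name or type(self).__name__
        if self.name in SketchAPI._registry:
            raise ValueError(f"an API named {self.name!r} already exists")
        SketchAPI._registry[self.name] = self
        self.configured = False

    @classmethod
    def get_instance(cls, name: Optional[str] = None) -> "SketchAPI":
        """Return the existing instance with this name, or create one."""
        key = name or cls.__name__
        existing = SketchAPI._registry.get(key)
        return existing if existing is not None else cls(key)

    def setup(self) -> None:
        raise NotImplementedError("subclasses must override setup()")


class CensusSketch(SketchAPI):
    def setup(self) -> None:
        # A real subclass would register its sources and files here.
        self.configured = True


api = CensusSketch.get_instance()
api.setup()
```

Calling CensusSketch.get_instance() again returns the same object, while constructing a second CensusSketch directly raises ValueError because the name is already taken.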
Source
The Source class represents some actual location of data nested under
an API. The purpose of the Source is twofold:
to configure the base location of the data and to configure where on disk the data should be
placed. The Source itself does not actually configure any files to be
extracted; it merely provides a structure for their organization.
CachedFile
The CachedFile class represents an actual file to be downloaded from
somewhere. The location of the file may be either an FTP server or any location that is
accessible by smart_open. Note that the URL parameter of CachedFile
should be specified relative to the URL of its source. In other words, if the
Source has the base_url of https://path/to/some
then to retrieve a file under this hierarchy we would create a CachedFile
whose url is file; this will be joined with the URL of the source to produce the
actual resource to be retrieved. Similarly, the path of CachedFile
should be relative to the data_dir of the parent Source.
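One plausible way to resolve these relative locations is sketched below. The helper names resolve_url and resolve_path are hypothetical, and the exact joining rule kETL applies is an assumption; the sketch simply appends the relative part beneath the base.

```python
from pathlib import PurePosixPath
from urllib.parse import urljoin


def resolve_url(base_url: str, relative_url: str) -> str:
    """Append relative_url beneath base_url, keeping the base's last segment."""
    return base_url.rstrip("/") + "/" + relative_url.lstrip("/")


def resolve_path(data_dir: str, relative_path: str) -> str:
    """Place relative_path beneath the source's data directory."""
    return str(PurePosixPath(data_dir) / relative_path)


full_url = resolve_url("https://path/to/some", "file")
local_path = resolve_path("downloads", "file")

# For comparison: urljoin replaces the last path segment when the base
# URL has no trailing slash, which is usually not what is wanted here.
replaced = urljoin("https://path/to/some", "file")
```

Here full_url is https://path/to/some/file and local_path is downloads/file, whereas urljoin yields https://path/to/file. Ending the base URL with a slash avoids that ambiguity.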
ExpectedFile
The ExpectedFile class reflects files that actually appear on disk.
For example, the CachedFile may represent an archive that might need to
be decompressed somewhere; the ExpectedFile might represent a file that
is actually present within the archive. It is the ExpectedFiles that
represent the data that is actually to be processed by Transformers.
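The archive case can be pictured with the standard library alone. In this sketch the zip file plays the role of the cached file and its extracted members play the role of the expected files; the file names and contents are invented for illustration, and kETL's own decompression logic is not shown.

```python
import tempfile
import zipfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp)

    # Stand-in for the downloaded archive (the "cached file").
    archive_path = data_dir / "tables.zip"
    with zipfile.ZipFile(archive_path, "w") as archive:
        archive.writestr("economy.csv", "year,firms\n2018,15\n")
        archive.writestr("sector.csv", "year,sector\n2018,retail\n")

    # Decompress it; the extracted members are the "expected files".
    with zipfile.ZipFile(archive_path) as archive:
        archive.extractall(data_dir / "extracted")

    expected_files = sorted(
        p.name for p in (data_dir / "extracted").iterdir() if p.is_file()
    )
```

One downloaded file produces two files on disk, which is why the two concepts are modeled separately.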
Functional Components
Extractors
The Extractor class is the direct link between the data model and the actual ETL operations.
The job of the Extractor is to actually fetch the CachedFile entries from wherever
they happen to reside. The initializer of the Extractor takes an API instance
as an argument. Assuming setup has been called on the API, the Extractor
can then be run with the extract function and will download the specified data.
Transformers
The job of the Transformer is to take the ExpectedFiles generated by the
Extractor and transform them in some way. There are two default transformers that are
part of kETL: the DelimitedTableTransformer and the JsonTableTransformer.
All subclasses of the BaseTransformer parent class must implement
the transform method, but they may also override any of the other methods as needed.
The transformers must produce a Pandas data frame, which is then passed to the loader.
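The contract described above (implement transform, yield data frames) might look like the sketch below. SketchTransformer is a hypothetical stand-in for the base class, and the signature of kETL's real BaseTransformer.transform may differ; the subclass and its column-renaming behavior are invented for illustration.

```python
import io
from abc import ABC, abstractmethod
from typing import Iterable, Iterator, TextIO

import pandas as pd


class SketchTransformer(ABC):
    """Hypothetical base class; kETL's BaseTransformer may differ."""

    @abstractmethod
    def transform(self, sources: Iterable[TextIO]) -> Iterator[pd.DataFrame]:
        """Yield one data frame per input source."""


class UppercaseColumnsTransformer(SketchTransformer):
    """Reads delimited text and normalizes column names to upper case."""

    def __init__(self, delimiter: str = ",") -> None:
        self.delimiter = delimiter

    def transform(self, sources: Iterable[TextIO]) -> Iterator[pd.DataFrame]:
        for source in sources:
            frame = pd.read_csv(source, sep=self.delimiter)
            frame.columns = [column.upper() for column in frame.columns]
            yield frame


sample = io.StringIO("year,firms\n2018,15\n")
frames = list(UppercaseColumnsTransformer().transform([sample]))
```

Because transform is a generator, a loader can consume the frames one at a time instead of waiting for every file to be processed.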
Loaders
The Loader class is responsible for the final stage of the pipeline: putting the data
somewhere, either on disk or to a database. The DelimitedFileLoader writes a data frame
to disk as a CSV file, and the DatabaseLoader loads the data into
a database table. Any database that can be interfaced with via SQLAlchemy should work fine.