Set up a daily pipeline to ingest raw Order API data into a PostgreSQL database.
- Define schema with SQLAlchemy
  - Pandas type inference isn't great on its own
  - It currently forces analysts to handle type conversions themselves, especially with datetimes
- Define order summary view
Developed using Python >=3.7 for latest features like typing and dataclasses.
- Recommend using an isolated environment manager like virtualenv, pyenv, pipenv, etc.
- Install dependencies: `pip install -r requirements.txt`
- To avoid hardcoding resources, the pipeline pulls the following from environment variables:
  - `_PGSQL_HOSTNAME` - Name of the db host to connect to
  - `_PGSQL_PORT` - Port number on the host
  - `_PGSQL_USER` - Username for auth
  - `_PGSQL_PASS` - Password for auth
  - `_PGSQL_DATABASE` - Default database name to use
  - `_AWS_S3_URL_SOURCE` - S3 URL to source data (object URI not given)
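For illustration, a minimal sketch of how these variables might be resolved at startup. The `Settings` helper and its field names are hypothetical, not necessarily how pipeline.py is organized:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Connection settings resolved from the environment variables listed above."""

    pg_host: str
    pg_port: int
    pg_user: str
    pg_pass: str
    pg_database: str
    s3_source: str

    @classmethod
    def from_env(cls) -> "Settings":
        # KeyError here is intentional: fail fast if a variable is missing.
        return cls(
            pg_host=os.environ["_PGSQL_HOSTNAME"],
            pg_port=int(os.environ["_PGSQL_PORT"]),
            pg_user=os.environ["_PGSQL_USER"],
            pg_pass=os.environ["_PGSQL_PASS"],
            pg_database=os.environ["_PGSQL_DATABASE"],
            s3_source=os.environ["_AWS_S3_URL_SOURCE"],
        )

    @property
    def sqlalchemy_url(self) -> str:
        return (
            f"postgresql://{self.pg_user}:{self.pg_pass}"
            f"@{self.pg_host}:{self.pg_port}/{self.pg_database}"
        )
```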
Note: while not necessary, I use direnv for local development to automatically export these env variables based on my current working directory. It uses a dotfile called `.envrc`, which is git-ignored, to safely store sensitive info like credentials locally.
In cloud environments, I tend to use services like Secrets Manager or HashiCorp Vault to fetch credentials at runtime or rely on default compute metadata for authentication within the cloud environment. The trade-off is that it makes the code less portable and introduces vendor lock-in.
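As a rough sketch of the runtime-fetch approach, assuming the PostgreSQL credentials are stored as a JSON secret in AWS Secrets Manager (the secret name here is made up):

```python
import json

import boto3


def pg_credentials_from_secrets_manager(secret_id: str = "order-pipeline/postgres") -> dict:
    """Fetch PostgreSQL credentials at runtime instead of from env variables."""
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    # Assumes the secret was stored as a JSON string, e.g. {"user": "...", "pass": "..."}
    return json.loads(response["SecretString"])
```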
Run locally using a LocalExecutor with `python pipeline.py`.
The pipeline is scheduled to run daily if deployed to a compute environment.
Decided to use Prefect as a lightweight, Pythonic ETL tool that is easy both to run and test locally and to deploy to a production compute environment.
- Only using the Python package for this demo; otherwise I would use Docker to run Prefect Server, which provides PostgreSQL metadata storage, a GraphQL API, a web server, etc.
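For reference, a minimal sketch of what a daily flow looks like with Prefect's Python API. The flow name, task names, and the extract/load bodies are illustrative, not the exact contents of pipeline.py:

```python
import os
from datetime import timedelta

import pandas as pd
from prefect import Flow, task
from prefect.schedules import IntervalSchedule


@task
def extract_orders(s3_url: str) -> pd.DataFrame:
    # e.g. a newline-delimited JSON dump (reading s3:// URLs requires s3fs)
    return pd.read_json(s3_url, lines=True)


@task
def load_orders(orders: pd.DataFrame) -> None:
    # append into PostgreSQL, e.g. via DataFrame.to_sql against a SQLAlchemy engine
    ...


# Daily cadence; flow.run() honours the schedule and sleeps between runs.
schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow("order-ingest", schedule=schedule) as flow:
    orders = extract_orders(os.environ["_AWS_S3_URL_SOURCE"])
    load_orders(orders)

if __name__ == "__main__":
    flow.run()  # LocalExecutor is Prefect's default when running locally
```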
Started off intending to use the Prefect PostgreSQL ExecuteMany task, but found that the lower-level ODBC client and SQL string manipulation would make the pipeline more complicated. Opted to use Pandas instead, but in doing so gave up control over the table schemas. I've left this out due to time; otherwise the schemas would be well defined with SQLAlchemy data type boilerplate.
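As an example of that missing boilerplate, an explicit dtype map could be passed to pandas' `to_sql` so datetime and numeric columns land with the intended PostgreSQL types. The column names below are assumptions, not taken from the actual Order API:

```python
from sqlalchemy import types

# Hypothetical column names for the orders table; adjust to the real API payload.
ORDER_DTYPES = {
    "order_id": types.BigInteger(),
    "user_id": types.BigInteger(),
    "order_total": types.Numeric(12, 2),
    "created_at": types.TIMESTAMP(timezone=True),
    "status": types.Text(),
}

# orders_df.to_sql("orders", engine, if_exists="append", index=False, dtype=ORDER_DTYPES)
```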
Regarding database normalization, the current schema could be better. There is an argument to be made that OLAP workloads are better suited to columnar data stores, which work best with denormalized tables because joins are expensive, as opposed to relational databases like PostgreSQL that are designed for OLTP.
- Load tasks should ideally be concurrent
- Append-only pipeline and index order tables by `run_date`
  - Keep historical record of orders
  - Would need to upsert users
- Better database normalization (a rough schema sketch follows this list)
  - Namely, split orders into fact and dimension tables
  - Might be some other entities that could be factored out, like location and device/app
- Current implementation won't scale to larger datasets:
  - Ideally could hit the API directly (async) and store intermediate results in S3
    - Avoids iterating over every file and every row within each file
  - Larger data volumes would benefit from parallelism, i.e. Dask instead of Pandas
  - Currently relies on loading the data dump into memory entirely
    - If able to use blob storage, can reduce the memory footprint of workers
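To make the fact/dimension idea above concrete, a rough SQLAlchemy sketch; table and column names are illustrative, not a final design:

```python
from sqlalchemy import (
    BigInteger, Column, Date, ForeignKey, MetaData, Numeric, Table, Text, TIMESTAMP,
)

metadata = MetaData()

# Dimension: one row per user, upserted on each run.
dim_users = Table(
    "dim_users", metadata,
    Column("user_id", BigInteger, primary_key=True),
    Column("email", Text),
)

# Dimension: device/app the order was placed from.
dim_devices = Table(
    "dim_devices", metadata,
    Column("device_id", BigInteger, primary_key=True),
    Column("platform", Text),
)

# Fact: append-only orders, indexed by run_date for time-sliced queries.
fact_orders = Table(
    "fact_orders", metadata,
    Column("order_id", BigInteger, primary_key=True),
    Column("user_id", BigInteger, ForeignKey("dim_users.user_id")),
    Column("device_id", BigInteger, ForeignKey("dim_devices.device_id")),
    Column("order_total", Numeric(12, 2)),
    Column("created_at", TIMESTAMP(timezone=True)),
    Column("run_date", Date, index=True),
)

# metadata.create_all(engine)
```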