Skip to content

vigimite/aqueducts

Repository files navigation

Aqueducts

Build status Crates.io Documentation

Aqueducts is a framework to write and execute ETL data pipelines declaratively.

Features:

  • Define ETL pipelines in YAML
  • Extract data from csv files, JSONL, parquet files or delta tables
  • Process data using SQL
  • Load data into object stores as csv/parquet or delta tables
  • Support for file and delta table partitioning
  • Support for Upsert/Replace/Append operation on delta tables
  • Support for Local, S3, GCS and Azure Blob storage
  • EXPERIMENTAL Support for ODBC Sources and Destinations

This framework builds on the fantastic work done by projects such as:

Please show these projects some support ❤️!

Documentation

You can find the docs at https://vigimite.github.io/aqueducts

Change log: CHANGELOG

Quick start

To define and execute an Aqueduct pipeline there are a couple of options

  • using a yaml configuration file
  • using a json configuration file
  • manually in code

You can check out some examples in the examples directory. Here is a simple example defining an Aqueduct pipeline using the yaml config format link:

sources:
  # Register a local file source containing temperature readings for various cities
  - type: File
    name: temp_readings
    file_type:
      type: Csv
      options: {}
    # use built-in templating functionality
    location: ./examples/temp_readings_${month}_${year}.csv

  #Register a local file source containing a mapping between location_ids and location names
  - type: File
    name: locations
    file_type:
      type: Csv
      options: {}
    location: ./examples/location_dict.csv

stages:
  # Query to aggregate temperature data by date and location
  - - name: aggregated
      query: >
          SELECT
            cast(timestamp as date) date,
            location_id,
            round(min(temperature_c),2) min_temp_c,
            round(min(humidity),2) min_humidity,
            round(max(temperature_c),2) max_temp_c,
            round(max(humidity),2) max_humidity,
            round(avg(temperature_c),2) avg_temp_c,
            round(avg(humidity),2) avg_humidity
          FROM temp_readings
          GROUP by 1,2
          ORDER by 1 asc
      # print the query plan to stdout for debugging purposes
      explain: true

  # Enrich aggregation with the location name
  - - name: enriched
      query: >
        SELECT
          date,
          location_name,
          min_temp_c,
          max_temp_c,
          avg_temp_c,
          min_humidity,
          max_humidity,
          avg_humidity
        FROM aggregated
        JOIN locations 
          ON aggregated.location_id = locations.location_id
        ORDER BY date, location_name
      # print 10 rows to stdout for debugging purposes
      show: 10

# Write the pipeline result to a parquet file (can be omitted if you don't want an output)
destination:
  type: File
  name: results
  file_type:
    type: Parquet
    options: {}
  location: ./examples/output_${month}_${year}.parquet

This repository contains a minimal example implementation of the Aqueducts framework which can be used to test out pipeline definitions like the one above:

cargo install aqueducts-cli
aqueducts --file examples/aqueduct_pipeline_example.yml --param year=2024 --param month=jan

Roadmap

  • Docs
  • ODBC source
  • ODBC destination
  • Parallel processing of stages
  • Streaming Source (initially kafka + maybe aws kinesis)
  • Streaming destination (initially kafka)