Skip to content

PostgreSQL replication with DDL changes

License

Apache-2.0, Unknown licenses found

Licenses found

Apache-2.0
LICENSE
Unknown
license-header.txt
Notifications You must be signed in to change notification settings

xataio/pgstream

Repository files navigation

pgstream logo

License - Apache 2.0  CI Build   Go Reference  Discord   X (formerly Twitter) Follow

pgstream - Postgres replication with DDL changes

pgstream is an open source CDC command-line tool and library that offers Postgres replication support with DDL changes to any provided output.

Features

  • Schema change tracking and replication of DDL changes
  • Modular deployment configuration, only requires Postgres
  • Schema based message partitioning
  • Schema filtering
  • Elasticsearch/OpenSearch replication output plugin support
  • Webhook support
  • Automatic discovery of table primary key/unique not null columns for use as event identity
  • Highly customisable modules when used as library
  • Core metrics available via opentelemetry
  • Extendable support for custom replication output plugins
  • Continuous consumption of replication slot with configurable memory guards

Table of Contents

Usage

pgstream can be used via the readily available CLI or as a library.

CLI

Installation

Binaries

Binaries are available for Linux, macOS & Windows, check our Releases.

From source

To install pgstream from the source, run the following command:

go install github.com/xataio/pgstream@latest
From package manager - Homebrew

To install pgstream with homebrew, run the following command:

# macOS or Linux
brew tap xataio/pgstream
brew install pgstream

Environment setup

If you have an environment available, with at least Postgres and whichever module resources you're planning on running, then you can skip this step. Otherwise, a docker setup is available in this repository that starts Postgres, Kafka and OpenSearch (as well as OpenSearch dashboards for easy visualisation).

docker-compose -f build/docker/docker-compose.yml up

Prepare the database

This will create the pgstream schema in the configured Postgres database, along with the tables/functions/triggers required to keep track of the schema changes. See Tracking schema changes section for more details. It will also create a replication slot for the configured database which will be used by the pgstream service.

pgstream init --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"

If there are any issues or if you want to clean up the pgstream setup, you can run the following.

pgstream tear-down --pgurl "postgres://postgres:postgres@localhost?sslmode=disable"

This command will clean up all pgstream state.

Run pgstream

Run will require the configuration to be provided, either via environment variables, config file or a combination of both. There are some sample configuration files provided in the repo that can be used as guidelines.

Example running pgstream with Postgres -> OpenSearch:

pgstream run -c pg2os.env --log-level trace

Example running pgstream with Postgres -> Kafka, and in a separate terminal, Kafka->OpenSearch:

pgstream run -c pg2kafka.env --log-level trace
pgstream run -c kafka2os.env --log-level trace

The run command will parse the configuration provided, and initialise the configured modules. It requires at least one listener and one processor.

Configuration

Here's a list of all the environment variables that can be used to configure the individual modules, along with their descriptions and default values.

Listeners

Postgres Listener
Environment Variable Default Required Description
PGSTREAM_POSTGRES_LISTENER_URL N/A Yes URL of the Postgres database to connect to for replication purposes.
Kafka Listener
Environment Variable Default Required Description
PGSTREAM_KAFKA_SERVERS N/A Yes URLs for the Kafka servers to connect to.
PGSTREAM_KAFKA_TOPIC_NAME N/A Yes Name of the Kafka topic to read from.
PGSTREAM_KAFKA_READER_CONSUMER_GROUP_ID N/A Yes Name of the Kafka consumer group for the WAL Kafka reader.
PGSTREAM_KAFKA_READER_CONSUMER_GROUP_START_OFFSET Earliest No Kafka offset from which the consumer will start if there's no offset available for the consumer group.
PGSTREAM_KAFKA_TLS_ENABLED False No Enable TLS connection to the Kafka servers.
PGSTREAM_KAFKA_TLS_CA_CERT_FILE "" When TLS enabled Path to the CA PEM certificate to use for Kafka TLS authentication.
PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE "" No Path to the client PEM certificate to use for Kafka TLS client authentication.
PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE "" No Path to the client PEM private key to use for Kafka TLS client authentication.
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_INITIAL_INTERVAL 0 No Initial interval for the exponential backoff policy to be applied to the Kafka commit retries.
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_INTERVAL 0 No Max interval for the exponential backoff policy to be applied to the Kafka commit retries.
PGSTREAM_KAFKA_COMMIT_EXP_BACKOFF_MAX_RETRIES 0 No Max retries for the exponential backoff policy to be applied to the Kafka commit retries.
PGSTREAM_KAFKA_COMMIT_BACKOFF_INTERVAL 0 No Constant interval for the backoff policy to be applied to the Kafka commit retries.
PGSTREAM_KAFKA_COMMIT_BACKOFF_MAX_RETRIES 0 No Max retries for the backoff policy to be applied to the Kafka commit retries.

One of exponential/constant backoff policies can be provided for the Kafka committing retry strategy. If none is provided, no retries apply.

Processors

Kafka Batch Writer
Environment Variable Default Required Description
PGSTREAM_KAFKA_SERVERS N/A Yes URLs for the Kafka servers to connect to.
PGSTREAM_KAFKA_TOPIC_NAME N/A Yes Name of the Kafka topic to write to.
PGSTREAM_KAFKA_TOPIC_PARTITIONS 1 No Number of partitions created for the Kafka topic if auto create is enabled.
PGSTREAM_KAFKA_TOPIC_REPLICATION_FACTOR 1 No Replication factor used when creating the Kafka topic if auto create is enabled.
PGSTREAM_KAFKA_TOPIC_AUTO_CREATE False No Auto creation of configured Kafka topic if it doesn't exist.
PGSTREAM_KAFKA_TLS_ENABLED False No Enable TLS connection to the Kafka servers.
PGSTREAM_KAFKA_TLS_CA_CERT_FILE "" When TLS enabled Path to the CA PEM certificate to use for Kafka TLS authentication.
PGSTREAM_KAFKA_TLS_CLIENT_CERT_FILE "" No Path to the client PEM certificate to use for Kafka TLS client authentication.
PGSTREAM_KAFKA_TLS_CLIENT_KEY_FILE "" No Path to the client PEM private key to use for Kafka TLS client authentication.
PGSTREAM_KAFKA_WRITER_BATCH_TIMEOUT 1s No Max time interval at which the batch sending to Kafka is triggered.
PGSTREAM_KAFKA_WRITER_BATCH_BYTES 1572864 No Max size in bytes for a given batch. When this size is reached, the batch is sent to Kafka.
PGSTREAM_KAFKA_WRITER_BATCH_SIZE 100 No Max number of messages to be sent per batch. When this size is reached, the batch is sent to Kafka.
PGSTREAM_KAFKA_WRITER_MAX_QUEUE_BYTES 100MiB No Max memory used by the Kafka batch writer for inflight batches.
Search Batch Indexer

| Environment Variable | Default | Required | Description | | -------------------------------------------------- | ------- | -------- | -------------------------------------------------------------------------------------------------------------- | --- | | PGSTREAM_OPENSEARCH_STORE_URL | N/A | Yes | URL for the opensearch store to connect to (at least one of the URLs must be provided). | | PGSTREAM_ELASTICSEARCH_STORE_URL | N/A | Yes | URL for the elasticsearch store to connect to (at least one of the URLs must be provided). | | PGSTREAM_SEARCH_INDEXER_BATCH_TIMEOUT | 1s | No | Max time interval at which the batch sending to the search store is triggered. | | PGSTREAM_SEARCH_INDEXER_BATCH_SIZE | 100 | No | Max number of messages to be sent per batch. When this size is reached, the batch is sent to the search store. | | PGSTREAM_SEARCH_INDEXER_MAX_QUEUE_BYTES | 100MiB | No | Max memory used by the search batch indexer for inflight batches. | | | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_INITIAL_INTERVAL | 1s | No | Initial interval for the exponential backoff policy to be applied to the search store operation retries. | | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_INTERVAL | 1min | No | Max interval for the exponential backoff policy to be applied to the search store operation retries. | | PGSTREAM_SEARCH_STORE_EXP_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the exponential backoff policy to be applied to the search store operation retries. | | PGSTREAM_SEARCH_STORE_BACKOFF_INTERVAL | 0 | No | Constant interval for the backoff policy to be applied to the search store operation retries. | | PGSTREAM_SEARCH_STORE_BACKOFF_MAX_RETRIES | 0 | No | Max retries for the backoff policy to be applied to the search store operation retries. |

One of exponential/constant backoff policies can be provided for the search indexer cleanup retry strategy. If none is provided, no retries apply.

One of exponential/constant backoff policies can be provided for the search store retry strategy. If none is provided, a default exponential backoff policy applies.

Webhook Notifier
Environment Variable Default Required Description
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_URL N/A Yes URL for the webhook subscription store to connect to.
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_ENABLED False No Caching applied to the subscription store retrieval queries.
PGSTREAM_WEBHOOK_SUBSCRIPTION_STORE_CACHE_REFRESH_INTERVAL 60s When cache enabled Interval at which the subscription store cache will be refreshed. Indicates max cache staleness.
PGSTREAM_WEBHOOK_NOTIFIER_MAX_QUEUE_BYTES 100MiB No Max memory used by the webhook notifier for inflight notifications.
PGSTREAM_WEBHOOK_NOTIFIER_WORKER_COUNT 10 No Max number of concurrent workers that will send webhook notifications for a given WAL event.
PGSTREAM_WEBHOOK_NOTIFIER_CLIENT_TIMEOUT 10s No Max time the notifier will wait for a response from a webhook URL before timing out.
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_ADDRESS ":9900" No Address for the subscription server to listen on.
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_READ_TIMEOUT 5s No Max duration for reading an entire server request, including the body before timing out.
PGSTREAM_WEBHOOK_SUBSCRIPTION_SERVER_WRITE_TIMEOUT 10s No Max duration before timing out writes of the response. It is reset whenever a new request's header is read.
Translator
Environment Variable Default Required Description
PGSTREAM_TRANSLATOR_STORE_POSTGRES_URL N/A Yes URL for the postgres URL where the schema log table is stored.

Tracking schema changes

One of the main differentiators of pgstream is the fact that it tracks and replicates schema changes automatically. It relies on SQL triggers that will populate a Postgres table (pgstream.schema_log) containing a history log of all DDL changes for a given schema. Whenever a schema change occurs, this trigger creates a new row in the schema log table with the schema encoded as a JSON value. This table tracks all the schema changes, forming a linearised change log that is then parsed and used within the pgstream pipeline to identify modifications and push the relevant changes downstream.

The detailed SQL used can be found in the migrations folder.

The schema and data changes are part of the same linear stream - the downstream consumers always observe the schema changes as soon as they happen, before any data arrives that relies on the new schema. This prevents data loss and manual intervention.

Architecture

pgstream is constructed as a streaming pipeline, where data from one module streams into the next, eventually reaching the configured output plugins. It keeps track of schema changes and replicates them along with the data changes to ensure a consistent view of the source data downstream. This modular approach makes adding and integrating output plugin implementations simple and painless.

pgstream architecture v1

At a high level the implementation is split into WAL listeners and WAL processors.

WAL Listener

A listener is anything that listens for WAL data, regardless of the source. It has a single responsibility: consume and manage the WAL events, delegating the processing of those entries to modules that form the processing pipeline. Depending on the listener implementation, it might be required to also have a checkpointer to flag the events as processed once the processor is done.

There are currently two implementations of the listener:

  • Postgres listener: listens to WAL events directly from the replication slot. Since the WAL replication slot is sequential, the Postgres WAL listener is limited to run as a single process. The associated Postgres checkpointer will sync the LSN so that the replication lag doesn't grow indefinitely.

  • Kafka reader: reads WAL events from a Kafka topic. It can be configured to run concurrently by using partitions and Kafka consumer groups, applying a fan-out strategy to the WAL events. The data will be partitioned by database schema by default, but can be configured when using pgstream as a library. The associated Kafka checkpointer will commit the message offsets per topic/partition so that the consumer group doesn't process the same message twice.

WAL Processor

A processor processes a WAL event. Depending on the implementation it might also be required to checkpoint the event once it's done processing it as described above.

There are currently two implementations of the processor:

  • Kafka batch writer: it writes the WAL events into a Kafka topic, using the event schema as the Kafka key for partitioning. This implementation allows to fan-out the sequential WAL events, while acting as an intermediate buffer to avoid the replication slot to grow when there are slow consumers. It has a memory guarded buffering system internally to limit the memory usage of the buffer. The buffer is sent to Kafka based on the configured linger time and maximum size. It treats both data and schema events equally, since it doesn't care about the content.

  • Search batch indexer: it indexes the WAL events into an OpenSearch/Elasticsearch compatible search store. It implements the same kind of mechanism than the Kafka batch writer to ensure continuous processing from the listener, and it also uses a batching mechanism to minimise search store calls. The search mapping logic is configurable when used as a library. The WAL event identity is used as the search store document id, and if no other version is provided, the LSN is used as the document version. Events that do not have an identity are not indexed. Schema events are stored in a separate search store index (pgstream), where the schema log history is kept for use within the search store (i.e, read queries).

  • Webhook notifier: it sends a notification to any webhooks that have subscribed to the relevant wal event. It relies on a subscription HTTP server receiving the subscription requests and storing them in the shared subscription store which is accessed whenever a wal event is processed. It sends the notifications to the different subscribed webhook urls in parallel based on a configurable number of workers (client timeouts apply). Similar to the two previous processor implementations, it uses a memory guarded buffering system internally, which allows to separate the wal event processing from the webhook url sending, optimising the processor latency.

In addition to the implementations described above, there's an optional processor decorator, the translator, that injects some of the pgstream logic into the WAL event. This includes:

  • Data events:

    • Setting the WAL event identity. If provided, it will use the configured id finder (only available when used as a library), otherwise it will default to using the table primary key/unique not null column.
    • Setting the WAL event version. If provided, it will use the configured version finder (only available when used as a library), otherwise it will default to using the event LSN.
    • Adding pgstream IDs to all columns. This allows us to have a constant identifier for a column, so that if there are renames the column id doesn't change. This is particularly helpful for the search store, where a rename would require a reindex, which can be costly depending on the data.
  • Schema events:

    • Acknolwedging the new incoming schema in the Postgres pgstream.schema_log table.

Limitations

Some of the limitations of the initial release include:

  • Single Kafka topic support
  • Postgres plugin support limited to wal2json
  • Data filtering limited to schema level
  • No initial/automatic data backfill
  • Primary key/unique not null column required for replication
  • Kafka serialisation support limited to JSON

Glossary

  • CDC: Change Data Capture
  • WAL: Write Ahead Logging
  • LSN: Log Sequence Number
  • DDL: Data Definition Language

Contributing

We welcome contributions from the community! If you'd like to contribute to pgstream, please follow these guidelines:

  • Create an issue for any questions, bug reports, or feature requests.
  • Check the documentation and existing issues before opening a new issue.

Contributing Code

  1. Fork the repository.
  2. Create a new branch for your feature or bug fix.
  3. Make your changes and write tests if applicable.
  4. Ensure your code passes linting and tests.
    • There's a pre-commit configuration available on the root directory (.pre-commit-config.yaml), which can be used to validate the CI checks locally.
  5. Submit a pull request.

For this project, we pledge to act and interact in ways that contribute to an open, welcoming, diverse, inclusive, and healthy community.

License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.

Support

If you have any questions, encounter issues, or need assistance, open an issue in this repository our join our Discord, and our community will be happy to help.


Made with ❤️ by Xata 🦋