Welcome to the PostgreSQL Logical Replication Streaming Plugin for Benthos! This plugin allows you to seamlessly stream data changes from your PostgreSQL database using Benthos, a versatile stream processor.
-
Real-time Data Streaming: Capture data changes in real-time as they happen in your PostgreSQL database.
-
Flexible Configuration: Easily configure the plugin to specify the database connection details, replication slot, and table filtering rules.
-
Checkpoints: Store your replication consuming progress in Redis
Before you begin, make sure you have the following prerequisites:
-
Benthos: Required to import into your Golang code
-
PostgreSQL: Ensure you have a PostgreSQL database instance that supports logical replication.
To get started you have to run benthos with custom plugins. Since this plugin is not adopted by benthos itself you have to create a new benthos build with plugin registered
package main
import (
"github.com/Jeffail/benthos/v3/lib/service"
// import pg_stream plugins
_ "github.com/usedatabrew/pg_stream_benthos/pg_stream_schemaless"
_ "github.com/usedatabrew/pg_stream_benthos/pg_stream"
)
func main() {
// here we initialize benthos
service.Run()
}
input:
label: postgres_cdc_input
# register new plugin
pg_stream:
host: datbase hoat
slot_name: reqplication slot name
user: postgres username with replication permissions
password: password
port: 5432
schema: schema you want to replicate tables from
stream_snapshot: set true if you want to stream existing data. If set to false only a new data will be streamed
database: name of the database
checkpoint_storage: redis uri if you want to store checkpoints
tables: ## list of tables you want to replicate
- table_name
By default, plugins exports raw wal2json
message. If you want to receive your data as json structure
without metadata to transform it with benthos - you can register pg_stream_schemaless
plugin to transform it
pipeline:
processors:
- label: pretty_changes_processor
pg_stream_schemaless: { }