Skip to content

Commit

Permalink
Merge pull request #3081 from redpanda-data/cdc_replication
Browse files Browse the repository at this point in the history
examples: add a postgres replication example using CDC
  • Loading branch information
rockwotj authored Dec 17, 2024
2 parents 884e46b + 592f22b commit e63612a
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions config/examples/cdc_replication.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
input:
postgres_cdc:
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
include_transaction_markers: true
slot_name: test_slot_native_decoder
stream_snapshot: true
schema: public
tables: [my_src_table]
# Group by transaction, each message batch is all rows changed in a transaction
# this might be massive, but might be required for foreign key constraints
batching:
check: '@operation == "commit"'
period: 10s
processors:
# But drop the placeholder messages for start/end transaction
- mapping: |
root = if @operation == "begin" || @operation == "commit" {
deleted()
} else {
this
}
output:
# Dispatch the write based on the operation metadata
switch:
strict_mode: true
cases:
- check: '@operation == "read" || @operation == "insert"'
output:
sql_insert:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
table: my_dst_table
columns: [id, foo, bar]
args_mapping: root = [this.id, this.foo, this.bar]
init_statement: |
CREATE TABLE IF NOT EXISTS my_dst_table (
id serial PRIMARY KEY,
foo text,
bar timestamp
);
- check: '@operation == "update"'
output:
sql_raw:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
query: UPDATE my_dst_table SET foo = $1, bar = $2 WHERE id = $3
args_mapping: root = [this.foo, this.bar, this.id]
- check: '@operation == "delete"'
output:
sql_raw:
driver: postgres
dsn: postgres://me:foobar@localhost:5432?sslmode=disable
query: DELETE FROM my_dst_table WHERE id = $1
args_mapping: root = [this.id]

0 comments on commit e63612a

Please sign in to comment.