Skip to content

Commit

Permalink
Merge pull request #3080 from redpanda-data/db_polling_example
Browse files Browse the repository at this point in the history
examples: add an example for polling a database
  • Loading branch information
rockwotj authored Dec 17, 2024
2 parents c6c7469 + 49021f2 commit 884e46b
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions config/examples/stateful_polling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# This example shows how to periodically poll postgres and fetch a series of records
# saving a cursor in postgres of the last poll.
input:
generate:
interval: '@every 5s'
mapping: 'root = {}'
pipeline:
processors:
# Our cron has kicked off again - let's restart from our last cached
# version.
- cache:
resource: cached_pgstate
operator: get
key: table_cursor
# get operator results in an error if not found,
# so set our default by catching the error
- catch:
- mapping: 'root.id = -1'
# Now we can do our periodic poll
- sql_select:
driver: "postgres"
dsn: "postgres://me:foobar@localhost:5432"
table: my_table
columns: ["*"]
suffix: 'ORDER BY id ASC'
where: 'id > ?'
args_mapping: root = [this.id]
# Break each row from the sql_select into it's own message within
# a single batch.
- unarchive:
format: json_array
# TODO: Insert your actual pipeline starting here

output:
broker:
# Send each processed message to each output sequentially.
pattern: fan_out_sequential
outputs:
- stdout: {} # TODO: Do your actual output
# It's important that the last thing we do is save our state
# This allows at-least-once delivery semantics.
- processors:
# We only need to save the max ID of the batch in the cache.
- mapping: |
root.id = json('id').from_all().max()
cache:
target: cached_pgstate
key: table_cursor
max_in_flight: 1

cache_resources:
# Use a multilevel cache so that only the first poll needs to
# hit postgres.
- label: cached_pgstate
multilevel: [ inmem, pgstate ]

- label: inmem
memory:
# disable TTL
compaction_interval: ''

- label: pgstate
sql:
driver: "postgres"
dsn: "postgres://me:foobar@localhost:5432"
table: redpanda_connect_state
key_column: key
value_column: val
set_suffix: ON CONFLICT(key) DO UPDATE SET val=excluded.val
init_statement: |
CREATE TABLE IF NOT EXISTS redpanda_connect_state (
key varchar(64) PRIMARY KEY,
val jsonb
);
# You can use the below configuration to generate some data into
# postgres to see the above pipeline working.

# input:
# generate:
# interval: '@every 1s'
# mapping: 'root = {"foo": uuid_v4(), "ts": now()}'
# output:
# sql_insert:
# driver: postgres
# dsn: "postgres://me:foobar@localhost:5432"
# init_statement: |
# CREATE TABLE IF NOT EXISTS my_table (
# id serial NOT NULL,
# foo text,
# ts text,
# primary key (id)
# );
# table: my_table
# columns: [foo, ts]
# args_mapping: root = [this.foo, this.ts]

0 comments on commit 884e46b

Please sign in to comment.