Skip to content

Commit

Permalink
review feedback from Craig
Browse files Browse the repository at this point in the history
  • Loading branch information
rockwotj committed Dec 17, 2024
1 parent 7a1dfce commit 49021f2
Showing 1 changed file with 5 additions and 9 deletions.
14 changes: 5 additions & 9 deletions config/examples/stateful_polling.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ pipeline:
dsn: "postgres://me:foobar@localhost:5432"
table: my_table
columns: ["*"]
suffix: 'ORDER BY id ASC'
where: 'id > ?'
args_mapping: root = [this.id]
# Break each row from the sql_select into it's own message within
# a single batch.
- unarchive:
format: json_array
# TODO: Insert your actual pipeline starting here
- mapping:
root.fetched = this

output:
broker:
Expand All @@ -41,12 +40,9 @@ output:
# It's important that the last thing we do is save our state
# This allows at-least-once delivery semantics.
- processors:
# *Assuming* your processing is in order and the output is atomic
# then we only need to save the ID of the last message in the cache.
- select_parts:
parts: [-1]
# We only need to save the max ID of the batch in the cache.
- mapping: |
root.id = this.fetched.id
root.id = json('id').from_all().max()
cache:
target: cached_pgstate
key: table_cursor
Expand All @@ -73,8 +69,8 @@ cache_resources:
set_suffix: ON CONFLICT(key) DO UPDATE SET val=excluded.val
init_statement: |
CREATE TABLE IF NOT EXISTS redpanda_connect_state (
key bytea PRIMARY KEY,
val bytea
key varchar(64) PRIMARY KEY,
val jsonb
);
Expand Down

0 comments on commit 49021f2

Please sign in to comment.