Skip to content

Commit

Permalink
Add messages by prefix
Browse files Browse the repository at this point in the history
The new parameter 'add-msg-prefixes' adds only messages with these
prefixes. It is a comma separated value. By default all messages are
included. wal2json enforces this rule after 'filter-msg-prefixes'. Per
off-list discussion with @martinmarques
  • Loading branch information
Euler Taveira committed Aug 6, 2019
1 parent 1daa0b6 commit 26774a4
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 3 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ Parameters
* `filter-tables`: exclude rows from the specified tables. Default is empty which means that no table will be filtered. It is a comma separated value. The tables should be schema-qualified. `*.foo` means table foo in all schemas and `bar.*` means all tables in schema bar. Special characters (space, single quote, comma, period, asterisk) must be escaped with backslash. Schema and table are case-sensitive. Table `"public"."Foo bar"` should be specified as `public.Foo\ bar`.
* `add-tables`: include only rows from the specified tables. Default is all tables from all schemas. It has the same rules from `filter-tables`.
* `filter-msg-prefixes`: exclude messages if prefix is in the list. Default is empty which means that no message will be filtered. It is a comma separated value.
* `add-msg-prefixes`: include only messages if prefix is in the list. Default is all prefixes. It is a comma separated value. `wal2json` applies `filter-msg-prefixes` before this parameter.
* `format-version`: defines which format to use. Default is _1_.

Examples
Expand Down
30 changes: 28 additions & 2 deletions expected/message.out
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,19 @@ SELECT 'msg11' FROM pg_logical_emit_message(true, 'filtered', 'this message will
msg11
(1 row)

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'filter-msg-prefixes', 'foo, filtered, bar');
SELECT 'msg12' FROM pg_logical_emit_message(true, 'added1', 'this message will be printed');
?column?
----------
msg12
(1 row)

SELECT 'msg13' FROM pg_logical_emit_message(true, 'added2', 'this message will be filtered');
?column?
----------
msg13
(1 row)

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'filter-msg-prefixes', 'foo, filtered, bar', 'add-msg-prefixes', 'added1, added3, wal2json');
data
---------------------------------------------------------------------------------------------------------
{ +
Expand Down Expand Up @@ -170,7 +182,21 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pre
"change": [ +
] +
}
(9 rows)
{ +
"change": [ +
{ +
"kind": "message", +
"transactional": true, +
"prefix": "added1", +
"content": "this message will be printed" +
} +
] +
}
{ +
"change": [ +
] +
}
(11 rows)

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
?column?
Expand Down
4 changes: 3 additions & 1 deletion sql/message.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ SELECT 'msg10' FROM pg_logical_emit_message(true, 'wal2json', 'this is message #
COMMIT;

SELECT 'msg11' FROM pg_logical_emit_message(true, 'filtered', 'this message will be filtered');
SELECT 'msg12' FROM pg_logical_emit_message(true, 'added1', 'this message will be printed');
SELECT 'msg13' FROM pg_logical_emit_message(true, 'added2', 'this message will be filtered');

SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'filter-msg-prefixes', 'foo, filtered, bar');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'pretty-print', '1', 'filter-msg-prefixes', 'foo, filtered, bar', 'add-msg-prefixes', 'added1, added3, wal2json');

SELECT 'stop' FROM pg_drop_replication_slot('regression_slot');
46 changes: 46 additions & 0 deletions wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ typedef struct
List *filter_tables; /* filter out tables */
List *add_tables; /* add only these tables */
List *filter_msg_prefixes; /* filter by message prefixes */
List *add_msg_prefixes; /* add only messages with these prefixes */

int format_version; /* support different formats */

Expand Down Expand Up @@ -149,6 +150,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
data->include_not_null = false;
data->filter_tables = NIL;
data->filter_msg_prefixes = NIL;
data->add_msg_prefixes = NIL;

data->format_version = WAL2JSON_FORMAT_VERSION;

Expand Down Expand Up @@ -395,6 +397,29 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
pfree(rawstr);
}
}
else if (strcmp(elem->defname, "add-msg-prefixes") == 0)
{
char *rawstr;

if (elem->arg == NULL)
{
elog(DEBUG1, "add-msg-prefixes argument is null");
data->add_msg_prefixes = NIL;
}
else
{
rawstr = pstrdup(strVal(elem->arg));
if (!split_string_to_list(rawstr, ',', &data->add_msg_prefixes))
{
pfree(rawstr);
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
pfree(rawstr);
}
}
else if (strcmp(elem->defname, "format-version") == 0)
{
if (elem->arg == NULL)
Expand Down Expand Up @@ -1096,6 +1121,27 @@ pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
}
}

/* Add messages by prefix */
if (list_length(data->add_msg_prefixes) > 0)
{
ListCell *lc;
bool skip = true;

foreach(lc, data->add_msg_prefixes)
{
char *p = lfirst(lc);

if (strcmp(p, prefix) == 0)
skip = false;
}

if (skip)
{
elog(DEBUG2, "message prefix \"%s\" was skipped", prefix);
return;
}
}

/*
* write immediately iif (i) write-in-chunks=1 or (ii) non-transactional
* messages.
Expand Down

0 comments on commit 26774a4

Please sign in to comment.