Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LSN now visible in json #1

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*.o
*.so
53 changes: 3 additions & 50 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,55 +1,8 @@
# contrib/wal2json/Makefile
MODULES = wal2json

MODULE_big = wal2json
OBJS = wal2json.o
REGRESS = basic
REGRESS_OPTS = --temp-config=./logical.conf

# Note: because we don't tell the Makefile there are any regression tests,
# we have to clean those result files explicitly
EXTRA_CLEAN = -r $(pg_regress_clean_files)

ifdef USE_PGXS
PG_CONFIG = pg_config
PGXS := $(shell $(PG_CONFIG) --pgxs)
include $(PGXS)
else
subdir = contrib/wal2json
top_builddir = ../..
include $(top_builddir)/src/Makefile.global
include $(top_srcdir)/contrib/contrib-global.mk
endif

# Disabled because these tests require "wal_level=logical", which
# typical installcheck users do not have (e.g. buildfarm clients).
installcheck:;

# But it can nonetheless be very helpful to run tests on preexisting
# installation, allow to do so, but only if requested explicitly.
installcheck-force: regresscheck-install-force

check: regresscheck

submake-regress:
$(MAKE) -C $(top_builddir)/src/test/regress all

submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding

REGRESSCHECKS=insert1 update1 update2 update3 update4 delete1 delete2 delete3 delete4 \
savepoint specialvalue toast bytea

regresscheck: all | submake-regress submake-test_decoding
$(pg_regress_check) \
--temp-config $(top_srcdir)/contrib/test_decoding/logical.conf \
--temp-install=./tmp_check \
--extra-install=contrib/wal2json \
--extra-install=contrib/test_decoding \
$(REGRESSCHECKS)

regresscheck-install-force: | submake-regress submake-test_decoding
$(pg_regress_installcheck) \
--extra-install=contrib/wal2json \
--extra-install=contrib/test_decoding \
$(REGRESSCHECKS)

PHONY: check submake-regress submake-test_decoding \
regresscheck regresscheck-install-force
36 changes: 34 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,36 @@
wal2json
========
# wal2json

JSON output plugin for changeset extraction

### Forked

This is a fork of the great utility at https://github.com/eulerto/wal2json

I take no credit for the C code. Just a little packaging.

Unfortunately, the source repository didn't make it into the postgres mainline source. Which makes it a bit tricky to compile with this makefile. I updated the makefile, so it works.


### Install Steps

(On Debian)

```
# You need postgres 9.4 installed on your machine, you probably have it already
sudo apt-get install postgres-9.4
sudo apt-get install postgres-contrib-9.4

# to compile this code, you need the following
sudo apt-get install build-essential
sudo apt-get build-dep postgresql-9.4
sudo apt-get install postgres-server-dev-9.4

# now go into the directory and...
make
sudo make install
```


### Logical Replication

You need to know quite a few things about logical replication to make use of this. If this is new to you, please check out [the official postgres documentation](http://www.postgresql.org/docs/9.4/static/logicaldecoding-example.html). You can replace 'test_decoding' with 'wal2json' to see the magic of this plugin.
2 changes: 2 additions & 0 deletions logical.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
wal_level = logical
max_replication_slots = 1
32 changes: 31 additions & 1 deletion wal2json.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
#include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
Expand All @@ -40,6 +41,7 @@ typedef struct
{
MemoryContext context;
bool include_xids; /* include transaction ids */
bool include_lsn; /* include lsn for safer syncing */
bool include_timestamp; /* include transaction timestamp */
bool include_schemas; /* qualify tables */
bool include_types; /* include data types */
Expand Down Expand Up @@ -91,6 +93,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
data->include_xids = true;
data->include_lsn = true;
data->include_timestamp = false;
data->include_schemas = true;
data->include_types = true;
Expand Down Expand Up @@ -160,6 +163,20 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else if (strcmp(elem->defname, "include-lsn") == 0)
{
/* If option does not provide a value, it means its value is true */
if (elem->arg == NULL)
{
elog(LOG, "include-lsn argument is null");
data->include_xids = true;
}
else if (!parse_bool(strVal(elem->arg), &data->include_lsn))
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
else
{
elog(WARNING, "option %s = %s is unknown",
Expand Down Expand Up @@ -194,6 +211,12 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
if (data->include_xids)
appendStringInfo(ctx->out, "\t\"xid\": %u,\n", txn->xid);

if (data->include_lsn) {
char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, ctx->write_location));
appendStringInfo(ctx->out, "\t\"start_lsn\": \"%s\",\n", lsn_str);
pfree(lsn_str);
}

if (data->include_timestamp)
appendStringInfo(ctx->out, "\t\"timestamp\": \"%s\",\n", timestamptz_to_str(txn->commit_time));

Expand All @@ -219,7 +242,14 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
/* Transaction ends */
OutputPluginPrepareWrite(ctx, true);

appendStringInfoString(ctx->out, "\t]\n}");
/* Make sure the commas and new lines work with and without include_lsn */
appendStringInfoString(ctx->out, "\t]");
if (data->include_lsn) {
char *lsn_str = DatumGetCString(DirectFunctionCall1(pg_lsn_out, commit_lsn));
appendStringInfo(ctx->out, ",\n\t\"end_lsn\": \"%s\"", lsn_str);
pfree(lsn_str);
}
appendStringInfoString(ctx->out, "\n}");

OutputPluginWrite(ctx, true);
}
Expand Down