Skip to content

Commit

Permalink
Stop logical decoding on receiving CopyDone
Browse files Browse the repository at this point in the history
While decoding WAL, logical decoding ignores the messages that the receiver may send in response to XLogData.
So during a big transaction, for example one that changes 1 million records, this can lead to two problems:

1. The receiver can disconnect from the server because the server does not respond to keepalive messages that request a reply.
2. The receiver can't stop replication until the whole transaction has been sent to it.

Being unable to stop replication is the main problem, because the receiver will fail with a timeout
while stopping replication, and the backend will also generate a lot of unnecessary network traffic. This problem
was found while implementing the physical/logical replication protocol in the pgjdbc driver
(pgjdbc/pgjdbc#550). It breaks the scenario in which a WAL consumer
receives decoded WAL and writes it to an external system asynchronously; if some problem occurs, a
callback reports which LSN failed, so we can roll back to the last successfully processed LSN and
start logical replication again from that position.

I measured the time needed to stop replication with and without the fix using this test:

For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

And got the following timings:

Before
-----
logical start and stopping: 15446ms
logical stopping: 13820ms

physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: 26ms

physical start and stopping: 458ms
physical stopping: 329ms

As you can see, it is now possible to stop logical replication very quickly. To achieve this, we now check
for replies first and only after that send decoded data. After getting CopyDone from the frontend, we
stop decoding as soon as possible.

The second part of the fix disables sending keepalive messages to the frontend once CopyDone has already been received.
  • Loading branch information
Vladimir Gordiychuk committed May 6, 2016
1 parent a712487 commit a275206
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 25 deletions.
15 changes: 10 additions & 5 deletions src/backend/replication/logical/logical.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
ReorderBufferIsActive is_active)
{
ReplicationSlot *slot;
MemoryContext context,
Expand Down Expand Up @@ -182,6 +183,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->reorder->apply_change = change_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
ctx->reorder->is_active = is_active;

ctx->out = makeStringInfo();
ctx->prepare_write = prepare_write;
Expand Down Expand Up @@ -214,7 +216,8 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
LogicalDecondingContextIsActive is_active)
{
TransactionId xmin_horizon = InvalidTransactionId;
ReplicationSlot *slot;
Expand Down Expand Up @@ -290,7 +293,7 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();

ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
read_page, prepare_write, do_write);
read_page, prepare_write, do_write, is_active);

/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
Expand Down Expand Up @@ -330,7 +333,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
LogicalOutputPluginWriterWrite do_write,
LogicalDecondingContextIsActive is_active)
{
LogicalDecodingContext *ctx;
ReplicationSlot *slot;
Expand Down Expand Up @@ -380,7 +384,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,

ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId,
read_page, prepare_write, do_write);
read_page, prepare_write, do_write,
is_active);

/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
Expand Down
15 changes: 14 additions & 1 deletion src/backend/replication/logical/logicalfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi
p->returned_rows++;
}

/*
 * LogicalContextAlwaysActive
 *
 * Stub "is active" callback for a logical decoding context.
 *
 * It unconditionally reports true, which means that WAL decoding driven
 * through this callback is never interrupted, in contrast to logical
 * replication.
 */
static bool
LogicalContextAlwaysActive(void)
{
	/* there is no condition under which this caller wants decoding to stop */
	const bool	decoding_may_continue = true;

	return decoding_may_continue;
}

static void
check_permissions(void)
{
Expand Down Expand Up @@ -243,7 +254,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
options,
logical_read_local_xlog_page,
LogicalOutputPrepareWrite,
LogicalOutputWrite);
LogicalOutputWrite,
LogicalContextAlwaysActive /* converting to Datum(sql api) can't be interrupted in contrast of replication*/
);

MemoryContextSwitchTo(oldcontext);

Expand Down
9 changes: 6 additions & 3 deletions src/backend/replication/logical/reorderbuffer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
rb->begin(rb, txn);

iterstate = ReorderBufferIterTXNInit(rb, txn);
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active())
{
Relation relation = NULL;
Oid reloid;
Expand Down Expand Up @@ -1646,8 +1646,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
ReorderBufferIterTXNFinish(rb, iterstate);
iterstate = NULL;

/* call commit callback */
rb->commit(rb, txn, commit_lsn);
if(rb->is_active())
{
/* call commit callback */
rb->commit(rb, txn, commit_lsn);
}

/* this is just a sanity check against bad output plugin behaviour */
if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
Expand Down
2 changes: 1 addition & 1 deletion src/backend/replication/slotfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
*/
ctx = CreateInitDecodingContext(
NameStr(*plugin), NIL,
logical_read_local_xlog_page, NULL, NULL);
logical_read_local_xlog_page, NULL, NULL, NULL);

/* build initial snapshot, might take a while */
DecodingContextFindStartpoint(ctx);
Expand Down
38 changes: 25 additions & 13 deletions src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);

static void XLogRead(char *buf, XLogRecPtr startptr, Size count);

static bool IsStreamingActive(void);


/* Initialize walsender process before entering the main command loop */
void
Expand Down Expand Up @@ -816,7 +818,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)

ctx = CreateInitDecodingContext(cmd->plugin, NIL,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
WalSndPrepareWrite, WalSndWriteData,
IsStreamingActive);

/*
* Signal that we don't need the timeout mechanism. We're just
Expand Down Expand Up @@ -995,7 +998,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
logical_decoding_ctx = CreateDecodingContext(
cmd->startpoint, cmd->options,
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
WalSndPrepareWrite, WalSndWriteData, IsStreamingActive);

/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
Expand Down Expand Up @@ -1086,14 +1089,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
tmpbuf.data, sizeof(int64));

/* fast path */
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
WalSndShutdown();

if (!pq_is_send_pending())
return;

for (;;)
{
int wakeEvents;
Expand Down Expand Up @@ -1225,7 +1220,14 @@ WalSndWaitForWal(XLogRecPtr loc)
break;

/*
* We only send regular messages to the client for full decoded
* If we have received CopyDone from the client or have sent CopyDone
* ourselves, it's time to exit streaming.
*/
if (!IsStreamingActive()) {
break;
}

/* We only send regular messages to the client for full decoded
* transactions, but a synchronous replication and walsender shutdown
* possibly are waiting for a later location. So we send pings
* containing the flush location every now and then.
Expand Down Expand Up @@ -1853,7 +1855,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
* again until we've flushed it ... but we'd better assume we are not
* caught up.
*/
if (!pq_is_send_pending())
if (!pq_is_send_pending() && !streamingDoneReceiving)
send_data();
else
WalSndCaughtUp = false;
Expand Down Expand Up @@ -2911,7 +2913,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
return;

if (waiting_for_ping_response)
/*
* A keepalive should be sent only while the protocol is in the active state. Once CopyDone
* has been received or sent, the protocol is preparing to complete.
*/
if (waiting_for_ping_response || !IsStreamingActive())
return;

/*
Expand All @@ -2931,3 +2937,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
WalSndShutdown();
}
}

/*
 * IsStreamingActive
 *
 * Report whether streaming is still active in both directions.
 *
 * Returns false as soon as either streamingDoneReceiving or
 * streamingDoneSending is set, and true otherwise.
 * NOTE(review): presumably these flags track CopyDone being received from /
 * sent to the client -- confirm against where they are assigned.
 */
static bool
IsStreamingActive(void)
{
	if (streamingDoneReceiving || streamingDoneSending)
		return false;

	return true;
}
13 changes: 11 additions & 2 deletions src/include/replication/logical.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) (

typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite;

/*
 * Callback that allows logical replication to be interrupted during decoding.
 *
 * The callback returns true if decoding may continue; if it returns false,
 * logical decoding will stop as soon as possible.
 *
 * This is an alias of ReorderBufferIsActive, so both share one signature.
 */
typedef ReorderBufferIsActive LogicalDecondingContextIsActive;

typedef struct LogicalDecodingContext
{
/* memory context this is all allocated in */
Expand Down Expand Up @@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
LogicalOutputPluginWriterWrite do_write,
LogicalDecondingContextIsActive is_active);
extern LogicalDecodingContext *CreateDecodingContext(
XLogRecPtr start_lsn,
List *output_plugin_options,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
LogicalOutputPluginWriterWrite do_write,
LogicalDecondingContextIsActive is_active);
extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx);
extern bool DecodingContextReady(LogicalDecodingContext *ctx);
extern void FreeDecodingContext(LogicalDecodingContext *ctx);
Expand Down
7 changes: 7 additions & 0 deletions src/include/replication/reorderbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ typedef void (*ReorderBufferMessageCB) (
bool transactional,
const char *prefix, Size sz,
const char *message);
/*
 * Callback signature for checking the decoding status: takes no arguments
 * and returns a bool (true while decoding should go on).
 */
typedef bool (*ReorderBufferIsActive) (void);

struct ReorderBuffer
{
Expand Down Expand Up @@ -320,6 +322,11 @@ struct ReorderBuffer
ReorderBufferCommitCB commit;
ReorderBufferMessageCB message;

/*
* Callback reporting the decoding status. Returns false when decoding should not continue.
*/
ReorderBufferIsActive is_active;

/*
* Pointer that will be passed untouched to the callbacks.
*/
Expand Down

0 comments on commit a275206

Please sign in to comment.