From a27520654b412a24d6770511f8b83face53c5fa0 Mon Sep 17 00:00:00 2001 From: Vladimir Gordiychuk Date: Fri, 6 May 2016 16:29:08 +0300 Subject: [PATCH] Stop logical decoding by get CopyDone Logical decoding during decode WALs ignore message that can reponse receiver on XLogData. So during big transaction for example that change 1 million record it can lead to two problem: 1. Receiver can disconect server because it not responce on keepalive message with required respose marker. 2. Receiver can't stop replication, until whole transaction will not send to receiver. Not available stop replication it's main problem. Because receiver will fail during stop replication with timeout and also backend will generate many not network traffic. This problem was found during implement physical\logical replication protocol in pgjdbc driver https://github.com/pgjdbc/pgjdbc/pull/550 And it broke scenario when WALs consumer receive decoded WALs and put it to external system asynchroneze were if some problem occurs callback say which LSN was fail, so we can rollback to last success process LSN and start logical replication again from it place. I measure stopping replication with fix and without by this test: For physical replicaion: LogSequenceNumber startLSN = getCurrentLSN(); Statement st = sqlConnection.createStatement(); st.execute("insert into test_logic_table\n" + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;"); st.close(); long start = System.nanoTime(); PGReplicationStream stream = pgConnection .replicationStream() .physical() .withStartPosition(startLSN) .start(); //read single message stream.read(); long startStopping = System.nanoTime(); stream.close(); long now = System.nanoTime(); long startAndStopTime = now - start; long stopTime = now - startStopping; System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime)); System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime)); For logical replication: LogSequenceNumber startLSN = getCurrentLSN(); Statement st = sqlConnection.createStatement(); st.execute("insert into test_logic_table\n" + " select id, md5(random()::text) as name from generate_series(1, 1000000) as id;"); st.close(); long start = System.nanoTime(); PGReplicationStream stream = pgConnection .replicationStream() .logical() .withSlotName(SLOT_NAME) .withStartPosition(startLSN) .withSlotOption("include-xids", false) .withSlotOption("skip-empty-xacts", true) .start(); //read single message stream.read(); long startStopping = System.nanoTime(); stream.close(); long now = System.nanoTime(); long startAndStopTime = now - start; long stopTime = now - startStopping; System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime)); System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime)); And get next timing: Before ----- logical start and stopping: 15446ms logical stopping: 13820ms physical start and stopping: 462ms physical stopping: 348 After ----- logical start and stopping: 2424ms logical stopping: 26ms physical start and stopping: 458ms physical stopping: 329ms As you can see, not it allow stop logical replication very fast. For do it, not we check replies first and only after that send decoded data. After get CopyDone from frontend we stoping decoding as soon as possible. The second part of fix, it disable sending keep alive message to frontend if already got CopyDone. --- src/backend/replication/logical/logical.c | 15 +++++--- .../replication/logical/logicalfuncs.c | 15 +++++++- .../replication/logical/reorderbuffer.c | 9 +++-- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 38 ++++++++++++------- src/include/replication/logical.h | 13 ++++++- src/include/replication/reorderbuffer.h | 7 ++++ 7 files changed, 74 insertions(+), 25 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 5ccfd3105f085..cfb4ae4a2def1 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + ReorderBufferIsActive is_active) { ReplicationSlot *slot; MemoryContext context, @@ -182,6 +183,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->is_active = is_active; ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; @@ -214,7 +216,8 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalDecondingContextIsActive is_active) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -290,7 +293,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, is_active); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -330,7 +333,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + LogicalDecondingContextIsActive is_active) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -380,7 +384,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, + is_active); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 99112ac1b4d72..e90e1070e26ef 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -104,6 +104,17 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi p->returned_rows++; } +/** + * Stab function that necessary for LogicalDecondign context. + * Function always return true and it means that decoding WALs + * can't be interrupt in contrast of logical replication. + */ +static bool +LogicalContextAlwaysActive(void) +{ + return true; +} + static void check_permissions(void) { @@ -243,7 +254,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin options, logical_read_local_xlog_page, LogicalOutputPrepareWrite, - LogicalOutputWrite); + LogicalOutputWrite, + LogicalContextAlwaysActive /* converting to Datum(sql api) can't be interrupted in contrast of replication*/ + ); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 57821c34027e4..deb09dceb0c7d 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1420,7 +1420,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); iterstate = ReorderBufferIterTXNInit(rb, txn); - while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && rb->is_active()) { Relation relation = NULL; Oid reloid; @@ -1646,8 +1646,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + if(rb->is_active()) + { + /* call commit callback */ + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 9cc24eadf2337..8b30bd355ddeb 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) */ ctx = CreateInitDecodingContext( NameStr(*plugin), NIL, - logical_read_local_xlog_page, NULL, NULL); + logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 926a247b66cde..ff6fc6d7e5381 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -218,6 +218,8 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +static bool IsStreamingActive(void); + /* Initialize walsender process before entering the main command loop */ void @@ -816,7 +818,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, + IsStreamingActive); /* * Signal that we don't need the timeout mechanism. We're just @@ -995,7 +998,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) logical_decoding_ctx = CreateDecodingContext( cmd->startpoint, cmd->options, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, IsStreamingActive); /* Start reading WAL from the oldest required WAL. */ logical_startptr = MyReplicationSlot->data.restart_lsn; @@ -1086,14 +1089,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - /* fast path */ - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - WalSndShutdown(); - - if (!pq_is_send_pending()) - return; - for (;;) { int wakeEvents; @@ -1225,7 +1220,14 @@ WalSndWaitForWal(XLogRecPtr loc) break; /* - * We only send regular messages to the client for full decoded + * If we have received CopyDone from the client, sent CopyDone + * ourselves, it's time to exit streaming. + */ + if (!IsStreamingActive()) { + break; + } + + /* We only send regular messages to the client for full decoded * transactions, but a synchronous replication and walsender shutdown * possibly are waiting for a later location. So we send pings * containing the flush location every now and then. @@ -1853,7 +1855,7 @@ WalSndLoop(WalSndSendDataCallback send_data) * again until we've flushed it ... but we'd better assume we are not * caught up. */ - if (!pq_is_send_pending()) + if (!pq_is_send_pending() && !streamingDoneReceiving) send_data(); else WalSndCaughtUp = false; @@ -2911,7 +2913,11 @@ WalSndKeepaliveIfNecessary(TimestampTz now) if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + /* + * Keep alive should be send only if protocol in active state. When get or send CopyDone + * it means that protocol preparing to complete. + */ + if (waiting_for_ping_response || !IsStreamingActive()) return; /* @@ -2931,3 +2937,9 @@ WalSndKeepaliveIfNecessary(TimestampTz now) WalSndShutdown(); } } + +static +bool IsStreamingActive(void) +{ + return !streamingDoneReceiving && !streamingDoneSending; +} diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 947000e63f898..ba54ddc198285 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -26,6 +26,13 @@ typedef void (*LogicalOutputPluginWriterWrite) ( typedef LogicalOutputPluginWriterWrite LogicalOutputPluginWriterPrepareWrite; +/* + * Callback function that allow interrupt logical replication during decoding. + * Function return true if decoding can be continue decode, but if function return false + * logical decoding will stop as soon as possible. + */ +typedef ReorderBufferIsActive LogicalDecondingContextIsActive; + typedef struct LogicalDecodingContext { /* memory context this is all allocated in */ @@ -81,13 +88,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + LogicalDecondingContextIsActive is_active); extern LogicalDecodingContext *CreateDecodingContext( XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + LogicalDecondingContextIsActive is_active); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index e0708940a04e1..28bd692b81a2d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -291,6 +291,8 @@ typedef void (*ReorderBufferMessageCB) ( bool transactional, const char *prefix, Size sz, const char *message); +/* callback signature for check decoding status */ +typedef bool (*ReorderBufferIsActive) (void); struct ReorderBuffer { @@ -320,6 +322,11 @@ struct ReorderBuffer ReorderBufferCommitCB commit; ReorderBufferMessageCB message; + /* + * Callback to define status of decoding. Return false if decoding not necessary continue + */ + ReorderBufferIsActive is_active; + /* * Pointer that will be passed untouched to the callbacks. */