
feat: add replication protocol API #550

Merged
14 commits merged into pgjdbc:master on Nov 25, 2016

Conversation

@Gordiychuk (Contributor) commented May 3, 2016

Replication for protocol version 3 works via the Copy API. The replication protocol is not supported for protocol version 2.
The main class used is PGReplicationStream. This class encapsulates the low-level
replication protocol and the periodic status update messages. Its output is the decoding plugin's payload.

The current implementation runs into a logical replication protocol bug:
after the replication stream is closed, the backend does not send the CommandComplete and ReadyForQuery packets.
As a result, the bug breaks the scenario where WAL is fetched from the replication stream and sent asynchronously to another system, for example Elasticsearch: when the first problem occurs during an asynchronous send, the replication stream is closed and replication restarts from the last successfully sent WAL record.

Example logical API:

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName("test_decoding")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();
    while (true) {
      ByteBuffer buffer = stream.read();
      //process logical changes
    }

Example physical API:

    LogSequenceNumber lsn = getCurrentLSN();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(lsn)
            .start();

    while (true) {
      ByteBuffer read = stream.read();
      //process binary WAL logs
    }

The main purpose of adding the replication protocol to the driver is logical replication and the ability to create real-time integration with external systems (in my case, Kafka + Elasticsearch).

@davecramer (Member)

Awesome! Thanks for this. I'll try to find some time to review soon

@Gordiychuk (Contributor, Author)

The problem scenario for the logical replication protocol looks like this:

    PGConnection pgConnection = (PGConnection) replConnection;

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table(name) values('message to repeat')");
    st.close();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    List<String> result = new ArrayList<String>();
    result.addAll(receiveMessage(stream, 3));

    stream.close();

Logs:

22:13:14.087 (2)  FE=> StartReplication(query: START_REPLICATION SLOT pgjdbc_logical_replication_slot LOGICAL 0/18FCFD0 ("include-xids" 'false', "skip-empty-xacts" 'true'))
22:13:14.087 (2)  FE=> Query(CopyStart)
22:13:14.088 (2)  <=BE CopyBothResponse
22:13:14.093 (2)  FE=> StandbyStatusUpdate(received: 0/18FCFD0, flushed: 0/0, applied: 0/0, clock: Tue May 03 22:13:14 MSK 2016)
22:13:14.094 (2)  FE=> CopyData(34)
22:13:14.094 (2)  <=BE CopyData
22:13:14.094 (2) k    ���� ���`�� 
22:13:14.094 (2)  <=BE CopyData
22:13:14.094 (2) w                        BEGIN
22:13:14.095 (2)   <=BE Keepalive(lastServerWal: 0/18FCFD0, clock: Tue May 03 22:13:14 MSK 2016 needReply: false)
22:13:14.095 (2)   <=BE XLogData(currWal: 0/0, lastServerWal: 0/0, clock: 0)
22:13:14.095 (2)  <=BE CopyData
22:13:14.095 (2) w    ����Рtable public.test_logic_table: INSERT: pk[integer]:1 name[character varying]:'message to repeat'
22:13:14.096 (2)   <=BE XLogData(currWal: 0/18FD0A0, lastServerWal: 0/18FD0A0, clock: 0)
22:13:14.096 (2)  <=BE CopyData
22:13:14.096 (2) w    ��Ѱ    ��Ѱ        COMMIT
22:13:14.096 (2)   <=BE XLogData(currWal: 0/18FD1B0, lastServerWal: 0/18FD1B0, clock: 0)
22:13:14.096 (2)  FE=> StopReplication
22:13:14.096 (2)  <=BE CopyData
22:13:14.096 (2) k    ��Ѱ ���`�' 
22:13:14.096 (2)  FE=> CopyDone
22:13:14.097 (2)  <=BE CopyDone
22:13:14.097 (2)  <=BE CopyData
22:13:14.097 (2) k    ��Ѱ ���`�� 

org.postgresql.util.PSQLException: Database connection failed when ending copy

    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:834)
    at org.postgresql.core.v3.CopyDualImpl.endCopy(CopyDualImpl.java:23)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.close(V3PGReplicationStream.java:244)
    at org.postgresql.replication.LogicalReplicationTest.testRepeatWalPositionTwice(LogicalReplicationTest.java:281)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:143)
    at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:112)
    at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:70)
    at org.postgresql.core.PGStream.ReceiveChar(PGStream.java:283)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:947)
    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:830)
    ... 33 more

As you can see, even after receiving CopyDone from the backend, the backend still sends KeepAlive messages. The same scenario works fine with physical replication.

@Gordiychuk (Contributor, Author)

@davecramer do you think the problem described above is a PostgreSQL server bug or a bug in the driver implementation?

@davecramer (Member)

@Gordiychuk I'd suggest posting to hackers... I don't know enough right now to comment

@davecramer (Member)

@Gordiychuk I suspect this is a bug with the driver implementation. Replication is fairly well tested

@Gordiychuk (Contributor, Author)

After some code analysis, I found the problem in PostgreSQL, and I am now preparing a patch. Inside the walsender, WalSndLoop contains the following check:

        /*
         * If we don't have any pending data in the output buffer, try to send
         * some more.  If there is some, we don't bother to call send_data
         * again until we've flushed it ... but we'd better assume we are not
         * caught up.
         */
        if (!pq_is_send_pending())
            send_data();
        else
            WalSndCaughtUp = false;

which executes a callback to transform the WAL record and send it. For logical replication, this then executes

/*
 * read_page callback for logical decoding contexts, as a walsender process.
 *
 * Inside the walsender we can do better than logical_read_local_xlog_page,
 * which has to do a plain sleep/busy loop, because the walsender's latch gets
 * set everytime WAL is flushed.
 */
static int
logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
                XLogRecPtr targetRecPtr, char *cur_page, TimeLineID *pageTLI)

and that code enters a long loop waiting for WAL to become available to send:

/*
 * Wait till WAL < loc is flushed to disk so it can be safely read.
 */
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
{
    int         wakeEvents;
    static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;


    /*
     * Fast path to avoid acquiring the spinlock in the we already know we
     * have enough WAL available. This is particularly interesting if we're
     * far behind.
     */
    if (RecentFlushPtr != InvalidXLogRecPtr &&
        loc <= RecentFlushPtr)
        return RecentFlushPtr;

    /* Get a more recent flush pointer. */
    if (!RecoveryInProgress())
        RecentFlushPtr = GetFlushRecPtr();
    else
        RecentFlushPtr = GetXLogReplayRecPtr(NULL);

    for (;;)
    {
        long        sleeptime;
        TimestampTz now;

        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
         */
        if (!PostmasterIsAlive())
            exit(1);

        /* Clear any already-pending wakeups */
        ResetLatch(MyLatch);

        CHECK_FOR_INTERRUPTS();

        /* Process any requests or signals received recently */
        if (got_SIGHUP)
        {
            got_SIGHUP = false;
            ProcessConfigFile(PGC_SIGHUP);
            SyncRepInitConfig();
        }

        /* Check for input from the client */
        ProcessRepliesIfAny();

        /* Update our idea of the currently flushed position. */
        if (!RecoveryInProgress())
            RecentFlushPtr = GetFlushRecPtr();
        else
            RecentFlushPtr = GetXLogReplayRecPtr(NULL);

        /*
         * If postmaster asked us to stop, don't wait here anymore. This will
         * cause the xlogreader to return without reading a full record, which
         * is the fastest way to reach the mainloop which then can quit.
         *
         * It's important to do this check after the recomputation of
         * RecentFlushPtr, so we can send all remaining data before shutting
         * down.
         */
        if (walsender_ready_to_stop)
            break;

        /*
         * We only send regular messages to the client for full decoded
         * transactions, but a synchronous replication and walsender shutdown
         * possibly are waiting for a later location. So we send pings
         * containing the flush location every now and then.
         */
        if (MyWalSnd->flush < sentPtr &&
            MyWalSnd->write < sentPtr &&
            !waiting_for_ping_response)
        {
            WalSndKeepalive(false);
            waiting_for_ping_response = true;
        }

        /* check whether we're done */
        if (loc <= RecentFlushPtr)
            break;

        /* Waiting for new WAL. Since we need to wait, we're now caught up. */
        WalSndCaughtUp = true;

        /*
         * Try to flush pending output to the client. Also wait for the socket
         * becoming writable, if there's still pending output after an attempt
         * to flush. Otherwise we might just sit on output data while waiting
         * for new WAL being generated.
         */
        if (pq_flush_if_writable() != 0)
            WalSndShutdown();

        now = GetCurrentTimestamp();

        /* die if timeout was reached */
        WalSndCheckTimeOut(now);

        /* Send keepalive if the time has come */
        WalSndKeepaliveIfNecessary(now);

        sleeptime = WalSndComputeSleeptime(now);

        wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
            WL_SOCKET_READABLE | WL_TIMEOUT;

        if (pq_is_send_pending())
            wakeEvents |= WL_SOCKET_WRITEABLE;

        /* Sleep until something happens or we time out */
        WaitLatchOrSocket(MyLatch, wakeEvents,
                          MyProcPort->sock, sleeptime);
    }

    /* reactivate latch so WalSndLoop knows to continue */
    SetLatch(MyLatch);
    return RecentFlushPtr;
}

In this cycle, after executing ProcessRepliesIfAny() we receive the CopyDone command, reply with CopyDone, and then keep waiting for WAL, sending WAL, and sending keepalives, ignoring the streamingDoneReceiving and streamingDoneSending flags. That is why the same test works fine for physical replication but fails with a timeout for logical replication.

@Gordiychuk (Contributor, Author)

The second problem is that after replying with CopyDone, PostgreSQL still sends CopyData messages; this problem is present for both logical and physical replication, and I want to fix it as well.

        /* Check for input from the client */
        ProcessRepliesIfAny();

        /*
         * If we have received CopyDone from the client, sent CopyDone
         * ourselves, and the output buffer is empty, it's time to exit
         * streaming.
         */
        if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
            break;

        /*
         * If we don't have any pending data in the output buffer, try to send
         * some more.  If there is some, we don't bother to call send_data
         * again until we've flushed it ... but we'd better assume we are not
         * caught up.
         */
        if (!pq_is_send_pending())
            send_data();
        else
            WalSndCaughtUp = false;

        /* Try to flush pending output to the client */
        if (pq_flush_if_writable() != 0)
            WalSndShutdown();

        /* If nothing remains to be sent right now ... */
        if (WalSndCaughtUp && !pq_is_send_pending())
        {
            /*
             * If we're in catchup state, move to streaming.  This is an
             * important state change for users to know about, since before
             * this point data loss might occur if the primary dies and we
             * need to failover to the standby. The state change is also
             * important for synchronous replication, since commits that
             * started to wait at that point might wait for some time.
             */
            if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
            {
                ereport(DEBUG1,
                     (errmsg("standby \"%s\" has now caught up with primary",
                             application_name)));
                WalSndSetState(WALSNDSTATE_STREAMING);
            }

            /*
             * When SIGUSR2 arrives, we send any outstanding logs up to the
             * shutdown checkpoint record (i.e., the latest record), wait for
             * them to be replicated to the standby, and exit. This may be a
             * normal termination at shutdown, or a promotion, the walsender
             * is not sure which.
             */
            if (walsender_ready_to_stop)
                WalSndDone(send_data);
        }

        now = GetCurrentTimestamp();

        /* Check for replication timeout. */
        WalSndCheckTimeOut(now);

        /* Send keepalive if the time has come */
        WalSndKeepaliveIfNecessary(now);

In ProcessRepliesIfAny() we receive CopyDone and send CopyDone as the response; that message is still present in the output buffer, so the condition

        if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
            break;

evaluates to false, and we execute the following functions even when streamingDoneSending and streamingDoneReceiving are both true:

        /* Check for replication timeout. */
        WalSndCheckTimeOut(now);

        /* Send keepalive if the time has come */
        WalSndKeepaliveIfNecessary(now);

@davecramer (Member)

I would suggest posting this to hackers to get their take first

Gordiychuk pushed a commit to Gordiychuk/postgres that referenced this pull request May 6, 2016
While decoding WAL, logical decoding ignores messages that the receiver may send in response to XLogData.
During a big transaction, for example one that changes 1 million records, this can lead to two problems:

1. The server can disconnect the receiver, because the receiver's reply to a keepalive message with the reply-required marker is never processed.
2. The receiver can't stop replication until the whole transaction has been sent to it.

Not being able to stop replication is the main problem, because the receiver will fail during stop replication
with a timeout, and the backend will also generate a lot of unnecessary network traffic. This problem
was found while implementing the physical/logical replication protocol in the pgjdbc driver
(pgjdbc/pgjdbc#550). It breaks the scenario where a WAL consumer
receives decoded WAL and pushes it to an external system asynchronously: if a problem occurs,
a callback reports which LSN failed, so we can roll back to the last successfully processed LSN and
restart logical replication from that position.

I measured stopping replication with and without the fix using this test:

For physical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(startLSN)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

For logical replication:

    LogSequenceNumber startLSN = getCurrentLSN();

    Statement st = sqlConnection.createStatement();
    st.execute("insert into test_logic_table\n"
        + "  select id, md5(random()::text) as name from generate_series(1, 1000000) as id;");
    st.close();

    long start = System.nanoTime();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(startLSN)
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();

    //read single message
    stream.read();
    long startStopping = System.nanoTime();

    stream.close();

    long now = System.nanoTime();

    long startAndStopTime = now - start;
    long stopTime = now - startStopping;

    System.out.println(TimeUnit.NANOSECONDS.toMillis(startAndStopTime));
    System.out.println(TimeUnit.NANOSECONDS.toMillis(stopTime));

And got the following timings:

Before
-----
logical start and stopping: 15446ms
logical stopping: 13820ms

physical start and stopping: 462ms
physical stopping: 348ms

After
-----
logical start and stopping: 2424ms
logical stopping: 26ms

physical start and stopping: 458ms
physical stopping: 329ms

As you can see, logical replication can now be stopped very quickly. To achieve this, we now check
replies first, and only after that send decoded data. After receiving CopyDone from the frontend, we
stop decoding as soon as possible.

The second part of the fix disables sending keepalive messages to the frontend once CopyDone has been received.
@Gordiychuk force-pushed the support_replication_protocol branch from 03b9f72 to 86474f3 on May 6, 2016
@jarreds commented May 7, 2016

Awesome. I'm really excited to get this in. I'd like to get some clarification around the applied/flushed LSN handling.

In this code snippet, there is no handling of flushed or applied LSN:

    while (true) {
      ByteBuffer read = stream.read();
      //process binary WAL logs
    }

I'm trying to grok how manually handling the LSN data would work. Something like this:

    while (true) {
      ByteBuffer read = stream.read();

      // this is the current LSN of the copy data read above?
      LogSequenceNumber lsn = stream.getLastReceiveLSN();

      // do some external async processing
      processor.doAsyncProcessing(lsn, read);

      // for status updates to the server
      // get LSN apply/flush data from the async processor
      stream.setAppliedLSN(processor.getLastAppliedLSN());
      stream.setFlushedLSN(processor.getLastFlushedLSN());
    }

?

Let me know if you'd like any help. I'm really interested in this feature. It would really help simplify the current C-based system we use for this today.

@Gordiychuk (Contributor, Author)

@jarreds Yes. And for logical decoding

stream.setAppliedLSN(processor.getLastAppliedLSN());

can be skipped, because that parameter is used only for physical replication.

I also think we need a method to check whether the stream is still active and whether it has pending messages, because iterating in an infinite loop is not always convenient.
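For illustration, a rough sketch of such a non-blocking consumption loop, combining the feedback calls from the snippet above with the readPending() method added later in this PR (processor is the hypothetical async consumer from @jarreds' example):

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .start();

    while (true) {
      //readPending() returns null instead of blocking when no message is available
      ByteBuffer buffer = stream.readPending();
      if (buffer == null) {
        TimeUnit.MILLISECONDS.sleep(10);
        continue;
      }

      processor.doAsyncProcessing(stream.getLastReceiveLSN(), buffer);

      //report progress back to the server; setAppliedLSN can be skipped for logical decoding
      stream.setFlushedLSN(processor.getLastFlushedLSN());
    }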

@codecov-io commented May 12, 2016

Current coverage is 64.08% (diff: 80.72%)

Merging #550 into master will increase coverage by 0.74%

@@             master       #550   diff @@
==========================================
  Files           151        165    +14   
  Lines         14787      15129   +342   
  Methods           0          0          
  Messages          0          0          
  Branches       2934       2978    +44   
==========================================
+ Hits           9366       9695   +329   
+ Misses         4213       4200    -13   
- Partials       1208       1234    +26   

Powered by Codecov. Last update d32b077...518444e

@Gordiychuk force-pushed the support_replication_protocol branch 2 times, most recently from 7e20bc9 to 9236c1a on May 12, 2016
@davecramer (Member)

@Gordiychuk So given that the server won't fix their end until 9.7, where does that leave us?

On another note, Travis is failing on a few things: checkstyle, and the tests do not have replication privileges.

Checkstyle should be relatively easy to fix; you can run mvn checkstyle:check to find the errors.

Your tests may have to run as user postgres to get them to work...

@Gordiychuk (Contributor, Author)

@Gordiychuk So given that the server won't fix their end until 9.7, where does that leave us?

@davecramer No, we can deliver the solution with the assumption that org.postgresql.replication.PGReplicationStream#close can take a long time; as a workaround, the Connection#close method can be used until the fixes are included in PostgreSQL.
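For example, until the backend fix is released, a consumer that must stop quickly could fall back to closing the whole connection (a sketch of the workaround only, not final API guidance):

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .start();
    try {
      ByteBuffer buffer = stream.read();
      //process the payload
    } finally {
      //workaround: stream.close() may hang waiting for CommandComplete/ReadyForQuery,
      //so terminate the underlying replication connection instead
      replConnection.close();
    }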

On another note, Travis is failing on a few things: checkstyle, and the tests do not have replication privileges.
Checkstyle should be relatively easy to fix; you can run mvn checkstyle:check to find the errors.
Your tests may have to run as user postgres to get them to work...

Sorry, I don't have much time this week. I plan to finish the API and fix the CI problems this weekend, and after that start the implementation for issue #558. I think they should be merged to master together.

@davecramer (Member)

@Gordiychuk Thanks! No rush

@Gordiychuk force-pushed the support_replication_protocol branch 3 times, most recently from 97a0b20 to b6dff39 on May 14, 2016
addons:
  postgresql: "9.4"
sudo: required
dist: trusty
Member

-1 for converting lots of jobs to sudo: required. It will make Travis a lot slower

@Gordiychuk force-pushed the support_replication_protocol branch 2 times, most recently from 8c84236 to 167a696 on May 15, 2016
The test user now needs the replication privilege, and PostgreSQL must be configured to accept replication connections; this is necessary for testing the replication protocol.

The replication protocol uses the CopyBothResponse package to initiate the bidirectional copy protocol.
WAL data is then sent as a series of CopyData messages, with periodic exchanges of KeepAlive packages.
Replication for protocol version 3 works via the Copy API; the replication protocol is not supported for protocol version 2.
The main class for working with the replication protocol is PGReplicationStream. This class hides the low-level
replication protocol and the periodic status update messages, allowing the user to work only with the payload.

The current implementation runs into a logical replication protocol bug:
after the replication stream is closed, the backend does not send the CommandComplete and ReadyForQuery packets.
As a result, the bug breaks the scenario where WAL is fetched from the replication stream and sent asynchronously to another system, for example Elasticsearch:
when the first problem occurs during an asynchronous send, the replication stream is closed and replication restarts from the last successfully sent WAL record.

Example logical API:

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName("test_decoding")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();
    while (true) {
      ByteBuffer buffer = stream.read();
      //process logical changes
    }

Example physical API:

    LogSequenceNumber lsn = getCurrentLSN();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(lsn)
            .start();

    while (true) {
      ByteBuffer read = stream.read();
      //process binary WAL logs
    }
…cking

Periodically checking the replication stream allows, for example, asynchronous
feedback from an external system that accepts changes from the logical replication
stream to be processed much faster. Waiting for new changes may take a long time,
which prevents getting feedback messages from the external system; such a message can
report a failure, in which case we should restart replication from the failed position,
but we can't while no new changes are arriving in the replication stream.

Example use:

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName(SLOT_NAME)
            .withStartPosition(getCurrentLSN())
            .start();

    while (true) {
      ByteBuffer result = stream.readPending();
      if (result == null) {
        TimeUnit.MILLISECONDS.sleep(10);
        continue;
      }

      //process message
    }
org.postgresql.replication.PGReplication is needed so that the replication
commands can be extended in the future.

Example of starting logical replication:

    pgConnection
        .getReplicationAPI()
        .createReplicationSlot()
        .logical()
        .withSlotName("mySlot")
        .withOutputPlugin("test_decoding")
        .make();

    PGReplicationStream stream =
        pgConnection
            .getReplicationAPI()
            .replicationStream()
            .logical()
            .withSlotName("test_decoding")
            .withSlotName("mySlot")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();
When the first message received from the server is a keepalive, we were setting the last received
LSN to the last LSN position on the server; as a result, when replying to the keepalive we could
lose reserved WAL. Also, while processing XLogData we can't use the last server LSN
from the message.
The official documentation says that the status update should send back the
LSN plus the offset that was written or applied to disk:
"The location of the last WAL byte + 1 received
and written to disk in the standby." In our case it means that
the last received LSN should also contain the payload offset.
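In consumer code this means the feedback can be based directly on getLastReceiveLSN(); a minimal sketch using only methods shown earlier in this thread:

    ByteBuffer buffer = stream.read();

    //with this fix, getLastReceiveLSN() points past the payload just read
    //(start LSN + payload offset, i.e. "the last WAL byte + 1" from the documentation)
    LogSequenceNumber received = stream.getLastReceiveLSN();

    //once the payload has been durably processed, report it back in status updates
    stream.setFlushedLSN(received);
    stream.setAppliedLSN(received);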
@Gordiychuk force-pushed the support_replication_protocol branch from ac59499 to e04d171 on November 25, 2016
Checking max_wal_senders is not enough to decide whether to run the replication tests,
because replication can be configured on the server while the test user doesn't have
the grants to use replication. To deal with this scenario, we now check not
only that replication is configured but also that the user has replication grants.
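Such a grant check can be expressed as a plain catalog query; an illustrative sketch (the actual test helper may differ):

    Statement st = sqlConnection.createStatement();
    ResultSet rs = st.executeQuery(
        "select rolreplication from pg_roles where rolname = current_user");
    boolean userHasReplicationGrant = rs.next() && rs.getBoolean(1);
    rs.close();
    st.close();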
@Gordiychuk force-pushed the support_replication_protocol branch from e04d171 to 518444e on November 25, 2016
@vlsi changed the title from "feat: Add Support replication protocol" to "feat: add replication protocol API" on Nov 25, 2016
@vlsi merged this pull request into pgjdbc:master on Nov 25, 2016
import java.util.Queue;

public class CopyDualImpl extends CopyOperationImpl implements CopyDual {
  private Queue<byte[]> received = new LinkedList<byte[]>();
Member

Just for the history: ArrayDeque should be preferred over LinkedList

@vlsi (Member) commented Nov 25, 2016

@Gordiychuk , thanks for pushing it forward.

vlsi pushed a commit that referenced this pull request Nov 25, 2016
The replication protocol is managed by PGReplicationStream. It hides the low-level
replication protocol details and lets the end user deal with just the payload data.

The entry point is `PGConnection#getReplicationAPI`.

The current PostgreSQL backend has issues with terminating a replication connection (e.g. the "stop decode" message might be ignored for a while, so termination can take some time).
Relevant hacker's thread is https://www.postgresql.org/message-id/CAFgjRd3hdYOa33m69TbeOfNNer2BZbwa8FFjt2V5VFzTBvUU3w%40mail.gmail.com

Logical replication API:

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .logical()
            .withSlotName("test_decoding")
            .withSlotOption("include-xids", false)
            .withSlotOption("skip-empty-xacts", true)
            .start();
    while (true) {
      ByteBuffer buffer = stream.read();
      //process logical changes
    }

Physical replication API:

    LogSequenceNumber lsn = getCurrentLSN();

    PGReplicationStream stream =
        pgConnection
            .replicationStream()
            .physical()
            .withStartPosition(lsn)
            .start();

    while (true) {
      ByteBuffer read = stream.read();
      //process binary WAL logs
    }

The main purpose of supporting the replication protocol at the driver level is to provide the ability to create real-time integration with external systems (e.g. Kafka+ElasticSearch).
@jorsol (Member) commented Dec 1, 2016

I know this is already merged, but is V2ReplicationProtocol.java necessary?

The driver already dropped support for the v2 protocol, and it looks like V2ReplicationProtocol.java is not used anywhere.

@vlsi (Member) commented Dec 1, 2016

@jorsol, seems it can be dropped. I just did not catch that.

davecramer pushed a commit to pgjdbc/www that referenced this pull request Feb 3, 2017
* docs: Replication protocol

The PR contains a sample of using the replication protocol implemented in PR
pgjdbc/pgjdbc#550

* Update replication.md

@Gordyichuck, hopefully I haven't changed the essence?

* Move replication to extension sub group
@davecramer (Member)

@Gordiychuk I'd like to add AutoCloseable to both PGReplicationStream and PGReplicationConnection.

Thoughts?

@Gordiychuk (Contributor, Author)

@davecramer good idea
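If both types implement AutoCloseable, try-with-resources would take care of cleanup; a sketch assuming the interface is added (stopRequested is a hypothetical termination flag):

    try (PGReplicationStream stream =
             pgConnection
                 .replicationStream()
                 .logical()
                 .withSlotName(SLOT_NAME)
                 .start()) {
      while (!stopRequested) {
        ByteBuffer buffer = stream.read();
        //process logical changes
      }
    } //stream.close() runs automatically here, even on exceptions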

@jorsol (Member) commented Feb 20, 2017

AutoCloseable is a Java 7 feature; will support for Java 6 be dropped in the near future?
