
Problem: CDC replication/apply is slow #704

Merged (6 commits) on Mar 25, 2024

Conversation

@arajkumar (Contributor) commented Mar 14, 2024

Our existing implementation is basic: it executes a statement and waits for its result to arrive before issuing the next one. We already improved DML statements targeting the same table within a transaction by coalescing them into a multi-value INSERT statement, but that is only beneficial for transactions which contain multiple DML statements.

However, we have encountered cases where the transaction has only one DML statement. This kind of txn is slower because statements are executed sequentially from the client's perspective, and a network round trip is added for each statement. For example, with a network round trip of 10ms, executing 100 statements adds 1000ms (1s) to the overall response time.

Here is how a single-statement transaction is executed today:

  1. Execute "BEGIN"
  2. Execute bunch of "SET" statements which are related to replication session setup
  3. Prepare DML statement
  4. Execute prepared statement with values
  5. Execute procedure to update replication origin progress
  6. Execute COMMIT
  7. Map target current insert lsn to commit lsn for feedback reporting (i.e. sentinel replay_lsn)

Solution: Use libpq's pipeline mode (https://www.postgresql.org/docs/current/libpq-pipeline-mode.html). It facilitates sending commands without waiting for their results.

The goal of this implementation is to reduce the impact of network latency as much as possible while applying logical messages.

The proposed implementation enters pipeline mode as soon as a new libpq connection is created for the ld_apply/ld_replay process. All statements are executed on the pipeline by default, except the few that need an immediate response (e.g. step 7). A dedicated connection serves step 7, because it needs a synchronous response.
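As a rough illustration, entering pipeline mode on a fresh connection could look like the sketch below; this is plain libpq, not the PR's actual code, and enter_pipeline_mode is a hypothetical helper name:

```
/*
 * Minimal sketch (plain libpq, hypothetical helper name): switch a fresh
 * connection to non-blocking mode and enter pipeline mode, so subsequent
 * statements can be queued without waiting for their results.
 */
#include <stdio.h>
#include <stdbool.h>
#include <libpq-fe.h>

static bool
enter_pipeline_mode(PGconn *conn)
{
	/* non-blocking mode avoids stalling while queueing statements */
	if (PQsetnonblocking(conn, 1) != 0)
	{
		fprintf(stderr, "set non-blocking failed: %s", PQerrorMessage(conn));
		return false;
	}

	/* PQenterPipelineMode() is available in libpq 14 and later */
	if (PQenterPipelineMode(conn) != 1)
	{
		fprintf(stderr, "enter pipeline failed: %s", PQerrorMessage(conn));
		return false;
	}

	return true;
}
```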

The following functions are called on the target PGSQL handle from ld_apply & ld_replay:

  • pgsql_begin
  • pgsql_set_gucs
  • pgsql_execute
  • pgsql_replication_origin_xact_setup
  • pgsql_prepare
  • pgsql_execute_prepared
  • pgsql_current_wal_insert_lsn
  • pgsql_current_wal_flush_lsn

Among the functions above, only pgsql_current_wal_insert_lsn and pgsql_current_wal_flush_lsn return values; the other functions are write-only.

The idea is to have two PGSQL connection handles: one for all write activity, which can go through the pipeline, and another for reading.
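A minimal sketch of that split, with hypothetical names (the PR's own identifiers may differ):

```
/* hypothetical container for the two libpq handles (needs libpq-fe.h) */
typedef struct ApplyConnections
{
	PGconn *write;   /* stays in pipeline mode: BEGIN, SET, DML, COMMIT */
	PGconn *read;    /* regular mode: pg_current_wal_insert_lsn(), etc. */
} ApplyConnections;
```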

The pipeline connection has to be synced/drained at some point to avoid accumulating results on the server and the client, which would otherwise eat a lot of heap memory. The current implementation syncs based on a time interval (i.e. every 1s). There are other methods, such as statement- or txn-count-based sync, which may or may not be more efficient.
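A minimal sketch of the time-interval sync, assuming plain libpq and eliding most error handling (maybe_sync_pipeline and the 1s constant are illustrative, not the PR's code):

```
#include <time.h>
#include <stdbool.h>
#include <libpq-fe.h>

#define PIPELINE_SYNC_INTERVAL_SECS 1

static time_t lastSyncTime = 0;

static bool
maybe_sync_pipeline(PGconn *conn)
{
	time_t now = time(NULL);

	if (now - lastSyncTime < PIPELINE_SYNC_INTERVAL_SECS)
	{
		return true;            /* not time to sync yet */
	}

	/* queue a sync point; the server replies with PGRES_PIPELINE_SYNC */
	if (PQpipelineSync(conn) != 1)
	{
		return false;
	}

	/* drain every pending result up to (and including) the sync point */
	for (;;)
	{
		PGresult *res = PQgetResult(conn);

		if (res == NULL)
		{
			continue;           /* NULL separates each statement's results */
		}

		ExecStatusType status = PQresultStatus(res);
		PQclear(res);

		if (status == PGRES_PIPELINE_SYNC)
		{
			break;              /* reached our sync point: pipeline drained */
		}

		if (status == PGRES_FATAL_ERROR || status == PGRES_PIPELINE_ABORTED)
		{
			return false;
		}
	}

	lastSyncTime = now;
	return true;
}
```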

The following commands can be used to generate load to measure the performance improvement made by this commit:

```
CREATE TABLE metrics (
    time TIMESTAMP NOT NULL,
    name TEXT,
    id NUMERIC,
    value FLOAT
);
```

```
-- insert_metric.sql
\set id random(1, 1000000)
\set value random(0, 100)
INSERT INTO metrics (time, name, value) VALUES (NOW(), 'metric_' || :id, :value);
```

```
pgbench -n -c 40 -j 1 -t 10000 -f insert_metric.sql $SOURCE
```
  • -c number of database connections to use (i.e. server-side concurrency)
  • -j number of threads to create on the client machine (i.e. client-side concurrency)

Baseline: direct ingestion into the target over a single connection (with synchronous_commit=off):

```
pgbench -n -c 1 -j 1 -t 10000 -f insert_metric.sql $TARGET
```

```
tps = 1177 (without initial connection time)
```

pgcopydb apply without this change:

```
tps = 175
```

pgcopydb apply with pipeline mode:

```
tps = 1652
```

This commit improves single-statement txn throughput by roughly 10x (175 → 1652 tps).

Ideally, we should aim for performance close to direct ingestion (i.e. ~1200 txn/s). In this iteration we perform 40% better than that baseline. However, we can aim higher, since a real system will use more than one connection. We can't really race against multiple connections doing steady ingestion at around 1000 txn/s per connection, but let's optimize single-connection throughput to the max!

This change is a foundation for future improvements steering towards that:

  1. Optimize step 2 - Instead of executing a bunch of SET statements for every txn, run them once at the beginning of the session
  2. Optimize step 7 - We can probably just use pg_replication_origin_progress as the replay_lsn?

TODO

  • Conditionally enable pipeline mode when building against libpq >= 14 - do we still need to do this?
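For what it's worth, libpq-fe.h defines LIBPQ_HAS_PIPELINING starting with Postgres 14, so a build-time guard could be as simple as the sketch below; pgsql_pipeline_enter is the function this PR adds, while the guard placement and log message are illustrative:

```
/* pgsql_pipeline_enter is the function added by this PR; the guard and
 * the fallback log message here are illustrative. */
#ifdef LIBPQ_HAS_PIPELINING
	if (!pgsql_pipeline_enter(pgsql))
	{
		return false;
	}
#else
	log_warn("pgcopydb was built against libpq < 14: "
			 "pipeline mode is disabled, using synchronous apply");
#endif
```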

@arajkumar (Contributor, Author)

I will keep this as a draft until I fix all the CI failures.

@arajkumar (Contributor, Author)

@dimitri Would you still suggest conditionally enabling the pipeline implementation for libpq >= 14?

@dimitri (Owner) commented Mar 14, 2024

Thanks @arajkumar for working on this, that's much appreciated!

My understanding is that we need libpq 14+ at build time to enable this, yes. Unfortunately not all build systems out there understand that we can build with libpq 16 and then work with different Postgres versions (RPM is one of these).

Another aspect: is it possible to have a “sync” step in the pipeline mode, for the COMMIT and LSN tracking? If that were possible, we could skip using a second transaction entirely, and I think we have to do that for correctness.

@arajkumar (Contributor, Author) commented Mar 14, 2024

> My understanding is that we need libpq 14+ at build time to enable this, yes. Unfortunately not all build systems out there understand that we can build with libpq 16 and then work with different Postgres versions (RPM is one of these).

@dimitri I've added this in the recent commit.

> Another aspect: is it possible to have a “sync” step in the pipeline mode, for the COMMIT and LSN tracking? If that were possible, we could skip using a second transaction entirely, and I think we have to do that for correctness.

This would make the pipeline sync for each commit and reduce the overall throughput of the pipeline implementation. It would essentially restrict pipeline mode to within a single txn. We have exactly this implementation on our internal fork, which branched off around 0.13. The throughput is not great when you have lots of txns with a single DML statement.

Since we already set synchronous_commit=off, I'm not sure how much difference a sync step for each COMMIT would make. I also thought of eliminating the entire LSN tracking and replacing it with pg_replication_origin_progress. Do you think this is not a good idea?
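For context, reading the origin progress is a single catalog-function call; a minimal example, using the origin name "pgcopydb" that appears in the logs below:

```
-- replay position recorded for the "pgcopydb" replication origin;
-- the second argument asks for the locally flushed position
SELECT pg_replication_origin_progress('pgcopydb', true);
```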

@arajkumar arajkumar marked this pull request as ready for review March 15, 2024 18:07
@arajkumar arajkumar changed the title Problem: CDC replication is slow Problem: CDC replication/apply is slow Mar 15, 2024
@arajkumar (Contributor, Author)

@dimitri

Here are more details about the overhead caused by the current LSN tracking implementation. This is a kind of micro-benchmark that uses only pgcopydb stream apply to ingest 122987 records with the same number of transactions.

```
$ grep BEGIN ~/dimitri/bench.sql | wc -l
122987
$ grep INSERT ~/dimitri/bench.sql | wc -l
122987
```

```
$ time pgcopydb stream apply --resume --not-consistent ~/dimitri/bench.sql
06:05:02.850 19838 INFO   Running pgcopydb version 0.15.38.gc1bc3d9 from "/home/ubuntu/dimitri/pgcopydb/src/bin/pgcopydb/pgcopydb"
06:05:02.887 19838 INFO   Using work dir "/tmp/pgcopydb"
06:05:02.936 19838 INFO   Setting up previous LSN from replication origin "pgcopydb" progress at 0/0

06:05:02.982 19838 INFO   Replaying changes from file "/home/ubuntu/dimitri/bench.sql"

06:06:29.934 19838 INFO   Before syncing with the pgcopydb sentinel
06:09:37.642 19838 INFO   After syncing with the pgcopydb sentinel

pgcopydb stream apply --resume --not-consistent ~/dimitri/bench.sql  5.11s user 6.72s system 4% cpu 4:34.82 total
```

With pipeline mode, apply took 4m 34s to ingest 122987 records, i.e. ~448 records/sec.

If we dissect those steps further, the DML inserts took (06:06:29.934 - 06:05:02.982) => 1m 27s, i.e. ~1413 records/sec.

The pgcopydb sentinel sync took (06:09:37.642 - 06:06:29.934) => 3m 8s.

You can find the CPU profile taken during the above benchmark at https://pprof.me/46714b5a2a244609af5e72e035427a5c/

The profile shows that a considerable amount (~80%) of the time is spent in stream_apply_sync_sentinel and stream_apply_track_insert_lsn.

[CPU profile flame graph screenshot]

Is there an opportunity to optimize stream_apply_sync_sentinel & stream_apply_track_insert_lsn? Probably yes; I didn't really spend time on that. But should we do that? Why not make replication origins work and simply use pg_replication_origin_progress as the replay_lsn?

@dimitri (Owner) left a comment


First round of review is in: lots of naming issues, the build-time vs run-time thing with libpq 14 for pipeline mode, and not much after that. I understand why we need to stay in pipeline mode on the connection and can't have sync queries in there, but I didn't find a comment explaining why we need two connections now, what they are used for, etc. It's also a problem with the PGSQL client connection names.

Comment on lines 1873 to 1878:

```
/*
 * pgsql_pipeline_enter enables the pipeline mode in the given PGSQL
 * connection. It also sets the connection to non-blocking mode.
 */
bool
pgsql_pipeline_enter(PGSQL *pgsql)
```
@dimitri (Owner):
I think I would prefer another function name, such as pgsql_enable_pipeline_mode.


```
if (!is_response_ok(res))
{
	(void) pgcopy_log_error(pgsql, res, "Read after pipeline sync failed");
```
@dimitri (Owner):
Can we do better in terms of error message here?

@dimitri (Owner) left a comment

Looks like we're almost there, thanks again!

@dimitri dimitri merged commit 14fed35 into dimitri:main Mar 25, 2024
18 checks passed
@arajkumar (Contributor, Author)

Thanks a lot @dimitri for reviewing and providing detailed review comments. Much appreciated!

Next, I'm planning to change the Dockerfile to install newer postgres-client libraries (14?) to enable pipeline mode by default.

@dimitri (Owner) commented Mar 25, 2024

> Next, I'm planning to change the Dockerfile to install newer postgres-client libraries (14?) to enable pipeline mode by default.

Sounds good. I have also been wondering if switching the copy protocol calls that we do to asynchronous would yield any performance improvements... would that be an area you'd be willing to investigate?

@arajkumar (Contributor, Author)

> Sounds good. I have also been wondering if switching the copy protocol calls that we do to asynchronous would yield any performance improvements... would that be an area you'd be willing to investigate?

@dimitri Do you mean the copy protocol used for the initial data copying?

@dimitri (Owner) commented Mar 27, 2024

I was thinking about PQgetCopyData but I now see that we already did that work, I just failed to remember about it because it was done to solve error handling rather than performance characteristics.

arajkumar added a commit to arajkumar/pgcopydb that referenced this pull request Apr 1, 2024
We have implemented pipeline mode in
dimitri#704, but it is not yet enabled
by default as the current docker image has PG13.

Signed-off-by: Arunprasad Rajkumar <ar.arunprasad@gmail.com>
dimitri pushed a commit that referenced this pull request Apr 2, 2024
We have implemented pipeline mode in
#704, but it is not yet enabled
by default as the current docker image has PG13.

Signed-off-by: Arunprasad Rajkumar <ar.arunprasad@gmail.com>