Skip to content

Commit

Permalink
Merge branch 'xxchan/latin-tyrannosaurus' into xxchan/source-test
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <xxchan22f@gmail.com>
  • Loading branch information
xxchan committed Apr 12, 2024
2 parents 0b6f74c + cf221e3 commit 530eb86
Show file tree
Hide file tree
Showing 63 changed files with 1,414 additions and 225 deletions.
13 changes: 7 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions Makefile.toml
Original file line number Diff line number Diff line change
Expand Up @@ -559,8 +559,8 @@ env_files = ["${PREFIX_CONFIG}/risedev-env"]
script = '''
#!/usr/bin/env bash
cat <<EOF > "${PREFIX_CONFIG}/psql-env"
export PGHOST=$RISEDEV_FRONTEND_LISTEN_ADDRESS
export PGPORT=$RISEDEV_FRONTEND_PORT
export PGHOST=$RISEDEV_RW_FRONTEND_LISTEN_ADDRESS
export PGPORT=$RISEDEV_RW_FRONTEND_PORT
export PGUSER=root
export PGDATABASE=dev
EOF
Expand All @@ -577,7 +577,7 @@ dependencies = ["check-risedev-env-file"]
env_files = ["${PREFIX_CONFIG}/risedev-env"]
script = '''
#!/usr/bin/env bash
psql -h $RISEDEV_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_FRONTEND_PORT -U root -d dev "$@"
psql -h $RISEDEV_RW_FRONTEND_LISTEN_ADDRESS -p $RISEDEV_RW_FRONTEND_PORT -U root -d dev "$@"
'''

[tasks.ctl]
Expand Down Expand Up @@ -1291,7 +1291,7 @@ echo "All processes has exited."
"""

[tasks.slt]
env = { SLT_HOST = "${RISEDEV_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_FRONTEND_PORT}", SLT_DB = "dev", RISEDEV_HIDE_CARGO_MAKE_LOG = "true" }
env = { SLT_HOST = "${RISEDEV_RW_FRONTEND_LISTEN_ADDRESS}", SLT_PORT = "${RISEDEV_RW_FRONTEND_PORT}", SLT_DB = "dev", RISEDEV_HIDE_CARGO_MAKE_LOG = "true" }
category = "RiseDev - Test - SQLLogicTest"
install_crate = { version = "0.20.0", crate_name = "sqllogictest-bin", binary = "sqllogictest", test_arg = [
"--help",
Expand Down
5 changes: 1 addition & 4 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ risedev slt './e2e_test/source/basic/alter/kafka_after_new_data.slt'

echo "--- e2e, kafka alter source again"
./scripts/source/prepare_data_after_alter.sh 3
risedev slt './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- e2e, inline test"
risedev slt './e2e_test/source_inline/**/*.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/basic/alter/kafka_after_new_data_2.slt'

echo "--- Run CH-benCHmark"
risedev slt './e2e_test/ch_benchmark/batch/ch_benchmark.slt'
Expand Down
33 changes: 33 additions & 0 deletions e2e_test/ddl/alter_session_params.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to true;

statement error session param query_mode cannot be altered system wide
alter system set query_mode to auto;

connection other1
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to false;

statement ok
alter system set rw_streaming_enable_delta_join to true;

query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

sleep 1s

connection other2
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
true

statement ok
alter system set RW_STREAMING_ENABLE_DELTA_JOIN to default;
12 changes: 0 additions & 12 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,3 @@ Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
3: Invalid bool


statement error
set transaction_isolation to 'read committed';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `read committed` for `transaction_isolation`
3: Feature is not yet implemented: isolation level
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/10736
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
import static io.debezium.config.CommonConnectorConfig.TOPIC_PREFIX;
import static io.debezium.schema.AbstractTopicNamingStrategy.*;

import com.risingwave.connector.api.source.CdcEngine;
import com.risingwave.proto.ConnectorServiceProto;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DbzCdcEngine implements CdcEngine {
public class DbzCdcEngine implements Runnable {
static final int DEFAULT_QUEUE_CAPACITY = 16;

private final DebeziumEngine<?> engine;
private final DbzCdcEventConsumer consumer;
private final DbzChangeEventConsumer changeEventConsumer;
private final long id;

/** If config is not valid will throw exceptions */
Expand All @@ -41,15 +40,15 @@ public DbzCdcEngine(
var topicPrefix = config.getProperty(TOPIC_PREFIX.name());
var transactionTopic = String.format("%s.%s", topicPrefix, DEFAULT_TRANSACTION_TOPIC);
var consumer =
new DbzCdcEventConsumer(
new DbzChangeEventConsumer(
sourceId,
heartbeatTopicPrefix,
transactionTopic,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY));

// Builds a debezium engine but not start it
this.id = sourceId;
this.consumer = consumer;
this.changeEventConsumer = consumer;
this.engine =
DebeziumEngine.create(Connect.class)
.using(config)
Expand All @@ -64,7 +63,6 @@ public void run() {
engine.run();
}

@Override
public long getId() {
return id;
}
Expand All @@ -73,8 +71,11 @@ public void stop() throws Exception {
engine.close();
}

@Override
public BlockingQueue<ConnectorServiceProto.GetEventStreamResponse> getOutputChannel() {
return consumer.getOutputChannel();
return changeEventConsumer.getOutputChannel();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return changeEventConsumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
import org.slf4j.LoggerFactory;

/** Single-thread engine runner */
public class DbzCdcEngineRunner implements CdcEngineRunner {
public class DbzCdcEngineRunner {
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEngineRunner.class);

private final ExecutorService executor;
private final AtomicBoolean running = new AtomicBoolean(false);
private CdcEngine engine;
private DbzCdcEngine engine;
private final DbzConnectorConfig config;

public static CdcEngineRunner newCdcEngineRunner(
public static DbzCdcEngineRunner newCdcEngineRunner(
DbzConnectorConfig config, StreamObserver<GetEventStreamResponse> responseObserver) {
DbzCdcEngineRunner runner = null;
try {
Expand Down Expand Up @@ -69,7 +69,7 @@ public static CdcEngineRunner newCdcEngineRunner(
return runner;
}

public static CdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
public static DbzCdcEngineRunner create(DbzConnectorConfig config, long channelPtr) {
DbzCdcEngineRunner runner = new DbzCdcEngineRunner(config);
try {
var sourceId = config.getSourceId();
Expand Down Expand Up @@ -123,7 +123,7 @@ private DbzCdcEngineRunner(DbzConnectorConfig config) {
this.config = config;
}

private void withEngine(CdcEngine engine) {
private void withEngine(DbzCdcEngine engine) {
this.engine = engine;
}

Expand Down Expand Up @@ -160,16 +160,18 @@ public void stop() throws Exception {
}
}

@Override
public CdcEngine getEngine() {
public DbzCdcEngine getEngine() {
return engine;
}

@Override
public boolean isRunning() {
return running.get();
}

public DbzChangeEventConsumer getChangeEventConsumer() {
return engine.getChangeEventConsumer();
}

private void cleanUp() {
running.set(false);
// interrupt the runner thread if it is still running
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer;
import com.risingwave.proto.ConnectorServiceProto.CdcMessage;
import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.embedded.EmbeddedEngineChangeEventProxy;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
Expand All @@ -40,9 +44,9 @@ enum EventType {
DATA,
}

public class DbzCdcEventConsumer
public class DbzChangeEventConsumer
implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
static final Logger LOG = LoggerFactory.getLogger(DbzCdcEventConsumer.class);
static final Logger LOG = LoggerFactory.getLogger(DbzChangeEventConsumer.class);

private final BlockingQueue<GetEventStreamResponse> outputChannel;
private final long sourceId;
Expand All @@ -51,7 +55,10 @@ public class DbzCdcEventConsumer
private final String heartbeatTopicPrefix;
private final String transactionTopic;

DbzCdcEventConsumer(
private volatile DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>>
currentRecordCommitter;

DbzChangeEventConsumer(
long sourceId,
String heartbeatTopicPrefix,
String transactionTopic,
Expand Down Expand Up @@ -108,6 +115,7 @@ public void handleBatch(
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
var respBuilder = GetEventStreamResponse.newBuilder();
currentRecordCommitter = committer;
for (ChangeEvent<SourceRecord, SourceRecord> event : events) {
var record = event.value();
EventType eventType = getEventType(record);
Expand Down Expand Up @@ -199,9 +207,6 @@ var record = event.value();
default:
break;
}

// mark the event as processed
committer.markProcessed(event);
}

LOG.debug("recv {} events", respBuilder.getEventsCount());
Expand All @@ -211,16 +216,61 @@ var record = event.value();
var response = respBuilder.build();
outputChannel.put(response);
}
}

committer.markBatchFinished();
public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
}

@Override
public boolean supportsTombstoneEvents() {
return DebeziumEngine.ChangeConsumer.super.supportsTombstoneEvents();
/**
* Commit the offset to the Debezium engine. NOTES: The input offset is passed from the source
* executor to here
*
* @param offset persisted offset in the Source state table
*/
@SuppressWarnings("unchecked")
public void commitOffset(DebeziumOffset offset) throws InterruptedException {
// Although the committer is read/write by multi-thread, the committer will be not changed
// frequently.
if (currentRecordCommitter == null) {
LOG.info(
"commitOffset() called on Debezium change consumer which doesn't receive records yet.");
return;
}

// only the offset is used
SourceRecord recordWrapper =
new SourceRecord(
offset.sourcePartition,
adjustSourceOffset((Map<String, Object>) offset.sourceOffset),
"DUMMY",
Schema.BOOLEAN_SCHEMA,
true);
ChangeEvent<SourceRecord, SourceRecord> changeEvent =
EmbeddedEngineChangeEventProxy.create(null, recordWrapper, recordWrapper);
currentRecordCommitter.markProcessed(changeEvent);
currentRecordCommitter.markBatchFinished();
}

public BlockingQueue<GetEventStreamResponse> getOutputChannel() {
return this.outputChannel;
/**
* We have to adjust type of LSN values to Long, because it might be Integer after
* deserialization, however {@link
* io.debezium.connector.postgresql.PostgresStreamingChangeEventSource#commitOffset(Map, Map)}
* requires Long.
*/
private Map<String, Object> adjustSourceOffset(Map<String, Object> sourceOffset) {
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)) {
String value =
sourceOffset
.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)
.toString();
sourceOffset.put(
PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY, Long.parseLong(value));
}
if (sourceOffset.containsKey(PostgresOffsetContext.LAST_COMMIT_LSN_KEY)) {
String value = sourceOffset.get(PostgresOffsetContext.LAST_COMMIT_LSN_KEY).toString();
sourceOffset.put(PostgresOffsetContext.LAST_COMMIT_LSN_KEY, Long.parseLong(value));
}
return sourceOffset;
}
}
Loading

0 comments on commit 530eb86

Please sign in to comment.