Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][CDC] Close idle subtasks group (reader/writer) in incremental phase #6526

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ default JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
}

/** Get a connection pool factory to create connection pool. */
JdbcConnectionPoolFactory getPooledDataSourceFactory();
default JdbcConnectionPoolFactory getPooledDataSourceFactory() {
throw new UnsupportedOperationException();
}

/** Query and build the schema of table. */
TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public Optional<SourceSplitBase> getNext() {

@Override
public boolean waitingForCompletedSplits() {
return snapshotSplitAssigner.waitingForCompletedSplits();
return snapshotSplitAssigner.waitingForCompletedSplits()
|| incrementalSplitAssigner.waitingForAssignedSplits();
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,16 @@ private void assignSplits() {
awaitingReader.remove();
LOG.debug("Assign split {} to subtask {}", sourceSplit, nextAwaiting);
} else {
// there is no available splits by now, skip assigning
break;
if (splitAssigner.waitingForCompletedSplits()) {
// there is no available splits by now, skip assigning
break;
} else {
LOG.info(
"No more splits available, signal no more splits to subtask {}",
nextAwaiting);
context.signalNoMoreSplits(nextAwaiting);
awaitingReader.remove();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,8 @@ public boolean completedSnapshotPhase(List<TableId> tableIds) {
return context.getAssignedSnapshotSplit().isEmpty()
&& context.getSplitCompletedOffsets().isEmpty();
}

public boolean waitingForAssignedSplits() {
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
return !(splitAssigned && noMoreSplits());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,13 @@ public void pollNext(Collector<T> output) throws Exception {
context.sendSplitRequest();
needSendSplitRequest.compareAndSet(true, false);
}
super.pollNext(output);

if (isNoMoreSplitsAssignment() && isNoMoreElement()) {
log.info("Reader {} send NoMoreElement event", context.getIndexOfSubtask());
context.signalNoMoreElement();
} else {
super.pollNext(output);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand All @@ -34,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.MySqlSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.binlog.MySqlBinlogFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch.scan.MySqlSnapshotFetchTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlSchema;
import org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.TableDiscoveryUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
Expand Down Expand Up @@ -70,20 +70,24 @@ public String getName() {
@Override
public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig)) {
return isTableIdCaseSensitive(jdbcConnection);
return isDataCollectionIdCaseSensitive(jdbcConnection);
} catch (SQLException e) {
throw new SeaTunnelException("Error reading MySQL variables: " + e.getMessage(), e);
}
}

private boolean isDataCollectionIdCaseSensitive(JdbcConnection jdbcConnection) {
return isTableIdCaseSensitive(jdbcConnection);
}

@Override
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new MySqlChunkSplitter(sourceConfig, this);
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
return MySqlConnectionUtils.createMySqlConnection(sourceConfig.getDbzConfiguration());
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new MysqlPooledDataSourceFactory();
public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new MySqlChunkSplitter(sourceConfig, this);
}

@Override
Expand All @@ -101,8 +105,7 @@ public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
if (mySqlSchema == null) {
mySqlSchema =
new MySqlSchema(
sourceConfig, isDataCollectionIdCaseSensitive(sourceConfig), tableMap);
new MySqlSchema(sourceConfig, isDataCollectionIdCaseSensitive(jdbc), tableMap);
}
return mySqlSchema.getTableSchema(jdbc, tableId);
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
Expand All @@ -40,12 +41,15 @@
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.util.Clock;
import lombok.extern.slf4j.Slf4j;

import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.NO_STOPPING_OFFSET;

@Slf4j
public class MySqlBinlogFetchTask implements FetchTask<SourceSplitBase> {
private final IncrementalSplit split;
private volatile boolean taskRunning = false;
Expand All @@ -72,6 +76,22 @@ public void execute(FetchTask.Context context) throws Exception {
BinlogSplitChangeEventSourceContext changeEventSourceContext =
new BinlogSplitChangeEventSourceContext();

sourceFetchContext
.getBinaryLogClient()
.registerLifecycleListener(
new BinaryLogClient.AbstractLifecycleListener() {
@Override
public void onConnect(BinaryLogClient client) {
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
try {
sourceFetchContext.getConnection().close();
log.info(
"Binlog client connected, closed idle jdbc connection.");
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
});

mySqlStreamingChangeEventSource.execute(
changeEventSourceContext, sourceFetchContext.getOffsetContext());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand Down Expand Up @@ -90,11 +89,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new OracleChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new OraclePooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
OracleSourceConfig oracleSourceConfig = (OracleSourceConfig) sourceConfig;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
Expand Down Expand Up @@ -96,11 +95,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new PostgresChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new PostgresPooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig) sourceConfig;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -180,28 +180,29 @@ public void configure(SourceSplitBase sourceSplitBase) {
snapshotter.init(connectorConfig, offsetContext.asOffsetState(), slotInfo);
}

SlotCreationResult slotCreatedInfo = null;
if (snapshotter.shouldStream()) {
final boolean doSnapshot = snapshotter.shouldSnapshot();
createReplicationConnection(
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
if (sourceSplitBase.isIncrementalSplit() || slotInfo == null) {
final boolean doSnapshot = snapshotter.shouldSnapshot();
createReplicationConnection(
doSnapshot, connectorConfig.maxRetries(), connectorConfig.retryDelay());
}
// we need to create the slot before we start streaming if it doesn't exist
// otherwise we can't stream back changes happening while the snapshot is taking
// place
if (slotInfo == null) {
try {
slotCreatedInfo =
SlotCreationResult slotCreatedInfo =
hailin0 marked this conversation as resolved.
Show resolved Hide resolved
replicationConnection.createReplicationSlot().orElse(null);
} catch (SQLException ex) {
String message = "Creation of replication slot failed";
if (ex.getMessage().contains("already exists")) {
message +=
"; when setting up multiple connectors for the same database host, please make sure to use a distinct replication slot name for each.";
log.warn(message);
} else {
throw new DebeziumException(message, ex);
}
throw new DebeziumException(message, ex);
}
} else {
slotCreatedInfo = null;
}
}

Expand Down Expand Up @@ -350,7 +351,9 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
public void close() {
try {
this.dataConnection.close();
this.replicationConnection.close();
if (this.replicationConnection != null) {
this.replicationConnection.close();
}
} catch (Exception e) {
log.warn("Failed to close connection", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.connection.JdbcConnectionPoolFactory;
import org.apache.seatunnel.connectors.cdc.base.source.enumerator.splitter.ChunkSplitter;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand Down Expand Up @@ -85,11 +84,6 @@ public ChunkSplitter createChunkSplitter(JdbcSourceConfig sourceConfig) {
return new SqlServerChunkSplitter(sourceConfig, this);
}

@Override
public JdbcConnectionPoolFactory getPooledDataSourceFactory() {
return new SqlServerPooledDataSourceFactory();
}

@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig) sourceConfig;
Expand Down
Loading
Loading