
[Flink] update global committer for bounded case #469

Merged · 1 commit · Apr 15, 2024
Changes from all commits
@@ -468,8 +468,7 @@ public boolean commitData(MetaInfo metaInfo, boolean changeSchema, CommitOp comm
             Set<CommitOp> middleCommitOps = partitionInfoDao.getCommitOpsBetweenVersions(tableId, partitionDesc,
                     readPartitionVersion + 1, curVersion);
             if (commitOp.equals(CommitOp.UpdateCommit)) {
                 // TODO: 2024/3/22 further considering for this UpdateCommit conflict case
-                if (
-                        middleCommitOps.contains(CommitOp.UpdateCommit) ||
+                if (middleCommitOps.contains(CommitOp.UpdateCommit) ||
                         (middleCommitOps.size() > 1 && middleCommitOps.contains(CommitOp.CompactionCommit))) {
                     throw new IllegalStateException(
                             "current operation conflicts with other data writing tasks, table path: " +
@@ -53,12 +53,15 @@ public class LakeSoulSinkGlobalCommitter
     private final DBManager dbManager;
     private final Configuration conf;

+    private final boolean isBounded;
+
     private final boolean logicallyDropColumn;

     public LakeSoulSinkGlobalCommitter(Configuration conf) {
         committer = LakeSoulSinkCommitter.INSTANCE;
         dbManager = new DBManager();
         this.conf = conf;
+        isBounded = conf.get(IS_BOUNDED).equals("true");
         logicallyDropColumn = conf.getBoolean(LOGICALLY_DROP_COLUM);
     }

@@ -94,7 +97,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> filterRecoveredCommittables
     @Override
     public LakeSoulMultiTableSinkGlobalCommittable combine(List<LakeSoulMultiTableSinkCommittable> committables)
             throws IOException {
-        return LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkCommittable(committables);
+        return LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkCommittable(committables, isBounded);
     }

/**
@@ -109,11 +112,11 @@ public LakeSoulMultiTableSinkGlobalCommittable combine(List<LakeSoulMultiTableSi
     public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
             List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables) throws IOException, InterruptedException {
         LakeSoulMultiTableSinkGlobalCommittable globalCommittable =
-                LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkGlobalCommittable(globalCommittables);
+                LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkGlobalCommittable(globalCommittables, isBounded);
         LOG.info("Committing: {}", globalCommittable);

         int index = 0;
-        String dbType = this.conf.getString(SOURCE_DB_TYPE,"");
+        String dbType = this.conf.getString(SOURCE_DB_TYPE, "");

         for (Map.Entry<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> entry :
                 globalCommittable.getGroupedCommittable()
@@ -122,7 +125,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
             List<LakeSoulMultiTableSinkCommittable> lakeSoulMultiTableSinkCommittable = entry.getValue();
             String tableName = identity.tableId.table();
             String tableNamespace = identity.tableId.schema();
-            if (tableNamespace==null){
+            if (tableNamespace == null) {
                 tableNamespace = identity.tableId.catalog();
             }
             boolean isCdc = identity.useCDC;
@@ -161,7 +164,7 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
                     !new HashSet<>(partitionKeys.rangeKeys).containsAll(identity.partitionKeyList)) {
                 throw new IOException("Change of partition key column of table " + tableName + " is forbidden");
             }
-            StructType origSchema ;
+            StructType origSchema;
             if (TableInfoDao.isArrowKindSchema(tableInfo.getTableSchema())) {
                 Schema arrowSchema = Schema.fromJSON(tableInfo.getTableSchema());
                 origSchema = ArrowUtils.fromArrowSchema(arrowSchema);
@@ -179,11 +182,11 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
             StructType mergeStructType = equalOrCanCastTuple3._3();

             boolean schemaChangeFound = false;
-            if (dbType.equals("mongodb")){
-                if (mergeStructType.length() > origSchema.size()){
+            if (dbType.equals("mongodb")) {
+                if (mergeStructType.length() > origSchema.size()) {
                     schemaChangeFound = schemaChanged;
                 }
-            }else {
+            } else {
                 schemaChangeFound = equalOrCanCast.equals(DataTypeCastUtils.CAN_CAST());
             }
             if (schemaChangeFound) {
@@ -209,9 +212,9 @@ public List<LakeSoulMultiTableSinkGlobalCommittable> commit(
                     msgSchema,
                     identity.useCDC,
                     identity.cdcColumn);
-            if (dbType.equals("mongodb")){
-                dbManager.updateTableSchema(tableInfo.getTableId(), ArrowUtils.toArrowSchema(mergeStructType,"UTC").toJson());
-            }else {
+            if (dbType.equals("mongodb")) {
+                dbManager.updateTableSchema(tableInfo.getTableId(), ArrowUtils.toArrowSchema(mergeStructType, "UTC").toJson());
+            } else {
                 dbManager.updateTableSchema(tableInfo.getTableId(), msgSchema.toJson());
             }
             if (JSONObject.parseObject(tableInfo.getProperties()).containsKey(DBConfig.TableInfoProperty.DROPPED_COLUMN)) {
@@ -196,7 +196,7 @@ public String getCommitId() {

     public void merge(LakeSoulMultiTableSinkCommittable committable) {
         Preconditions.checkState(identity.equals(committable.getIdentity()));
-        Preconditions.checkState(creationTime == committable.getCreationTime());
+        // Preconditions.checkState(creationTime == committable.getCreationTime());

         for (Map.Entry<String, List<InProgressFileWriter.PendingFileRecoverable>> entry : committable.getPendingFilesMap().entrySet()) {
             String bucketId = entry.getKey();
@@ -213,17 +213,18 @@ private void mergeSourcePartitionInfo(LakeSoulMultiTableSinkCommittable committa
         if (sourcePartitionInfo == null) {
             sourcePartitionInfo = committable.getSourcePartitionInfo();
         } else {
+            if (committable.getSourcePartitionInfo() == null || committable.getSourcePartitionInfo().isEmpty()) return;
             try {
                 JniWrapper jniWrapper = JniWrapper
-                        .parseFrom(sourcePartitionInfo.getBytes())
+                        .parseFrom(Base64.getDecoder().decode(committable.getSourcePartitionInfo()))
                         .toBuilder()
                         .addAllPartitionInfo(
                                 JniWrapper
-                                        .parseFrom(committable.getSourcePartitionInfo().getBytes())
+                                        .parseFrom(Base64.getDecoder().decode(committable.getSourcePartitionInfo()))
                                         .getPartitionInfoList()
                         )
                         .build();
-                sourcePartitionInfo = new String(jniWrapper.toByteArray());
+                sourcePartitionInfo = Base64.getEncoder().encodeToString(jniWrapper.toByteArray());
             } catch (InvalidProtocolBufferException e) {
                 throw new RuntimeException(e);
             }
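Context for the Base64 changes above: `sourcePartitionInfo` carries serialized protobuf (`JniWrapper`), which is arbitrary binary. Round-tripping such bytes through `new String(bytes)` / `getBytes()` runs them through a charset decoder and silently corrupts any sequence that is not valid text, so the string-based code could hand `parseFrom` damaged bytes; Base64 is a lossless text encoding for binary. A self-contained sketch of the failure mode and the fix, with plain byte arrays standing in for the protobuf:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

public class BinaryRoundTrip {
    public static void main(String[] args) {
        // Arbitrary binary, as protobuf wire bytes can be; 0x80 and 0xFE are invalid UTF-8 here.
        byte[] original = {0x0A, (byte) 0x80, (byte) 0xFE, 0x01};

        // Lossy: invalid sequences become U+FFFD replacement characters.
        byte[] viaString = new String(original, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(original, viaString)); // false

        // Lossless: Base64 maps any byte[] to ASCII text and back exactly.
        String encoded = Base64.getEncoder().encodeToString(original);
        byte[] decoded = Base64.getDecoder().decode(encoded);
        System.out.println(Arrays.equals(original, decoded));   // true
    }
}
```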
@@ -26,7 +26,7 @@ public class LakeSoulMultiTableSinkGlobalCommittable implements Serializable {
     private final Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommittable;

     public LakeSoulMultiTableSinkGlobalCommittable(
-            Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables) {
+            Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables, boolean isBounded) {
         groupedCommitables.forEach((key, disorderedCommitables) -> {
             disorderedCommitables.sort(LakeSoulMultiTableSinkCommittable::compareTo);
             List<LakeSoulMultiTableSinkCommittable> mergedCommittables = new ArrayList<>();
@@ -35,7 +35,7 @@ public LakeSoulMultiTableSinkGlobalCommittable(
                 mergedCommittables.add(committable);
             } else {
                 LakeSoulMultiTableSinkCommittable tail = mergedCommittables.get(mergedCommittables.size() - 1);
-                if (tail.getCreationTime() == committable.getCreationTime()) {
+                if (isBounded || tail.getCreationTime() == committable.getCreationTime()) {
                     tail.merge(committable);
                 } else {
                     mergedCommittables.add(committable);
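The intent of the new flag in this loop: a bounded (batch) job emits all of its committables before one final commit, so everything in a bucket can collapse into a single committable regardless of creation time, while a streaming job must keep committables from different checkpoints separate. This is also why the creation-time precondition in `merge` is commented out above. A simplified sketch of the policy, with an illustrative record standing in for `LakeSoulMultiTableSinkCommittable`:

```java
import java.util.ArrayList;
import java.util.List;

public class MergePolicySketch {
    // Stand-in for LakeSoulMultiTableSinkCommittable; the fields are assumptions.
    record Committable(long creationTime, List<String> pendingFiles) {
        Committable mergeWith(Committable other) {
            List<String> merged = new ArrayList<>(pendingFiles);
            merged.addAll(other.pendingFiles);
            return new Committable(creationTime, merged);
        }
    }

    static List<Committable> mergeSorted(List<Committable> sorted, boolean isBounded) {
        List<Committable> out = new ArrayList<>();
        for (Committable c : sorted) {
            if (out.isEmpty()) {
                out.add(c);
                continue;
            }
            Committable tail = out.get(out.size() - 1);
            // Bounded: collapse everything; streaming: merge only within one checkpoint.
            if (isBounded || tail.creationTime() == c.creationTime()) {
                out.set(out.size() - 1, tail.mergeWith(c));
            } else {
                out.add(c);
            }
        }
        return out;
    }
}
```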
@@ -49,22 +49,22 @@
     }

     public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkGlobalCommittable(
-            List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables) {
+            List<LakeSoulMultiTableSinkGlobalCommittable> globalCommittables, boolean isBounded) {
         Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
                 new HashMap<>();
         globalCommittables.forEach(globalCommittable -> globalCommittable.getGroupedCommittable().forEach(
                 (key, value) -> groupedCommitables.computeIfAbsent(key, tuple2 -> new ArrayList<>()).addAll(value)));
-        return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables);
+        return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded);
     }

     public static LakeSoulMultiTableSinkGlobalCommittable fromLakeSoulMultiTableSinkCommittable(
-            List<LakeSoulMultiTableSinkCommittable> committables) {
+            List<LakeSoulMultiTableSinkCommittable> committables, boolean isBounded) {
         Map<Tuple2<TableSchemaIdentity, String>, List<LakeSoulMultiTableSinkCommittable>> groupedCommitables =
                 new HashMap<>();
         committables.forEach(committable -> groupedCommitables.computeIfAbsent(
                 Tuple2.of(committable.getIdentity(), committable.getBucketId()), tuple2 -> new ArrayList<>())
                 .add(committable));
-        return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables);
+        return new LakeSoulMultiTableSinkGlobalCommittable(groupedCommitables, isBounded);
     }


@@ -96,6 +96,6 @@ private LakeSoulMultiTableSinkGlobalCommittable deserializeV1(DataInputView data
                         committableSerializer, dataInputView));
             }
         }
-        return LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkCommittable(committables);
+        return LakeSoulMultiTableSinkGlobalCommittable.fromLakeSoulMultiTableSinkCommittable(committables, false);
     }
 }
@@ -118,6 +118,7 @@ private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream, Co
                 flinkConf.set(DML_TYPE, PARTITION_DELETE);
             }
         }
+        flinkConf.set(IS_BOUNDED, String.valueOf(sinkContext.isBounded()));
         Path path = FlinkUtil.makeQualifiedPath(new Path(flinkConf.getString(CATALOG_PATH)));
         int bucketParallelism = flinkConf.getInteger(HASH_BUCKET_NUM);
         //rowData key tools
@@ -122,6 +122,12 @@ public class LakeSoulSinkOptions {
             .defaultValue(INSERT)
             .withDescription("DML type");

+    public static final ConfigOption<String> IS_BOUNDED = ConfigOptions
+            .key("is_bounded")
+            .stringType()
+            .defaultValue("true")
+            .withDescription("Whether sink is bounded");
+
     public static final ConfigOption<String> SOURCE_PARTITION_INFO = ConfigOptions
             .key("source_partition_info")
             .stringType()
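A design note on the option above: `IS_BOUNDED` is declared as a String option, which is why the committer compares the value against "true" and the sink writes `String.valueOf(sinkContext.isBounded())`. Flink's `ConfigOptions` builder also supports boolean-typed options; a hypothetical alternative (not part of this PR) would avoid the string comparison:

```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class BoundedOptionSketch {
    // Boolean-typed variant; the read side becomes conf.getBoolean(IS_BOUNDED).
    public static final ConfigOption<Boolean> IS_BOUNDED = ConfigOptions
            .key("is_bounded")
            .booleanType()
            .defaultValue(true)
            .withDescription("Whether sink is bounded");
}
```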