Skip to content

Commit

Permalink
[Fix][Doris] Fix the abnormality of deleting data in CDC scenario. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Aug 6, 2024
1 parent c94ea32 commit bb2c912
Show file tree
Hide file tree
Showing 8 changed files with 356 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonGenerator;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -29,6 +30,7 @@
import org.apache.seatunnel.format.text.TextSerializationSchema;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand All @@ -42,39 +44,54 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
private final SeaTunnelRowType seaTunnelRowType;
private final String fieldDelimiter;
private final boolean enableDelete;
private final SerializationSchema serialize;

/**
 * Creates a serializer and prepares, once, the serialization schema used for every row.
 *
 * <p>When {@code enableDelete} is set, the row type handed to the schema is the incoming row
 * type extended with one trailing {@code LoadConstants.DORIS_DELETE_SIGN} column of
 * {@code STRING_TYPE}.
 *
 * @param type the load format; {@code JSON} selects the JSON schema, any other value selects
 *     the delimited text schema
 * @param seaTunnelRowType the row type of the incoming rows, without the delete-sign column
 * @param fieldDelimiter the delimiter passed to the text schema
 * @param enableDelete whether the delete-sign column is appended to the row type
 */
public SeaTunnelRowSerializer(
        String type,
        SeaTunnelRowType seaTunnelRowType,
        String fieldDelimiter,
        boolean enableDelete) {
    this.type = type;
    this.fieldDelimiter = fieldDelimiter;
    this.enableDelete = enableDelete;

    // Arrays.asList is a fixed-size view of the array; copy into growable lists so the
    // delete-sign column can be appended.
    List<String> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
    List<SeaTunnelDataType<?>> fieldTypes =
            new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));

    if (enableDelete) {
        fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
        fieldTypes.add(STRING_TYPE);
    }

    // The final field is assigned exactly once, with the (possibly extended) row type.
    this.seaTunnelRowType =
            new SeaTunnelRowType(
                    fieldNames.toArray(new String[0]),
                    fieldTypes.toArray(new SeaTunnelDataType<?>[0]));

    if (JSON.equals(type)) {
        JsonSerializationSchema jsonSerializationSchema =
                new JsonSerializationSchema(this.seaTunnelRowType, NULL_VALUE);
        ObjectMapper mapper = jsonSerializationSchema.getMapper();
        // Write BigDecimal values in plain notation rather than scientific notation.
        mapper.configure(JsonGenerator.Feature.WRITE_BIGDECIMAL_AS_PLAIN, true);
        this.serialize = jsonSerializationSchema;
    } else {
        this.serialize =
                TextSerializationSchema.builder()
                        .seaTunnelRowType(this.seaTunnelRowType)
                        .delimiter(fieldDelimiter)
                        .nullValue(NULL_VALUE)
                        .build();
    }
}

public byte[] buildCSVString(SeaTunnelRow row, SeaTunnelRowType seaTunnelRowType)
throws IOException {
/**
 * Serializes the given row by delegating to the {@code serialize} schema field.
 *
 * @param row the row to encode
 * @return the bytes produced by the schema for {@code row}
 */
public byte[] buildJsonString(SeaTunnelRow row) {
    final byte[] encoded = serialize.serialize(row);
    return encoded;
}

TextSerializationSchema build =
TextSerializationSchema.builder()
.seaTunnelRowType(seaTunnelRowType)
.delimiter(fieldDelimiter)
.nullValue(NULL_VALUE)
.build();
/**
 * Serializes the given row by delegating to the {@code serialize} schema field.
 *
 * @param row the row to encode
 * @return the bytes produced by the schema for {@code row}
 */
public byte[] buildCSVString(SeaTunnelRow row) {
    // Single return through the shared schema; the earlier return via a method-local
    // "build" schema is gone because no such local is declared in this method.
    return serialize.serialize(row);
}

public String parseDeleteSign(RowKind rowKind) {
Expand All @@ -93,29 +110,17 @@ public void open() throws IOException {}
@Override
public byte[] serialize(SeaTunnelRow seaTunnelRow) throws IOException {

List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
List<SeaTunnelDataType<?>> fieldTypes = Arrays.asList(seaTunnelRowType.getFieldTypes());

if (enableDelete) {
SeaTunnelRow seaTunnelRowEnableDelete = seaTunnelRow.copy();
seaTunnelRowEnableDelete.setField(
seaTunnelRow.getFields().length, parseDeleteSign(seaTunnelRow.getRowKind()));
fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);

List<Object> newFields = new ArrayList<>(Arrays.asList(seaTunnelRow.getFields()));
newFields.add(parseDeleteSign(seaTunnelRow.getRowKind()));
seaTunnelRow = new SeaTunnelRow(newFields.toArray());
}

if (JSON.equals(type)) {
return buildJsonString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
return buildJsonString(seaTunnelRow);
} else if (CSV.equals(type)) {
return buildCSVString(
seaTunnelRow,
new SeaTunnelRowType(
fieldNames.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0])));
return buildCSVString(seaTunnelRow);
} else {
throw new IllegalArgumentException("The type " + type + " is not supported!");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@

package org.apache.seatunnel.connectors.doris.sink.writer;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;

import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
Expand All @@ -31,9 +34,9 @@
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.util.EntityUtils;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
Expand Down Expand Up @@ -64,23 +67,23 @@ public class DorisStreamLoad implements Serializable {
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
private static final String JOB_EXIST_FINISHED = "FINISHED";
private final String loadUrlStr;
private final String hostPort;
@Getter private final String hostPort;
private final String abortUrlStr;
private final String user;
private final String passwd;
private final String db;
@Getter private final String db;
private final String table;
private final boolean enable2PC;
private final boolean enableDelete;
private final Properties streamLoadProp;
private final RecordStream recordStream;
private Future<CloseableHttpResponse> pendingLoadFuture;
@Getter private Future<CloseableHttpResponse> pendingLoadFuture;
private final CloseableHttpClient httpClient;
private final ExecutorService executorService;
private volatile boolean loadBatchFirstRecord;
private volatile boolean loading = false;
private String label;
private long recordCount = 0;
@Getter private long recordCount = 0;

public DorisStreamLoad(
String hostPort,
Expand Down Expand Up @@ -115,18 +118,6 @@ public DorisStreamLoad(
loadBatchFirstRecord = true;
}

/**
 * Returns the value held in the {@code db} field.
 *
 * @return the {@code db} field
 */
public String getDb() {
    return this.db;
}

/**
 * Returns the value held in the {@code hostPort} field.
 *
 * @return the {@code hostPort} field
 */
public String getHostPort() {
    return this.hostPort;
}

/**
 * Returns the current value of the {@code pendingLoadFuture} field.
 *
 * @return the {@code pendingLoadFuture} field as currently set
 */
public Future<CloseableHttpResponse> getPendingLoadFuture() {
    return this.pendingLoadFuture;
}

public void abortPreCommit(String labelSuffix, long chkID) throws Exception {
long startChkID = chkID;
log.info("abort for labelSuffix {}. start chkId {}.", labelSuffix, chkID);
Expand Down Expand Up @@ -196,10 +187,6 @@ public void writeRecord(byte[] record) throws IOException {
recordCount++;
}

/**
 * Returns the current value of the {@code recordCount} counter.
 *
 * @return the {@code recordCount} field
 */
public long getRecordCount() {
    return this.recordCount;
}

public String getLoadFailedMsg() {
if (!loading) {
return null;
Expand Down Expand Up @@ -300,10 +287,9 @@ public void abortTransaction(long txnID) throws Exception {
"Fail to abort transaction " + txnID + " with url " + abortUrlStr);
}

ObjectMapper mapper = new ObjectMapper();
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
mapper.readValue(loadResult, new TypeReference<HashMap<String, String>>() {});
JsonUtils.parseObject(loadResult, new TypeReference<HashMap<String, String>>() {});
if (!LoadStatus.SUCCESS.equals(res.get("status"))) {
if (ResponseUtil.isCommitted(res.get("msg"))) {
throw new DorisConnectorException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,19 @@
<version>${mysql.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading

0 comments on commit bb2c912

Please sign in to comment.