Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Connector-V2][ES]source deserializer error and inappropriate #4233

Merged
merged 9 commits into from
Apr 1, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
Expand All @@ -37,7 +38,9 @@

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
Expand Down Expand Up @@ -111,7 +114,12 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
if (value instanceof TextNode) {
seaTunnelFields[i] =
convertValue(seaTunnelDataType, ((TextNode) value).textValue());
} else {
seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
}
}
}
} catch (Exception ex) {
Expand Down Expand Up @@ -188,6 +196,13 @@ Object convertValue(SeaTunnelDataType<?> fieldType, String fieldValue)
}

private LocalDateTime parseDate(String fieldValue) {
// handle strings of timestamp type
try {
long ts = Long.parseLong(fieldValue);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault());
} catch (NumberFormatException e) {
// no op
}
String formatDate = fieldValue.replace("T", " ");
if (fieldValue.length() == "yyyyMMdd".length()
|| fieldValue.length() == "yyyy-MM-dd".length()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -131,8 +131,7 @@ public void testElasticsearch(TestContainer container)
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

private List<String> generateTestDataSet()
throws JsonProcessingException, UnknownHostException {
private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
"c_map",
Expand Down Expand Up @@ -170,7 +169,7 @@ private List<String> generateTestDataSet()
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now().toString(),
LocalDateTime.now().toString()
System.currentTimeMillis()
};
for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
Expand Down Expand Up @@ -215,6 +214,13 @@ private List<String> readSinkData() throws InterruptedException {
x.remove("_index");
x.remove("_type");
x.remove("_id");
// I don’t know if converting the test cases in this way complies with
// the CI specification
x.replace(
"c_timestamp",
LocalDateTime.parse(x.get("c_timestamp").toString())
.toInstant(ZoneOffset.UTC)
.toEpochMilli());
});
List<String> docs =
scrollResult.getDocs().stream()
Expand Down