Skip to content

Commit

Permalink
[Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (
Browse files Browse the repository at this point in the history
apache#4233)

* [Fix][Connector-V2][ES]source deserializer error and inappropriate

* [WIP][Connector-V2][ES]try fix e2e

* [WIP][Connector-V2][ES]try fix e2e

* [WIP][Connector-V2][ES]try fix e2e

* [WIP][Connector-V2][ES]try fix e2e

* [Fix][Connector-V2][ES]es special types may result in a null pointer. e.g. ip type

* [Fix][Connector-V2][ES]fix the parsing exception when the es time type is epoch_millis
  • Loading branch information
kpretty authored and EricJoy2048 committed Apr 9, 2023
1 parent 039acb2 commit f2dbdc6
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
Expand All @@ -37,7 +38,9 @@

import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
Expand Down Expand Up @@ -111,7 +114,12 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
if (value instanceof TextNode) {
seaTunnelFields[i] =
convertValue(seaTunnelDataType, ((TextNode) value).textValue());
} else {
seaTunnelFields[i] = convertValue(seaTunnelDataType, value.toString());
}
}
}
} catch (Exception ex) {
Expand Down Expand Up @@ -188,6 +196,13 @@ Object convertValue(SeaTunnelDataType<?> fieldType, String fieldValue)
}

private LocalDateTime parseDate(String fieldValue) {
// handle strings of timestamp type
try {
long ts = Long.parseLong(fieldValue);
return LocalDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault());
} catch (NumberFormatException e) {
// no op
}
String formatDate = fieldValue.replace("T", " ");
if (fieldValue.length() == "yyyyMMdd".length()
|| fieldValue.length() == "yyyy-MM-dd".length()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@

import java.io.IOException;
import java.math.BigDecimal;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -131,8 +131,7 @@ public void testElasticsearch(TestContainer container)
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

private List<String> generateTestDataSet()
throws JsonProcessingException, UnknownHostException {
private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
"c_map",
Expand Down Expand Up @@ -170,7 +169,7 @@ private List<String> generateTestDataSet()
BigDecimal.valueOf(11, 1),
"test".getBytes(),
LocalDate.now().toString(),
LocalDateTime.now().toString()
System.currentTimeMillis()
};
for (int j = 0; j < fields.length; j++) {
doc.put(fields[j], values[j]);
Expand Down Expand Up @@ -215,6 +214,13 @@ private List<String> readSinkData() throws InterruptedException {
x.remove("_index");
x.remove("_type");
x.remove("_id");
// I don’t know if converting the test cases in this way complies with
// the CI specification
x.replace(
"c_timestamp",
LocalDateTime.parse(x.get("c_timestamp").toString())
.toInstant(ZoneOffset.UTC)
.toEpochMilli());
});
List<String> docs =
scrollResult.getDocs().stream()
Expand Down

0 comments on commit f2dbdc6

Please sign in to comment.