Skip to content

Commit

Permalink
[Improve][Connector-V2] Time supports default value (#7639)
Browse files Browse the repository at this point in the history
  • Loading branch information
corgy-w authored Sep 13, 2024
1 parent ecd3173 commit 3397868
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 8 deletions.
73 changes: 71 additions & 2 deletions docs/en/connector-v2/source/FakeSource.md
Original file line number Diff line number Diff line change
Expand Up @@ -401,15 +401,86 @@ source {
}
```

### Options `defaultValue` Case

Custom data can be generated by `row` and `columns`. For the time type, obtain the current time by
`CURRENT_TIMESTAMP``CURRENT_TIME``CURRENT_DATE`

```hocon
schema = {
fields {
pk_id = bigint
name = string
score = int
time1 = timestamp
time2 = time
time3 = date
}
}
# use rows
rows = [
{
kind = INSERT
fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
}
]
```

```hocon
schema = {
# use columns
columns = [
{
name = book_publication_time
type = timestamp
defaultValue = "2024-09-12 15:45:30"
comment = "book publication time"
},
{
name = book_publication_time2
type = timestamp
defaultValue = CURRENT_TIMESTAMP
comment = "book publication time2"
},
{
name = book_publication_time3
type = time
defaultValue = "15:45:30"
comment = "book publication time3"
},
{
name = book_publication_time4
type = time
defaultValue = CURRENT_TIME
comment = "book publication time4"
},
{
name = book_publication_time5
type = date
defaultValue = "2024-09-12"
comment = "book publication time5"
},
{
name = book_publication_time6
type = date
defaultValue = CURRENT_DATE
comment = "book publication time6"
}
]
}
```

### Use Vector Example

```hocon
source {
FakeSource {
row.num = 10
# Low priority
vector.dimension= 4
binary.vector.dimension = 8
# Low priority
schema = {
table = "simple_example"
columns = [
Expand Down Expand Up @@ -452,8 +523,6 @@ source {
```

ps: columnScale needs to be improved in schema-feature , used to specify the dimension of vectors and precision of float. For details, see [here](../../concept/schema-feature.md#Columns)

## Changelog

### 2.2.0-beta 2022-09-26
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;
import java.util.ArrayList;
Expand Down Expand Up @@ -451,6 +452,7 @@ public static FakeConfig buildWithConfig(ReadonlyConfig readonlyConfig) {
}

@Getter
@Setter
@AllArgsConstructor
public static class RowData implements Serializable {
static final String KEY_KIND = "kind";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.ArrayType;
Expand All @@ -25,8 +31,11 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.connectors.seatunnel.fake.utils.FakeDataRandomUtils;
Expand All @@ -35,12 +44,24 @@
import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;

import static org.apache.seatunnel.api.table.type.SqlType.TIME;

public class FakeDataGenerator {
private static final String CURRENT_DATE = "CURRENT_DATE";
private static final String CURRENT_TIME = "CURRENT_TIME";
private static final String CURRENT_TIMESTAMP = "CURRENT_TIMESTAMP";

private final ObjectMapper OBJECTMAPPER = new ObjectMapper();

private final CatalogTable catalogTable;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;
Expand Down Expand Up @@ -92,7 +113,10 @@ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
// Use manual configuration data preferentially
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
SeaTunnelDataType<?>[] fieldTypes = catalogTable.getSeaTunnelRowType().getFieldTypes();
String[] fieldNames = catalogTable.getSeaTunnelRowType().getFieldNames();
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
customField(rowData, fieldTypes, fieldNames);
seaTunnelRows.add(convertRow(rowData));
}
} else {
Expand All @@ -103,6 +127,69 @@ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
return seaTunnelRows;
}

private void customField(
FakeConfig.RowData rowData, SeaTunnelDataType<?>[] fieldTypes, String[] fieldNames) {
if (rowData.getFieldsJson() == null) {
return;
}

try {
JsonNode jsonNode = OBJECTMAPPER.readTree(rowData.getFieldsJson());
int arity = fieldTypes.length;

for (int i = 0; i < arity; i++) {
SeaTunnelDataType<?> fieldType = fieldTypes[i];
JsonNode field = jsonNode.isArray() ? jsonNode.get(i) : jsonNode.get(fieldNames[i]);

if (field == null) {
continue;
}

String newValue = getNewValueForField(fieldType.getSqlType(), field.asText());
if (newValue != null) {
jsonNode = replaceFieldValue(jsonNode, i, fieldNames[i], newValue);
}
}

rowData.setFieldsJson(jsonNode.toString());
} catch (JsonProcessingException e) {
throw new FakeConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"The data type of the fake data is not supported",
e);
}
}

private String getNewValueForField(SqlType sqlType, String fieldValue) {
switch (sqlType) {
case TIME:
return fieldValue.equals(CURRENT_TIME) ? LocalTime.now().toString() : null;
case DATE:
return fieldValue.equalsIgnoreCase(CURRENT_DATE)
? LocalDate.now().toString()
: null;
case TIMESTAMP:
return fieldValue.equalsIgnoreCase(CURRENT_TIMESTAMP)
? LocalDateTime.now().toString()
: null;
default:
return null;
}
}

private JsonNode replaceFieldValue(
JsonNode jsonNode, int index, String fieldName, String newValue) {
JsonNode newFieldNode = OBJECTMAPPER.convertValue(newValue, JsonNode.class);

if (jsonNode.isArray()) {
((ArrayNode) jsonNode).set(index, newFieldNode);
} else {
((ObjectNode) jsonNode).set(fieldName, newFieldNode);
}

return jsonNode;
}

@SuppressWarnings("magicnumber")
private Object randomColumnValue(Column column) {
SeaTunnelDataType<?> fieldType = column.getDataType();
Expand Down Expand Up @@ -152,11 +239,47 @@ private Object randomColumnValue(Column column) {
case BYTES:
return value(column, String::getBytes, fakeDataRandomUtils::randomBytes);
case DATE:
return value(column, String::toString, fakeDataRandomUtils::randomLocalDate);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_DATE)) {
return LocalDate.now();
}
DateTimeFormatter dateTimeFormatter =
DateUtils.matchDateFormatter(defaultValue);
return LocalDate.parse(
defaultValue,
dateTimeFormatter == null
? DateTimeFormatter.ISO_LOCAL_DATE
: dateTimeFormatter);
},
fakeDataRandomUtils::randomLocalDate);
case TIME:
return value(column, String::toString, fakeDataRandomUtils::randomLocalTime);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_TIME)) {
return LocalTime.now();
}
return LocalTime.parse(defaultValue, DateTimeFormatter.ISO_LOCAL_TIME);
},
fakeDataRandomUtils::randomLocalTime);
case TIMESTAMP:
return value(column, String::toString, fakeDataRandomUtils::randomLocalDateTime);
return value(
column,
defaultValue -> {
if (defaultValue.equalsIgnoreCase(CURRENT_TIMESTAMP)) {
return LocalDateTime.now();
}
DateTimeFormatter dateTimeFormatter =
DateTimeUtils.matchDateTimeFormatter(defaultValue);
return LocalDateTime.parse(
defaultValue,
dateTimeFormatter == null
? DateTimeFormatter.ISO_LOCAL_DATE_TIME
: dateTimeFormatter);
},
fakeDataRandomUtils::randomLocalDateTime);
case ROW:
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
Object[] objects = new Object[fieldTypes.length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
Expand All @@ -41,6 +42,9 @@
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -173,13 +177,17 @@ public void testColumnDataParse(String conf) throws FileNotFoundException, URISy
8, ((ByteBuffer) seaTunnelRow.getField(8)).capacity() / 2);
// VectorType.VECTOR_SPARSE_FLOAT_TYPE
Assertions.assertEquals(8, ((Map) seaTunnelRow.getField(9)).size());
Assertions.assertNotNull(seaTunnelRow.getField(10).toString());
Assertions.assertNotNull(seaTunnelRow.getField(11).toString());
Assertions.assertEquals(
268,
436,
seaTunnelRow.getBytesSize(
new SeaTunnelRowType(
new String[] {
"field1", "field2", "field3", "field4", "field5",
"field6", "field7", "field8", "field9", "field10"
"field6", "field7", "field8", "field9", "field10",
"field11", "field12", "field13", "field14",
"field15", "field16"
},
new SeaTunnelDataType<?>[] {
BasicType.STRING_TYPE,
Expand All @@ -191,11 +199,36 @@ public void testColumnDataParse(String conf) throws FileNotFoundException, URISy
VectorType.VECTOR_BINARY_TYPE,
VectorType.VECTOR_FLOAT16_TYPE,
VectorType.VECTOR_BFLOAT16_TYPE,
VectorType.VECTOR_SPARSE_FLOAT_TYPE
VectorType.VECTOR_SPARSE_FLOAT_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_TIME_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TYPE
})));
});
}

@ParameterizedTest
@ValueSource(strings = {"fake-data.schema.default.conf"})
public void testDataParse(String conf) throws FileNotFoundException, URISyntaxException {
ReadonlyConfig testConfig = getTestConfigFile(conf);
FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig);
FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(fakeConfig);
List<SeaTunnelRow> seaTunnelRows =
fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum());
seaTunnelRows.forEach(
seaTunnelRow -> {
Assertions.assertInstanceOf(Long.class, seaTunnelRow.getField(0));
Assertions.assertInstanceOf(String.class, seaTunnelRow.getField(1));
Assertions.assertInstanceOf(Integer.class, seaTunnelRow.getField(2));
Assertions.assertInstanceOf(LocalDateTime.class, seaTunnelRow.getField(3));
Assertions.assertInstanceOf(LocalTime.class, seaTunnelRow.getField(4));
Assertions.assertInstanceOf(LocalDate.class, seaTunnelRow.getField(5));
});
}

private ReadonlyConfig getTestConfigFile(String configFile)
throws FileNotFoundException, URISyntaxException {
if (!configFile.startsWith("/")) {
Expand Down
Loading

0 comments on commit 3397868

Please sign in to comment.