Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix][Common] Fix the CommonError msg for paimon sink #7591

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,8 @@ public static SeaTunnelRuntimeException writeRowErrorWithFiledsCountNotMatch(
String connector, int sourceFieldsNum, int sinkFieldsNum) {
Map<String, String> params = new HashMap<>();
params.put("connector", connector);
params.put("sourceFiledName", String.valueOf(sourceFieldsNum));
params.put("sourceFiledType", String.valueOf(sinkFieldsNum));
params.put("sourceFieldsNum", String.valueOf(sourceFieldsNum));
params.put("sinkFieldsNum", String.valueOf(sinkFieldsNum));
return new SeaTunnelRuntimeException(
WRITE_SEATUNNEL_ROW_ERROR_WITH_FILEDS_NOT_MATCH, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import lombok.extern.slf4j.Slf4j;

import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -62,6 +64,7 @@
import java.util.Map;

/** Unit tests for {@link RowConverter} */
@Slf4j
public class RowConverterTest {

private SeaTunnelRow seaTunnelRow;
Expand All @@ -71,6 +74,7 @@ public class RowConverterTest {
private SeaTunnelRowType seaTunnelRowType;

private volatile boolean isCaseSensitive = false;
private volatile boolean subtractOneFiledInSource = false;
private volatile int index = 0;
private static final String[] filedNames = {
"c_tinyint",
Expand All @@ -89,6 +93,23 @@ public class RowConverterTest {
"c_array"
};

public static final SeaTunnelDataType<?>[] seaTunnelDataTypes = {
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(30, 8),
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
};

public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");

public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
Expand Down Expand Up @@ -139,7 +160,7 @@ public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {

@BeforeEach
public void generateTestData() {
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index);
initSeaTunnelRowTypeCaseSensitive(isCaseSensitive, index, subtractOneFiledInSource);
byte tinyint = 1;
short smallint = 2;
int intNum = 3;
Expand Down Expand Up @@ -216,34 +237,27 @@ public void generateTestData() {
internalRow = binaryRow;
}

private void initSeaTunnelRowTypeCaseSensitive(boolean isUpperCase, int index) {
String[] oneUpperCaseFiledNames = Arrays.copyOf(filedNames, filedNames.length);
private void initSeaTunnelRowTypeCaseSensitive(
boolean isUpperCase, int index, boolean subtractOneFiledInSource) {
String[] oneUpperCaseFiledNames =
Arrays.copyOf(
filedNames,
subtractOneFiledInSource ? filedNames.length - 1 : filedNames.length);
if (isUpperCase) {
oneUpperCaseFiledNames[index] = oneUpperCaseFiledNames[index].toUpperCase();
}
seaTunnelRowType =
new SeaTunnelRowType(
oneUpperCaseFiledNames,
new SeaTunnelDataType<?>[] {
BasicType.BYTE_TYPE,
BasicType.SHORT_TYPE,
BasicType.INT_TYPE,
BasicType.LONG_TYPE,
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
new DecimalType(30, 8),
BasicType.STRING_TYPE,
PrimitiveByteArrayType.INSTANCE,
BasicType.BOOLEAN_TYPE,
LocalTimeType.LOCAL_DATE_TYPE,
LocalTimeType.LOCAL_DATE_TIME_TYPE,
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE),
ArrayType.STRING_ARRAY_TYPE
});
SeaTunnelDataType<?>[] newSeaTunnelDataTypes =
Arrays.copyOf(
seaTunnelDataTypes,
subtractOneFiledInSource
? seaTunnelDataTypes.length - 1
: filedNames.length);
seaTunnelRowType = new SeaTunnelRowType(oneUpperCaseFiledNames, newSeaTunnelDataTypes);
}

@Test
public void seaTunnelToPaimon() {
TableSchema sinkTableSchema = getTableSchema(30, 8);
SeaTunnelRuntimeException actualException =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
Expand All @@ -259,9 +273,26 @@ public void seaTunnelToPaimon() {
Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage());

InternalRow reconvert =
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8));
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, sinkTableSchema);
Assertions.assertEquals(reconvert, internalRow);

subtractOneFiledInSource = true;
generateTestData();
SeaTunnelRuntimeException filedNumsActualException =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() ->
RowConverter.reconvert(
seaTunnelRow, seaTunnelRowType, sinkTableSchema));
SeaTunnelRuntimeException filedNumsExceptException =
CommonError.writeRowErrorWithFiledsCountNotMatch(
"Paimon",
seaTunnelRowType.getTotalFields(),
sinkTableSchema.fields().size());
Assertions.assertEquals(
filedNumsExceptException.getMessage(), filedNumsActualException.getMessage());

subtractOneFiledInSource = false;
isCaseSensitive = true;

for (int i = 0; i < filedNames.length; i++) {
Expand All @@ -271,7 +302,6 @@ public void seaTunnelToPaimon() {
DataType exceptDataType =
RowTypeConverter.reconvert(sourceFiledname, seaTunnelRowType.getFieldType(i));
DataField exceptDataField = new DataField(i, sourceFiledname, exceptDataType);
TableSchema sinkTableSchema = getTableSchema(30, 8);
SeaTunnelRuntimeException actualException1 =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
Expand Down
Loading