From 54353c4bfad8edbf3b1c03af85ad6bd6430c8d88 Mon Sep 17 00:00:00 2001 From: uicosp Date: Wed, 27 Mar 2024 12:13:11 +0800 Subject: [PATCH] Update StarRocksSinkOptions.java auto add `__op` for primary key table even when user specified `sink.properties.columns` Signed-off-by: uicosp auto add `__op` for primary key table even when user specified `sink.properties.columns` auto add `__op` for primary key table even when user specified `sink.properties.columns` --- .../flink/table/sink/StarRocksSinkOptions.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a5337..2510fe30 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .enableUpsertDelete(supportUpsertDelete()); if (hasColumnMappingProperty()) { - defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); + List columns = new ArrayList<>(Arrays.asList(streamLoadProps.get("columns").split(","))); + if (supportUpsertDelete()) { + // auto add `__op` for primary key table even when user specified `sink.properties.columns`. + // in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`. + if (columns.stream().noneMatch(it -> it.equals("__op"))) { + columns.add("__op"); + } + } + defaultTablePropertiesBuilder.columns(String.join(",", columns)); } else if (getTableSchemaFieldNames() != null) { // don't need to add "columns" header in following cases // 1. use csv format but the flink and starrocks schemas are aligned