From 48fa663ba9bdc98ae79b8ec11552b460ffaa9208 Mon Sep 17 00:00:00 2001 From: uicosp Date: Wed, 27 Mar 2024 12:13:11 +0800 Subject: [PATCH] Update StarRocksSinkOptions.java auto add `__op` for primary key table even when user specified `sink.properties.columns` --- .../flink/table/sink/StarRocksSinkOptions.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a5337..76d4bfd2 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .enableUpsertDelete(supportUpsertDelete()); if (hasColumnMappingProperty()) { - defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); + String columns = streamLoadProps.get("columns"); + if (supportUpsertDelete()) { + // auto add `__op` for primary key table even when user specified `sink.properties.columns`. + // in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`. + if (!columns.endsWith(",__op")) { + columns = columns + ",__op"; + } + } + defaultTablePropertiesBuilder.columns(columns); } else if (getTableSchemaFieldNames() != null) { // don't need to add "columns" header in following cases // 1. use csv format but the flink and starrocks schemas are aligned