diff --git a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java index 7e4a5337..76d4bfd2 100644 --- a/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java +++ b/src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java @@ -514,7 +514,15 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) { .enableUpsertDelete(supportUpsertDelete()); if (hasColumnMappingProperty()) { - defaultTablePropertiesBuilder.columns(streamLoadProps.get("columns")); + String columns = streamLoadProps.get("columns"); + if (supportUpsertDelete()) { + // auto add `__op` for primary key table even when user specified `sink.properties.columns`. + // in case user use a bitmap datatype and need set up `sink.properties.columns`, may forget to add `__op`. + if (!columns.endsWith(",__op")) { + columns = columns + ",__op"; + } + } + defaultTablePropertiesBuilder.columns(columns); } else if (getTableSchemaFieldNames() != null) { // don't need to add "columns" header in following cases // 1. use csv format but the flink and starrocks schemas are aligned