diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/DebeziumConstants.java b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/DebeziumConstants.java index d3e115e3d9d8..3acaca513755 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/DebeziumConstants.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/debezium/DebeziumConstants.java @@ -35,6 +35,7 @@ public class DebeziumConstants { public static final String INCOMING_TS_MS_FIELD = "ts_ms"; public static final String INCOMING_SOURCE_NAME_FIELD = "source.name"; + public static final String INCOMING_SOURCE_SCHEMA_FIELD = "source.schema"; public static final String INCOMING_SOURCE_TS_MS_FIELD = "source.ts_ms"; public static final String INCOMING_SOURCE_TXID_FIELD = "source.txId"; @@ -51,6 +52,7 @@ public class DebeziumConstants { public static final String FLATTENED_OP_COL_NAME = "_change_operation_type"; public static final String UPSTREAM_PROCESSING_TS_COL_NAME = "_upstream_event_processed_ts_ms"; public static final String FLATTENED_SHARD_NAME = "db_shard_source_partition"; + public static final String FLATTENED_SCHEMA_NAME = "db_schema_source_partition"; public static final String FLATTENED_TS_COL_NAME = "_event_origin_ts_ms"; public static final String FLATTENED_TX_ID_COL_NAME = "_event_tx_id"; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java index bf4381792cfc..19b0508735ab 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/debezium/PostgresDebeziumSource.java @@ -57,6 +57,7 @@ protected Dataset processDataset(Dataset rowDataset) { String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME), + String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME), @@ -70,6 +71,7 @@ protected Dataset processDataset(Dataset rowDataset) { String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME), + String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME), String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME),