Skip to content

Commit

Permalink
[HUDI-4833] Add Postgres Schema Name to Postgres Debezium Source (apa…
Browse files Browse the repository at this point in the history
  • Loading branch information
modi95 authored Sep 12, 2022
1 parent ddb1c1a commit dc5ec0c
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class DebeziumConstants {
public static final String INCOMING_TS_MS_FIELD = "ts_ms";

public static final String INCOMING_SOURCE_NAME_FIELD = "source.name";
public static final String INCOMING_SOURCE_SCHEMA_FIELD = "source.schema";
public static final String INCOMING_SOURCE_TS_MS_FIELD = "source.ts_ms";
public static final String INCOMING_SOURCE_TXID_FIELD = "source.txId";

Expand All @@ -51,6 +52,7 @@ public class DebeziumConstants {
public static final String FLATTENED_OP_COL_NAME = "_change_operation_type";
public static final String UPSTREAM_PROCESSING_TS_COL_NAME = "_upstream_event_processed_ts_ms";
public static final String FLATTENED_SHARD_NAME = "db_shard_source_partition";
public static final String FLATTENED_SCHEMA_NAME = "db_schema_source_partition";
public static final String FLATTENED_TS_COL_NAME = "_event_origin_ts_ms";
public static final String FLATTENED_TX_ID_COL_NAME = "_event_tx_id";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME),
Expand All @@ -70,6 +71,7 @@ protected Dataset<Row> processDataset(Dataset<Row> rowDataset) {
String.format("%s as %s", DebeziumConstants.INCOMING_OP_FIELD, DebeziumConstants.FLATTENED_OP_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_TS_MS_FIELD, DebeziumConstants.UPSTREAM_PROCESSING_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_NAME_FIELD, DebeziumConstants.FLATTENED_SHARD_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_SCHEMA_FIELD, DebeziumConstants.FLATTENED_SCHEMA_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TS_MS_FIELD, DebeziumConstants.FLATTENED_TS_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_TXID_FIELD, DebeziumConstants.FLATTENED_TX_ID_COL_NAME),
String.format("%s as %s", DebeziumConstants.INCOMING_SOURCE_LSN_FIELD, DebeziumConstants.FLATTENED_LSN_COL_NAME),
Expand Down

0 comments on commit dc5ec0c

Please sign in to comment.