From 606347684af1957bc6b2b154161077b4e0010292 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Mon, 18 Dec 2023 10:33:50 -0800 Subject: [PATCH 01/10] support reading from multiple tables using logical replication slots --- README.md | 10 ++++------ source.go | 4 ++-- source/config.go | 8 ++++---- source/logrepl/cdc.go | 23 ++++++++++++++--------- source/logrepl/cdc_test.go | 2 +- source/logrepl/handler.go | 17 +++++++++-------- source/logrepl/internal/subscription.go | 1 + source/paramgen.go | 6 +++--- 8 files changed, 38 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 67061c5..724e5ba 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ configuration. ## Change Data Capture This connector implements CDC features for PostgreSQL by creating a logical replication slot and a publication that -listens to changes in the configured table. Every detected change is converted into a record and returned in the call to +listens to changes in the configured tables. Every detected change is converted into a record and returned in the call to `Read`. If there is no record available at the moment `Read` is called, it blocks until a record is available or the connector receives a stop signal. @@ -59,11 +59,9 @@ returned. ## Configuration Options | name | description | required | default | -| ------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -------- | ---------------------- | +|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------| | `url` | Connection string for the Postgres database. | true | | -| `table` | The name of the table in Postgres that the connector should read. | true | | -| `columns` | Comma separated list of column names that should be included in each Record's payload. | false | (all columns) | -| `key` | Column name that records should use for their `Key` fields. | false | (primary key of table) | +| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | | | `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | | `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | | `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | @@ -91,7 +89,7 @@ If there is no key, the record will be simply appended. ## Configuration Options | name | description | required | default | -| ------- | --------------------------------------------------------------------------- | -------- | ------- | +|---------|-----------------------------------------------------------------------------|----------|---------| | `url` | Connection string for the Postgres database. | true | | | `table` | The name of the table in Postgres that the connector should write to. | false | | | `key` | Column name used to detect if the target table already contains the record. | false | | diff --git a/source.go b/source.go index aa37e5b..fe6c3b2 100644 --- a/source.go +++ b/source.go @@ -77,7 +77,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { Position: pos, SlotName: s.config.LogreplSlotName, PublicationName: s.config.LogreplPublicationName, - TableName: s.config.Table, + Tables: s.config.Table, KeyColumnName: s.config.Key, Columns: s.config.Columns, }) @@ -96,7 +96,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { snap, err := longpoll.NewSnapshotIterator( ctx, s.conn, - s.config.Table, + s.config.Table[0], //todo: only the first table for now s.config.Columns, s.config.Key) if err != nil { diff --git a/source/config.go b/source/config.go index abd8e61..f181c04 100644 --- a/source/config.go +++ b/source/config.go @@ -40,11 +40,11 @@ const ( type Config struct { // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` - // The name of the table in Postgres that the connector should read. - Table string `json:"table" validate:"required"` - // Comma separated list of column names that should be included in each Record's payload. + // List of table names to read from, separated by a comma. + Table []string `json:"table" validate:"required"` + // todo: reove param, Comma separated list of column names that should be included in each Record's payload. Columns []string `json:"columns"` - // Column name that records should use for their `Key` fields. + // todo: remove param, Column name that records should use for their `Key` fields. Key string `json:"key"` // Whether or not the plugin will take a snapshot of the entire table before starting cdc mode. diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index ada92f9..39fff66 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -36,7 +36,7 @@ type Config struct { Position sdk.Position SlotName string PublicationName string - TableName string + Tables []string KeyColumnName string Columns []string } @@ -154,21 +154,26 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er } } - keyColumn, err := i.getKeyColumn(ctx, conn) - if err != nil { - return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", i.config.TableName, err) + var err error + keyColumnMp := make(map[string]string, len(i.config.Tables)) + for _, tableName := range i.config.Tables { + // Call function and store the result in the map + keyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName) + if err != nil { + return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) + } } sub := internal.NewSubscription( conn.Config().Config, i.config.SlotName, i.config.PublicationName, - []string{i.config.TableName}, + i.config.Tables, lsn, NewCDCHandler( internal.NewRelationSet(conn.ConnInfo()), - keyColumn, - i.config.Columns, + keyColumnMp, + i.config.Columns, // todo, delete this option, use processors instead i.records, ).Handle, ) @@ -179,7 +184,7 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er // getKeyColumn queries the db for the name of the primary key column for a // table if one exists and returns it. -func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn) (string, error) { +func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { if i.config.KeyColumnName != "" { return i.config.KeyColumnName, nil } @@ -188,7 +193,7 @@ func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn) (string, FROM information_schema.key_column_usage WHERE table_name = $1 AND constraint_name LIKE '%_pkey' LIMIT 1;` - row := conn.QueryRow(ctx, query, i.config.TableName) + row := conn.QueryRow(ctx, query, tableName) var colName string err := row.Scan(&colName) diff --git a/source/logrepl/cdc_test.go b/source/logrepl/cdc_test.go index c33afd2..6e18b92 100644 --- a/source/logrepl/cdc_test.go +++ b/source/logrepl/cdc_test.go @@ -137,7 +137,7 @@ func TestIterator_Next(t *testing.T) { func testIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table string) *CDCIterator { is := is.New(t) config := Config{ - TableName: table, + Tables: []string{table}, PublicationName: table, // table is random, reuse for publication name SlotName: table, // table is random, reuse for slot name } diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index 73aaa98..ac6b7c8 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -27,7 +27,7 @@ import ( // CDCHandler is responsible for handling logical replication messages, // converting them to a record and sending them to a channel. type CDCHandler struct { - keyColumn string + keyColumnMp map[string]string columns map[string]bool // columns can be used to filter only specific columns relationSet *internal.RelationSet out chan<- sdk.Record @@ -35,7 +35,7 @@ type CDCHandler struct { func NewCDCHandler( rs *internal.RelationSet, - keyColumn string, + keyColumnMp map[string]string, columns []string, out chan<- sdk.Record, ) *CDCHandler { @@ -47,7 +47,7 @@ func NewCDCHandler( } } return &CDCHandler{ - keyColumn: keyColumn, + keyColumnMp: keyColumnMp, columns: columnSet, relationSet: rs, out: out, @@ -106,7 +106,7 @@ func (h *CDCHandler) handleInsert( rec := sdk.Util.Source.NewRecordCreate( LSNToPosition(lsn), h.buildRecordMetadata(rel), - h.buildRecordKey(newValues), + h.buildRecordKey(newValues, rel.RelationName), h.buildRecordPayload(newValues), ) return h.send(ctx, rec) @@ -139,7 +139,7 @@ func (h *CDCHandler) handleUpdate( rec := sdk.Util.Source.NewRecordUpdate( LSNToPosition(lsn), h.buildRecordMetadata(rel), - h.buildRecordKey(newValues), + h.buildRecordKey(newValues, rel.RelationName), h.buildRecordPayload(oldValues), h.buildRecordPayload(newValues), ) @@ -166,7 +166,7 @@ func (h *CDCHandler) handleDelete( rec := sdk.Util.Source.NewRecordDelete( LSNToPosition(lsn), h.buildRecordMetadata(rel), - h.buildRecordKey(oldValues), + h.buildRecordKey(oldValues, rel.RelationName), ) return h.send(ctx, rec) } @@ -190,10 +190,11 @@ func (h *CDCHandler) buildRecordMetadata(relation *pglogrepl.RelationMessage) ma // buildRecordKey takes the values from the message and extracts the key that // matches the configured keyColumnName. -func (h *CDCHandler) buildRecordKey(values map[string]pgtype.Value) sdk.Data { +func (h *CDCHandler) buildRecordKey(values map[string]pgtype.Value, table string) sdk.Data { + keyColumn := h.keyColumnMp[table] key := make(sdk.StructuredData) for k, v := range values { - if h.keyColumn == k { + if keyColumn == k { key[k] = v.Get() break // TODO add support for composite keys } diff --git a/source/logrepl/internal/subscription.go b/source/logrepl/internal/subscription.go index 72423f6..43b53d1 100644 --- a/source/logrepl/internal/subscription.go +++ b/source/logrepl/internal/subscription.go @@ -305,6 +305,7 @@ func (s *Subscription) createPublication(ctx context.Context, conn *pgconn.PgCon conn, s.Publication, CreatePublicationOptions{Tables: s.Tables}, + //CreatePublicationOptions{AllTables: true}, ); err != nil { // If creating the publication fails with code 42710, this means // the publication already exists. diff --git a/source/paramgen.go b/source/paramgen.go index d7fcbda..ffa4a12 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -19,13 +19,13 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "columns": { Default: "", - Description: "Comma separated list of column names that should be included in each Record's payload.", + Description: "todo: reove param, Comma separated list of column names that should be included in each Record's payload.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, "key": { Default: "", - Description: "Column name that records should use for their `key` fields.", + Description: "todo: remove param, Column name that records should use for their `key` fields.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{}, }, @@ -51,7 +51,7 @@ func (Config) Parameters() map[string]sdk.Parameter { }, "table": { Default: "", - Description: "The name of the table in Postgres that the connector should read.", + Description: "List of table names to read from, separated by a comma.", Type: sdk.ParameterTypeString, Validations: []sdk.Validation{ sdk.ValidationRequired{}, From f1ea3182016fdc821e18cd46e8722da63cc6dd80 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Mon, 18 Dec 2023 11:09:12 -0800 Subject: [PATCH 02/10] linter fix --- source/logrepl/internal/subscription.go | 1 - 1 file changed, 1 deletion(-) diff --git a/source/logrepl/internal/subscription.go b/source/logrepl/internal/subscription.go index 43b53d1..72423f6 100644 --- a/source/logrepl/internal/subscription.go +++ b/source/logrepl/internal/subscription.go @@ -305,7 +305,6 @@ func (s *Subscription) createPublication(ctx context.Context, conn *pgconn.PgCon conn, s.Publication, CreatePublicationOptions{Tables: s.Tables}, - //CreatePublicationOptions{AllTables: true}, ); err != nil { // If creating the publication fails with code 42710, this means // the publication already exists. From 27fa10e7d1ae838e2dc8aea9d476e1ea7482c4d0 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Mon, 8 Jan 2024 14:19:31 -0800 Subject: [PATCH 03/10] remove "columns" param --- source.go | 33 +++++++++++++++++++++++++++++++-- source/config.go | 2 -- source/logrepl/cdc.go | 2 -- source/logrepl/handler.go | 17 ++--------------- source/longpoll/snapshot.go | 2 +- source/paramgen.go | 6 ------ 6 files changed, 34 insertions(+), 28 deletions(-) diff --git a/source.go b/source.go index fe6c3b2..30efc28 100644 --- a/source.go +++ b/source.go @@ -52,6 +52,10 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if err != nil { return fmt.Errorf("invalid url: %w", err) } + // todo: when cdcMode "auto" is implemented, change this check + if len(s.config.Table) > 1 && s.config.CDCMode == source.CDCModeLongPolling { + return fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") + } return nil } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { @@ -59,6 +63,10 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { if err != nil { return fmt.Errorf("failed to connect to database: %w", err) } + columns, err := s.getTableColumns(conn) + if err != nil { + return fmt.Errorf("failed to connect to database: %w", err) + } s.conn = conn switch s.config.CDCMode { @@ -79,7 +87,6 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { PublicationName: s.config.LogreplPublicationName, Tables: s.config.Table, KeyColumnName: s.config.Key, - Columns: s.config.Columns, }) if err != nil { return fmt.Errorf("failed to create logical replication iterator: %w", err) @@ -97,7 +104,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { ctx, s.conn, s.config.Table[0], //todo: only the first table for now - s.config.Columns, + columns, s.config.Key) if err != nil { return fmt.Errorf("failed to create long polling iterator: %w", err) @@ -131,3 +138,25 @@ func (s *Source) Teardown(ctx context.Context) error { } return nil } + +func (s *Source) getTableColumns(conn *pgx.Conn) ([]string, error) { + query := fmt.Sprintf("SELECT column_name FROM information_schema.columns WHERE table_name = '%s'", s.config.Table[0]) + + rows, err := conn.Query(context.Background(), query) + if err != nil { + return nil, err + } + defer rows.Close() + + var columns []string + for rows.Next() { + var columnName string + err := rows.Scan(&columnName) + if err != nil { + return nil, err + } + columns = append(columns, columnName) + } + + return columns, nil +} diff --git a/source/config.go b/source/config.go index f181c04..8d2cf06 100644 --- a/source/config.go +++ b/source/config.go @@ -42,8 +42,6 @@ type Config struct { URL string `json:"url" validate:"required"` // List of table names to read from, separated by a comma. Table []string `json:"table" validate:"required"` - // todo: reove param, Comma separated list of column names that should be included in each Record's payload. - Columns []string `json:"columns"` // todo: remove param, Column name that records should use for their `Key` fields. Key string `json:"key"` diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 39fff66..a020f25 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -38,7 +38,6 @@ type Config struct { PublicationName string Tables []string KeyColumnName string - Columns []string } // CDCIterator asynchronously listens for events from the logical replication @@ -173,7 +172,6 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er NewCDCHandler( internal.NewRelationSet(conn.ConnInfo()), keyColumnMp, - i.config.Columns, // todo, delete this option, use processors instead i.records, ).Handle, ) diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index ac6b7c8..9418e7a 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -28,7 +28,6 @@ import ( // converting them to a record and sending them to a channel. type CDCHandler struct { keyColumnMp map[string]string - columns map[string]bool // columns can be used to filter only specific columns relationSet *internal.RelationSet out chan<- sdk.Record } @@ -36,19 +35,10 @@ type CDCHandler struct { func NewCDCHandler( rs *internal.RelationSet, keyColumnMp map[string]string, - columns []string, out chan<- sdk.Record, ) *CDCHandler { - var columnSet map[string]bool - if len(columns) > 0 { - columnSet = make(map[string]bool) - for _, col := range columns { - columnSet[col] = true - } - } return &CDCHandler{ keyColumnMp: keyColumnMp, - columns: columnSet, relationSet: rs, out: out, } @@ -210,11 +200,8 @@ func (h *CDCHandler) buildRecordPayload(values map[string]pgtype.Value) sdk.Data } payload := make(sdk.StructuredData) for k, v := range values { - // filter columns if columns are specified - if h.columns == nil || h.columns[k] { - value := v.Get() - payload[k] = value - } + value := v.Get() + payload[k] = value } return payload } diff --git a/source/longpoll/snapshot.go b/source/longpoll/snapshot.go index ef4e9de..08ea175 100644 --- a/source/longpoll/snapshot.go +++ b/source/longpoll/snapshot.go @@ -72,7 +72,7 @@ type SnapshotIterator struct { // * NewSnapshotIterator attempts to load the sql rows into the SnapshotIterator and will // immediately begin to return them to subsequent Read calls. // * It acquires a read only transaction lock before reading the table. -// * If Teardown is called while a snpashot is in progress, it will return an +// * If Teardown is called while a snapshot is in progress, it will return an // ErrSnapshotInterrupt error. func NewSnapshotIterator(ctx context.Context, conn *pgx.Conn, table string, columns []string, key string) (*SnapshotIterator, error) { s := &SnapshotIterator{ diff --git a/source/paramgen.go b/source/paramgen.go index ffa4a12..f859bca 100644 --- a/source/paramgen.go +++ b/source/paramgen.go @@ -17,12 +17,6 @@ func (Config) Parameters() map[string]sdk.Parameter { sdk.ValidationInclusion{List: []string{"auto", "logrepl", "long_polling"}}, }, }, - "columns": { - Default: "", - Description: "todo: reove param, Comma separated list of column names that should be included in each Record's payload.", - Type: sdk.ParameterTypeString, - Validations: []sdk.Validation{}, - }, "key": { Default: "", Description: "todo: remove param, Column name that records should use for their `key` fields.", From 744963172b91d58374ba4d054666fd77a2363337 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Fri, 12 Jan 2024 11:17:43 -0800 Subject: [PATCH 04/10] update "key" param to accept multiple tables --- README.md | 17 +++++++++-------- source.go | 23 +++++++++++++++++------ source/config.go | 4 ++-- source/logrepl/cdc.go | 22 +++++++++++++--------- 4 files changed, 41 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index 724e5ba..c5307d3 100644 --- a/README.md +++ b/README.md @@ -58,14 +58,15 @@ returned. ## Configuration Options -| name | description | required | default | -|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------| -| `url` | Connection string for the Postgres database. | true | | -| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | | -| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | -| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | -| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | -| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | +| name | description | required | default | +|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------| +| `url` | Connection string for the Postgres database. | true | | +| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | | +| `key` | List of Key column names per table, separated by comma. example:`"table1:key1,table2:key2"`, if not supplied, the table primary key will be used as the `'Key'` field for the records. | false | | +| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` | +| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` | +| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` | +| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` | # Destination diff --git a/source.go b/source.go index 30efc28..2a7defa 100644 --- a/source.go +++ b/source.go @@ -17,6 +17,7 @@ package postgres import ( "context" "fmt" + "strings" "github.com/conduitio/conduit-connector-postgres/source" "github.com/conduitio/conduit-connector-postgres/source/logrepl" @@ -29,9 +30,10 @@ import ( type Source struct { sdk.UnimplementedSource - iterator source.Iterator - config source.Config - conn *pgx.Conn + iterator source.Iterator + config source.Config + conn *pgx.Conn + KeyColumnMp map[string]string } func NewSource() sdk.Source { @@ -53,9 +55,18 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { return fmt.Errorf("invalid url: %w", err) } // todo: when cdcMode "auto" is implemented, change this check - if len(s.config.Table) > 1 && s.config.CDCMode == source.CDCModeLongPolling { + if len(s.config.Table) != 1 && s.config.CDCMode == source.CDCModeLongPolling { return fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") } + s.KeyColumnMp = make(map[string]string, len(s.config.Table)) + for _, pair := range s.config.Key { + // Split each pair into key and value + parts := strings.Split(pair, ":") + if len(parts) != 2 { + return fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") + } + s.KeyColumnMp[parts[0]] = parts[1] + } return nil } func (s *Source) Open(ctx context.Context, pos sdk.Position) error { @@ -86,7 +97,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { SlotName: s.config.LogreplSlotName, PublicationName: s.config.LogreplPublicationName, Tables: s.config.Table, - KeyColumnName: s.config.Key, + KeyColumnMp: s.KeyColumnMp, }) if err != nil { return fmt.Errorf("failed to create logical replication iterator: %w", err) @@ -105,7 +116,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { s.conn, s.config.Table[0], //todo: only the first table for now columns, - s.config.Key) + s.KeyColumnMp[s.config.Table[0]]) if err != nil { return fmt.Errorf("failed to create long polling iterator: %w", err) } diff --git a/source/config.go b/source/config.go index 8d2cf06..991367a 100644 --- a/source/config.go +++ b/source/config.go @@ -42,8 +42,8 @@ type Config struct { URL string `json:"url" validate:"required"` // List of table names to read from, separated by a comma. Table []string `json:"table" validate:"required"` - // todo: remove param, Column name that records should use for their `Key` fields. - Key string `json:"key"` + // list of Key column names per table ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields. + Key []string `json:"key"` // Whether or not the plugin will take a snapshot of the entire table before starting cdc mode. SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"` diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index a020f25..5d662e4 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -37,7 +37,7 @@ type Config struct { SlotName string PublicationName string Tables []string - KeyColumnName string + KeyColumnMp map[string]string } // CDCIterator asynchronously listens for events from the logical replication @@ -154,12 +154,16 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er } var err error - keyColumnMp := make(map[string]string, len(i.config.Tables)) + if i.config.KeyColumnMp == nil { + i.config.KeyColumnMp = make(map[string]string, len(i.config.Tables)) + } for _, tableName := range i.config.Tables { - // Call function and store the result in the map - keyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName) - if err != nil { - return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) + // get unprovided table keys + if _, ok := i.config.KeyColumnMp[tableName]; !ok { + i.config.KeyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName) + if err != nil { + return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) + } } } @@ -171,7 +175,7 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er lsn, NewCDCHandler( internal.NewRelationSet(conn.ConnInfo()), - keyColumnMp, + i.config.KeyColumnMp, i.records, ).Handle, ) @@ -183,8 +187,8 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er // getKeyColumn queries the db for the name of the primary key column for a // table if one exists and returns it. func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { - if i.config.KeyColumnName != "" { - return i.config.KeyColumnName, nil + if i.config.KeyColumnMp[tableName] != "" { + return i.config.KeyColumnMp[tableName], nil } query := `SELECT column_name From cab22f43f6a9fa581835515116a1a98f6dc9ad22 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Tue, 16 Jan 2024 12:22:15 -0800 Subject: [PATCH 05/10] fix config doc comments --- source/config.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/config.go b/source/config.go index 991367a..775f491 100644 --- a/source/config.go +++ b/source/config.go @@ -40,12 +40,12 @@ const ( type Config struct { // URL is the connection string for the Postgres database. URL string `json:"url" validate:"required"` - // List of table names to read from, separated by a comma. + // Table is a List of table names to read from, separated by a comma. Table []string `json:"table" validate:"required"` - // list of Key column names per table ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields. + // Key is a list of Key column names per table, ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields. Key []string `json:"key"` - // Whether or not the plugin will take a snapshot of the entire table before starting cdc mode. + // SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode. SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"` // CDCMode determines how the connector should listen to changes. CDCMode CDCMode `json:"cdcMode" validate:"inclusion=auto|logrepl|long_polling" default:"auto"` From 3a86ee11d408f6e7f3b3f773e488bfd66f318542 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Tue, 16 Jan 2024 12:29:47 -0800 Subject: [PATCH 06/10] rename KeyColumnMp to tableKeys --- source.go | 16 ++++++++-------- source/logrepl/cdc.go | 22 +++++++++++----------- source/logrepl/handler.go | 8 ++++---- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/source.go b/source.go index 2a7defa..9808e4c 100644 --- a/source.go +++ b/source.go @@ -30,10 +30,10 @@ import ( type Source struct { sdk.UnimplementedSource - iterator source.Iterator - config source.Config - conn *pgx.Conn - KeyColumnMp map[string]string + iterator source.Iterator + config source.Config + conn *pgx.Conn + tableKeys map[string]string } func NewSource() sdk.Source { @@ -58,14 +58,14 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if len(s.config.Table) != 1 && s.config.CDCMode == source.CDCModeLongPolling { return fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") } - s.KeyColumnMp = make(map[string]string, len(s.config.Table)) + s.tableKeys = make(map[string]string, len(s.config.Table)) for _, pair := range s.config.Key { // Split each pair into key and value parts := strings.Split(pair, ":") if len(parts) != 2 { return fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") } - s.KeyColumnMp[parts[0]] = parts[1] + s.tableKeys[parts[0]] = parts[1] } return nil } @@ -97,7 +97,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { SlotName: s.config.LogreplSlotName, PublicationName: s.config.LogreplPublicationName, Tables: s.config.Table, - KeyColumnMp: s.KeyColumnMp, + TableKeys: s.tableKeys, }) if err != nil { return fmt.Errorf("failed to create logical replication iterator: %w", err) @@ -116,7 +116,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { s.conn, s.config.Table[0], //todo: only the first table for now columns, - s.KeyColumnMp[s.config.Table[0]]) + s.tableKeys[s.config.Table[0]]) if err != nil { return fmt.Errorf("failed to create long polling iterator: %w", err) } diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index 5d662e4..ccc4e35 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -37,7 +37,7 @@ type Config struct { SlotName string PublicationName string Tables []string - KeyColumnMp map[string]string + TableKeys map[string]string } // CDCIterator asynchronously listens for events from the logical replication @@ -154,13 +154,13 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er } var err error - if i.config.KeyColumnMp == nil { - i.config.KeyColumnMp = make(map[string]string, len(i.config.Tables)) + if i.config.TableKeys == nil { + i.config.TableKeys = make(map[string]string, len(i.config.Tables)) } for _, tableName := range i.config.Tables { // get unprovided table keys - if _, ok := i.config.KeyColumnMp[tableName]; !ok { - i.config.KeyColumnMp[tableName], err = i.getKeyColumn(ctx, conn, tableName) + if _, ok := i.config.TableKeys[tableName]; !ok { + i.config.TableKeys[tableName], err = i.getTableKeys(ctx, conn, tableName) if err != nil { return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) } @@ -175,7 +175,7 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er lsn, NewCDCHandler( internal.NewRelationSet(conn.ConnInfo()), - i.config.KeyColumnMp, + i.config.TableKeys, i.records, ).Handle, ) @@ -184,11 +184,11 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er return nil } -// getKeyColumn queries the db for the name of the primary key column for a +// getTableKeys queries the db for the name of the primary key column for a // table if one exists and returns it. -func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { - if i.config.KeyColumnMp[tableName] != "" { - return i.config.KeyColumnMp[tableName], nil +func (i *CDCIterator) getTableKeys(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { + if i.config.TableKeys[tableName] != "" { + return i.config.TableKeys[tableName], nil } query := `SELECT column_name @@ -200,7 +200,7 @@ func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn, tableNam var colName string err := row.Scan(&colName) if err != nil { - return "", fmt.Errorf("getKeyColumn query failed: %w", err) + return "", fmt.Errorf("getTableKeys query failed: %w", err) } return colName, nil diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index 9418e7a..54152b5 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -27,18 +27,18 @@ import ( // CDCHandler is responsible for handling logical replication messages, // converting them to a record and sending them to a channel. type CDCHandler struct { - keyColumnMp map[string]string + tableKeys map[string]string relationSet *internal.RelationSet out chan<- sdk.Record } func NewCDCHandler( rs *internal.RelationSet, - keyColumnMp map[string]string, + tableKeys map[string]string, out chan<- sdk.Record, ) *CDCHandler { return &CDCHandler{ - keyColumnMp: keyColumnMp, + tableKeys: tableKeys, relationSet: rs, out: out, } @@ -181,7 +181,7 @@ func (h *CDCHandler) buildRecordMetadata(relation *pglogrepl.RelationMessage) ma // buildRecordKey takes the values from the message and extracts the key that // matches the configured keyColumnName. func (h *CDCHandler) buildRecordKey(values map[string]pgtype.Value, table string) sdk.Data { - keyColumn := h.keyColumnMp[table] + keyColumn := h.tableKeys[table] key := make(sdk.StructuredData) for k, v := range values { if keyColumn == k { From a111e9dfee8e916929c3fca3c3af99c7433823e8 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Tue, 16 Jan 2024 13:03:32 -0800 Subject: [PATCH 07/10] address reviews --- source.go | 12 +++++++----- source/logrepl/cdc.go | 15 ++++++--------- source/logrepl/handler.go | 3 +-- 3 files changed, 14 insertions(+), 16 deletions(-) diff --git a/source.go b/source.go index 9808e4c..860d153 100644 --- a/source.go +++ b/source.go @@ -74,7 +74,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { if err != nil { return fmt.Errorf("failed to connect to database: %w", err) } - columns, err := s.getTableColumns(conn) + columns, err := s.getTableColumns(ctx, conn) if err != nil { return fmt.Errorf("failed to connect to database: %w", err) } @@ -150,10 +150,10 @@ func (s *Source) Teardown(ctx context.Context) error { return nil } -func (s *Source) getTableColumns(conn *pgx.Conn) ([]string, error) { - query := fmt.Sprintf("SELECT column_name FROM information_schema.columns WHERE table_name = '%s'", s.config.Table[0]) +func (s *Source) getTableColumns(ctx context.Context, conn *pgx.Conn) ([]string, error) { + query := "SELECT column_name FROM information_schema.columns WHERE table_name = ?" - rows, err := conn.Query(context.Background(), query) + rows, err := conn.Query(ctx, query, s.config.Table[0]) if err != nil { return nil, err } @@ -168,6 +168,8 @@ func (s *Source) getTableColumns(conn *pgx.Conn) ([]string, error) { } columns = append(columns, columnName) } - + if err = rows.Err(); err != nil { + return nil, fmt.Errorf("rows error: %w", err) + } return columns, nil } diff --git a/source/logrepl/cdc.go b/source/logrepl/cdc.go index ccc4e35..265191c 100644 --- a/source/logrepl/cdc.go +++ b/source/logrepl/cdc.go @@ -159,11 +159,12 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er } for _, tableName := range i.config.Tables { // get unprovided table keys - if _, ok := i.config.TableKeys[tableName]; !ok { - i.config.TableKeys[tableName], err = i.getTableKeys(ctx, conn, tableName) - if err != nil { - return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) - } + if _, ok := i.config.TableKeys[tableName]; ok { + continue // key was provided manually + } + i.config.TableKeys[tableName], err = i.getTableKeys(ctx, conn, tableName) + if err != nil { + return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err) } } @@ -187,10 +188,6 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er // getTableKeys queries the db for the name of the primary key column for a // table if one exists and returns it. func (i *CDCIterator) getTableKeys(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) { - if i.config.TableKeys[tableName] != "" { - return i.config.TableKeys[tableName], nil - } - query := `SELECT column_name FROM information_schema.key_column_usage WHERE table_name = $1 AND constraint_name LIKE '%_pkey' diff --git a/source/logrepl/handler.go b/source/logrepl/handler.go index 54152b5..cdbdb37 100644 --- a/source/logrepl/handler.go +++ b/source/logrepl/handler.go @@ -200,8 +200,7 @@ func (h *CDCHandler) buildRecordPayload(values map[string]pgtype.Value) sdk.Data } payload := make(sdk.StructuredData) for k, v := range values { - value := v.Get() - payload[k] = value + payload[k] = v.Get() } return payload } From 7b9be7e21b30e83d54f740486bf129df1e92eeb6 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Tue, 16 Jan 2024 14:52:53 -0800 Subject: [PATCH 08/10] add config validate method + validate tests --- source.go | 19 ++--------- source/config.go | 30 ++++++++++++++++ source/config_test.go | 79 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 111 insertions(+), 17 deletions(-) create mode 100644 source/config_test.go diff --git a/source.go b/source.go index 860d153..cb9ec12 100644 --- a/source.go +++ b/source.go @@ -17,7 +17,6 @@ package postgres import ( "context" "fmt" - "strings" "github.com/conduitio/conduit-connector-postgres/source" "github.com/conduitio/conduit-connector-postgres/source/logrepl" @@ -49,23 +48,9 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error { if err != nil { return err } - // try parsing the url - _, err = pgx.ParseConfig(s.config.URL) + s.tableKeys, err = s.config.Validate() if err != nil { - return fmt.Errorf("invalid url: %w", err) - } - // todo: when cdcMode "auto" is implemented, change this check - if len(s.config.Table) != 1 && s.config.CDCMode == source.CDCModeLongPolling { - return fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") - } - s.tableKeys = make(map[string]string, len(s.config.Table)) - for _, pair := range s.config.Key { - // Split each pair into key and value - parts := strings.Split(pair, ":") - if len(parts) != 2 { - return fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") - } - s.tableKeys[parts[0]] = parts[1] + return err } return nil } diff --git a/source/config.go b/source/config.go index 775f491..cf86dab 100644 --- a/source/config.go +++ b/source/config.go @@ -16,6 +16,13 @@ package source +import ( + "fmt" + "strings" + + "github.com/jackc/pgx/v4" +) + type SnapshotMode string const ( @@ -57,3 +64,26 @@ type Config struct { // connector uses logical replication to listen to changes (see CDCMode). LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"` } + +// Validate validates the provided config values. +func (c Config) Validate() (map[string]string, error) { + // try parsing the url + _, err := pgx.ParseConfig(c.URL) + if err != nil { + return nil, fmt.Errorf("invalid url: %w", err) + } + // todo: when cdcMode "auto" is implemented, change this check + if len(c.Table) != 1 && c.CDCMode == CDCModeLongPolling { + return nil, fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table") + } + tableKeys := make(map[string]string, len(c.Table)) + for _, pair := range c.Key { + // Split each pair into key and value + parts := strings.Split(pair, ":") + if len(parts) != 2 { + return nil, fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key") + } + tableKeys[parts[0]] = parts[1] + } + return tableKeys, nil +} diff --git a/source/config_test.go b/source/config_test.go new file mode 100644 index 0000000..6d33ccb --- /dev/null +++ b/source/config_test.go @@ -0,0 +1,79 @@ +// Copyright © 2023 Meroxa, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package source + +import ( + "fmt" + "testing" + + "github.com/matryer/is" +) + +func TestConfig_Validate(t *testing.T) { + testCases := []struct { + name string + cfg Config + wantErr bool + }{{ + name: "valid config", + cfg: Config{ + URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", + Table: []string{"table1", "table2"}, + Key: []string{"table1:key1"}, + CDCMode: CDCModeLogrepl, + }, + wantErr: false, + }, { + name: "invalid postgres url", + cfg: Config{ + URL: "postgresql", + Table: []string{"table1", "table2"}, + Key: []string{"table1:key1"}, + CDCMode: CDCModeLogrepl, + }, + wantErr: true, + }, { + name: "invalid multiple tables for long polling", + cfg: Config{ + URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", + Table: []string{"table1", "table2"}, + Key: []string{"table1:key1"}, + CDCMode: CDCModeLongPolling, + }, + wantErr: true, + }, { + name: "invalid key list format", + cfg: Config{ + URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb", + Table: []string{"table1", "table2"}, + Key: []string{"key1,key2"}, + CDCMode: CDCModeLogrepl, + }, + wantErr: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + is := is.New(t) + _, err := tc.cfg.Validate() + fmt.Println(err) + if tc.wantErr { + is.True(err != nil) + return + } + is.True(err == nil) + }) + } +} From efe3fd6f315b61cd3bf1d6798b1ff65883696c92 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Wed, 17 Jan 2024 20:06:17 -0800 Subject: [PATCH 09/10] address reviews --- source.go | 3 ++- source/config_test.go | 2 -- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source.go b/source.go index cb9ec12..d2373fc 100644 --- a/source.go +++ b/source.go @@ -64,6 +64,7 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { return fmt.Errorf("failed to connect to database: %w", err) } s.conn = conn + fmt.Println("COLUMNS: ", columns) switch s.config.CDCMode { case source.CDCModeAuto: @@ -136,7 +137,7 @@ func (s *Source) Teardown(ctx context.Context) error { } func (s *Source) getTableColumns(ctx context.Context, conn *pgx.Conn) ([]string, error) { - query := "SELECT column_name FROM information_schema.columns WHERE table_name = ?" + query := "SELECT column_name FROM information_schema.columns WHERE table_name = $1" rows, err := conn.Query(ctx, query, s.config.Table[0]) if err != nil { diff --git a/source/config_test.go b/source/config_test.go index 6d33ccb..5d59360 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -15,7 +15,6 @@ package source import ( - "fmt" "testing" "github.com/matryer/is" @@ -68,7 +67,6 @@ func TestConfig_Validate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { is := is.New(t) _, err := tc.cfg.Validate() - fmt.Println(err) if tc.wantErr { is.True(err != nil) return From e851d4255770806e16a0675d88333acaf09a4587 Mon Sep 17 00:00:00 2001 From: maha-hajja Date: Wed, 17 Jan 2024 20:07:06 -0800 Subject: [PATCH 10/10] address reviews2 --- source.go | 1 - 1 file changed, 1 deletion(-) diff --git a/source.go b/source.go index d2373fc..8caf771 100644 --- a/source.go +++ b/source.go @@ -64,7 +64,6 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error { return fmt.Errorf("failed to connect to database: %w", err) } s.conn = conn - fmt.Println("COLUMNS: ", columns) switch s.config.CDCMode { case source.CDCModeAuto: