Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support reading from multiple tables using logical replication slots #113

Merged
merged 10 commits into from
Jan 18, 2024
23 changes: 11 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ configuration.
## Change Data Capture

This connector implements CDC features for PostgreSQL by creating a logical replication slot and a publication that
listens to changes in the configured table. Every detected change is converted into a record and returned in the call to
listens to changes in the configured tables. Every detected change is converted into a record and returned in the call to
`Read`. If there is no record available at the moment `Read` is called, it blocks until a record is available or the
connector receives a stop signal.

Expand Down Expand Up @@ -58,16 +58,15 @@ returned.

## Configuration Options

| name | description | required | default |
| ------------------------- | ----------------------------------------------------------------------------------------------------------------------------------- | -------- | ---------------------- |
| `url` | Connection string for the Postgres database. | true | |
| `table` | The name of the table in Postgres that the connector should read. | true | |
| `columns` | Comma separated list of column names that should be included in each Record's payload. | false | (all columns) |
| `key` | Column name that records should use for their `Key` fields. | false | (primary key of table) |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |
| name | description | required | default |
|---------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | List of table names to read from, separated by comma. example: `"employees,offices,payments"` | true | |
| `key` | List of Key column names per table, separated by comma. example:`"table1:key1,table2:key2"`, if not supplied, the table primary key will be used as the `'Key'` field for the records. | false | |
| `snapshotMode` | Whether or not the plugin will take a snapshot of the entire table before starting cdc mode (allowed values: `initial` or `never`). | false | `initial` |
| `cdcMode` | Determines the CDC mode (allowed values: `auto`, `logrepl` or `long_polling`). | false | `auto` |
| `logrepl.publicationName` | Name of the publication to listen for WAL events. | false | `conduitpub` |
| `logrepl.slotName` | Name of the slot opened for replication events. | false | `conduitslot` |

# Destination

Expand All @@ -91,7 +90,7 @@ If there is no key, the record will be simply appended.
## Configuration Options

| name | description | required | default |
| ------- | --------------------------------------------------------------------------- | -------- | ------- |
|---------|-----------------------------------------------------------------------------|----------|---------|
| `url` | Connection string for the Postgres database. | true | |
| `table` | The name of the table in Postgres that the connector should write to. | false | |
| `key` | Column name used to detect if the target table already contains the record. | false | |
Expand Down
51 changes: 39 additions & 12 deletions source.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
type Source struct {
sdk.UnimplementedSource

iterator source.Iterator
config source.Config
conn *pgx.Conn
iterator source.Iterator
config source.Config
conn *pgx.Conn
tableKeys map[string]string
}

func NewSource() sdk.Source {
Expand All @@ -47,10 +48,9 @@ func (s *Source) Configure(_ context.Context, cfg map[string]string) error {
if err != nil {
return err
}
// try parsing the url
_, err = pgx.ParseConfig(s.config.URL)
s.tableKeys, err = s.config.Validate()
if err != nil {
return fmt.Errorf("invalid url: %w", err)
return err
}
return nil
}
Expand All @@ -59,6 +59,10 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
columns, err := s.getTableColumns(ctx, conn)
if err != nil {
return fmt.Errorf("failed to connect to database: %w", err)
}
s.conn = conn

switch s.config.CDCMode {
Expand All @@ -77,9 +81,8 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
Position: pos,
SlotName: s.config.LogreplSlotName,
PublicationName: s.config.LogreplPublicationName,
TableName: s.config.Table,
KeyColumnName: s.config.Key,
Columns: s.config.Columns,
Tables: s.config.Table,
TableKeys: s.tableKeys,
})
if err != nil {
return fmt.Errorf("failed to create logical replication iterator: %w", err)
Expand All @@ -96,9 +99,9 @@ func (s *Source) Open(ctx context.Context, pos sdk.Position) error {
snap, err := longpoll.NewSnapshotIterator(
ctx,
s.conn,
s.config.Table,
s.config.Columns,
s.config.Key)
s.config.Table[0], //todo: only the first table for now
columns,
s.tableKeys[s.config.Table[0]])
if err != nil {
return fmt.Errorf("failed to create long polling iterator: %w", err)
}
Expand Down Expand Up @@ -131,3 +134,27 @@ func (s *Source) Teardown(ctx context.Context) error {
}
return nil
}

func (s *Source) getTableColumns(ctx context.Context, conn *pgx.Conn) ([]string, error) {
query := "SELECT column_name FROM information_schema.columns WHERE table_name = $1"

rows, err := conn.Query(ctx, query, s.config.Table[0])
if err != nil {
return nil, err
}
defer rows.Close()

var columns []string
for rows.Next() {
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
var columnName string
err := rows.Scan(&columnName)
if err != nil {
return nil, err
}
columns = append(columns, columnName)
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("rows error: %w", err)
}
return columns, nil
}
44 changes: 36 additions & 8 deletions source/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package source

import (
"fmt"
"strings"

"github.com/jackc/pgx/v4"
)

type SnapshotMode string

const (
Expand All @@ -40,14 +47,12 @@ const (
type Config struct {
// URL is the connection string for the Postgres database.
URL string `json:"url" validate:"required"`
// The name of the table in Postgres that the connector should read.
Table string `json:"table" validate:"required"`
// Comma separated list of column names that should be included in each Record's payload.
Columns []string `json:"columns"`
// Column name that records should use for their `Key` fields.
Key string `json:"key"`

// Whether or not the plugin will take a snapshot of the entire table before starting cdc mode.
// Table is a List of table names to read from, separated by a comma.
Table []string `json:"table" validate:"required"`
// Key is a list of Key column names per table, ex:"table1:key1,table2:key2", records should use the key values for their `Key` fields.
Key []string `json:"key"`

// SnapshotMode is whether the plugin will take a snapshot of the entire table before starting cdc mode.
SnapshotMode SnapshotMode `json:"snapshotMode" validate:"inclusion=initial|never" default:"initial"`
// CDCMode determines how the connector should listen to changes.
CDCMode CDCMode `json:"cdcMode" validate:"inclusion=auto|logrepl|long_polling" default:"auto"`
Expand All @@ -59,3 +64,26 @@ type Config struct {
// connector uses logical replication to listen to changes (see CDCMode).
LogreplSlotName string `json:"logrepl.slotName" default:"conduitslot"`
}

// Validate validates the provided config values.
func (c Config) Validate() (map[string]string, error) {
// try parsing the url
_, err := pgx.ParseConfig(c.URL)
if err != nil {
return nil, fmt.Errorf("invalid url: %w", err)
}
// todo: when cdcMode "auto" is implemented, change this check
if len(c.Table) != 1 && c.CDCMode == CDCModeLongPolling {
return nil, fmt.Errorf("multi tables are only supported for logrepl CDCMode, please provide only one table")
}
tableKeys := make(map[string]string, len(c.Table))
for _, pair := range c.Key {
// Split each pair into key and value
parts := strings.Split(pair, ":")
if len(parts) != 2 {
return nil, fmt.Errorf("wrong format for the configuration %q, use comma separated pairs of tables and keys, example: table1:key1,table2:key2", "key")
}
tableKeys[parts[0]] = parts[1]
}
return tableKeys, nil
}
77 changes: 77 additions & 0 deletions source/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package source

import (
"testing"

"github.com/matryer/is"
)

func TestConfig_Validate(t *testing.T) {
maha-hajja marked this conversation as resolved.
Show resolved Hide resolved
testCases := []struct {
name string
cfg Config
wantErr bool
}{{
name: "valid config",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: false,
}, {
name: "invalid postgres url",
cfg: Config{
URL: "postgresql",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
}, {
name: "invalid multiple tables for long polling",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"table1:key1"},
CDCMode: CDCModeLongPolling,
},
wantErr: true,
}, {
name: "invalid key list format",
cfg: Config{
URL: "postgresql://meroxauser:meroxapass@127.0.0.1:5432/meroxadb",
Table: []string{"table1", "table2"},
Key: []string{"key1,key2"},
CDCMode: CDCModeLogrepl,
},
wantErr: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
is := is.New(t)
_, err := tc.cfg.Validate()
if tc.wantErr {
is.True(err != nil)
return
}
is.True(err == nil)
})
}
}
38 changes: 21 additions & 17 deletions source/logrepl/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ type Config struct {
Position sdk.Position
SlotName string
PublicationName string
TableName string
KeyColumnName string
Columns []string
Tables []string
TableKeys map[string]string
}

// CDCIterator asynchronously listens for events from the logical replication
Expand Down Expand Up @@ -154,21 +153,30 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er
}
}

keyColumn, err := i.getKeyColumn(ctx, conn)
if err != nil {
return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", i.config.TableName, err)
var err error
if i.config.TableKeys == nil {
i.config.TableKeys = make(map[string]string, len(i.config.Tables))
}
for _, tableName := range i.config.Tables {
// get unprovided table keys
if _, ok := i.config.TableKeys[tableName]; ok {
continue // key was provided manually
}
i.config.TableKeys[tableName], err = i.getTableKeys(ctx, conn, tableName)
if err != nil {
return fmt.Errorf("failed to find key for table %s (try specifying it manually): %w", tableName, err)
lovromazgon marked this conversation as resolved.
Show resolved Hide resolved
}
}

sub := internal.NewSubscription(
conn.Config().Config,
i.config.SlotName,
i.config.PublicationName,
[]string{i.config.TableName},
i.config.Tables,
lsn,
NewCDCHandler(
internal.NewRelationSet(conn.ConnInfo()),
keyColumn,
i.config.Columns,
i.config.TableKeys,
i.records,
).Handle,
)
Expand All @@ -177,23 +185,19 @@ func (i *CDCIterator) attachSubscription(ctx context.Context, conn *pgx.Conn) er
return nil
}

// getKeyColumn queries the db for the name of the primary key column for a
// getTableKeys queries the db for the name of the primary key column for a
// table if one exists and returns it.
func (i *CDCIterator) getKeyColumn(ctx context.Context, conn *pgx.Conn) (string, error) {
if i.config.KeyColumnName != "" {
return i.config.KeyColumnName, nil
}

func (i *CDCIterator) getTableKeys(ctx context.Context, conn *pgx.Conn, tableName string) (string, error) {
query := `SELECT column_name
FROM information_schema.key_column_usage
WHERE table_name = $1 AND constraint_name LIKE '%_pkey'
LIMIT 1;`
row := conn.QueryRow(ctx, query, i.config.TableName)
row := conn.QueryRow(ctx, query, tableName)

var colName string
err := row.Scan(&colName)
if err != nil {
return "", fmt.Errorf("getKeyColumn query failed: %w", err)
return "", fmt.Errorf("getTableKeys query failed: %w", err)
}

return colName, nil
Expand Down
2 changes: 1 addition & 1 deletion source/logrepl/cdc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestIterator_Next(t *testing.T) {
func testIterator(ctx context.Context, t *testing.T, pool *pgxpool.Pool, table string) *CDCIterator {
is := is.New(t)
config := Config{
TableName: table,
Tables: []string{table},
PublicationName: table, // table is random, reuse for publication name
SlotName: table, // table is random, reuse for slot name
}
Expand Down
Loading
Loading