Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple collections support in the destination #131

Merged
merged 12 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml → .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: build
name: test
hariso marked this conversation as resolved.
Show resolved Hide resolved

on:
push:
branches: [ main ]
pull_request:

jobs:
build:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
24 changes: 24 additions & 0 deletions .github/workflows/validate-generated-files.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: validate-generated-files

on:
push:
branches: [ main ]
pull_request:

jobs:
validate-generated-files:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: 'go.mod'

- name: Check generated files
run: |
export PATH=$PATH:$(go env GOPATH)/bin
make install-tools generate
git diff
git diff --exit-code --numstat
75 changes: 42 additions & 33 deletions destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ import (
const (
// TODO same constant is defined in packages longpoll, logrepl and destination
// use same constant everywhere
raulb marked this conversation as resolved.
Show resolved Hide resolved
MetadataPostgresTable = "postgres.table"
MetadataOpenCDCCollection = "opencdc.collection"
)

type Destination struct {
sdk.UnimplementedDestination

config destination.Config
getTableName destination.TableFn

conn *pgx.Conn
config destination.Config
stmtBuilder sq.StatementBuilderType
}

Expand All @@ -61,6 +63,12 @@ func (d *Destination) Configure(_ context.Context, cfg map[string]string) error
if err != nil {
return fmt.Errorf("invalid url: %w", err)
}

d.getTableName, err = d.config.TableFunction()
if err != nil {
return fmt.Errorf("invalid table name or table function: %w", err)
}

return nil
}

Expand All @@ -81,13 +89,13 @@ func (d *Destination) Write(ctx context.Context, recs []sdk.Record) (int, error)
var err error
switch rec.Operation {
case sdk.OperationCreate:
err = d.handleInsert(rec, b)
err = d.handleInsert(ctx, rec, b)
case sdk.OperationUpdate:
err = d.handleUpdate(rec, b)
err = d.handleUpdate(ctx, rec, b)
case sdk.OperationDelete:
err = d.handleDelete(rec, b)
err = d.handleDelete(ctx, rec, b)
case sdk.OperationSnapshot:
err = d.handleInsert(rec, b)
err = d.handleInsert(ctx, rec, b)
default:
return 0, fmt.Errorf("invalid operation %q", rec.Operation)
}
Expand Down Expand Up @@ -121,33 +129,33 @@ func (d *Destination) Teardown(ctx context.Context) error {
// table. It checks for the existence of a key. If no key is present or a key
// exists and no key column name is configured, it will plainly insert the data.
// Otherwise it upserts the record.
func (d *Destination) handleInsert(r sdk.Record, b *pgx.Batch) error {
func (d *Destination) handleInsert(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) || d.config.Key == "" {
return d.insert(r, b)
return d.insert(ctx, r, b)
}
return d.upsert(r, b)
return d.upsert(ctx, r, b)
}

// handleUpdate adds a query to the batch that updates the record in the target
// table. It assumes the record has a key and fails if one is not present.
func (d *Destination) handleUpdate(r sdk.Record, b *pgx.Batch) error {
func (d *Destination) handleUpdate(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) {
return fmt.Errorf("key must be provided on update actions")
}
// TODO handle case if the key was updated
return d.upsert(r, b)
return d.upsert(ctx, r, b)
}

// handleDelete adds a query to the batch that deletes the record from the
// target table. It assumes the record has a key and fails if one is not present.
func (d *Destination) handleDelete(r sdk.Record, b *pgx.Batch) error {
func (d *Destination) handleDelete(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
if !d.hasKey(r) {
return fmt.Errorf("key must be provided on delete actions")
}
return d.remove(r, b)
return d.remove(ctx, r, b)
}

func (d *Destination) upsert(r sdk.Record, b *pgx.Batch) error {
func (d *Destination) upsert(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
payload, err := d.getPayload(r)
if err != nil {
return fmt.Errorf("failed to get payload: %w", err)
Expand All @@ -160,7 +168,7 @@ func (d *Destination) upsert(r sdk.Record, b *pgx.Batch) error {

keyColumnName := d.getKeyColumnName(key, d.config.Key)

tableName, err := d.getTableName(r.Metadata)
tableName, err := d.getTableName(r)
if err != nil {
return fmt.Errorf("failed to get table name for write: %w", err)
}
Expand All @@ -169,21 +177,30 @@ func (d *Destination) upsert(r sdk.Record, b *pgx.Batch) error {
if err != nil {
return fmt.Errorf("error formatting query: %w", err)
}
sdk.Logger(ctx).Trace().
Str("table_name", tableName).
Any("key", map[string]interface{}{keyColumnName: key[keyColumnName]}).
Msg("upserting record")

b.Queue(query, args...)
return nil
}

func (d *Destination) remove(r sdk.Record, b *pgx.Batch) error {
func (d *Destination) remove(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
key, err := d.getKey(r)
if err != nil {
return err
}
keyColumnName := d.getKeyColumnName(key, d.config.Key)
tableName, err := d.getTableName(r.Metadata)
tableName, err := d.getTableName(r)
if err != nil {
return fmt.Errorf("failed to get table name for write: %w", err)
}

sdk.Logger(ctx).Trace().
Str("table_name", tableName).
Any("key", map[string]interface{}{keyColumnName: key[keyColumnName]}).
Msg("deleting record")
query, args, err := d.stmtBuilder.
Delete(tableName).
Where(sq.Eq{keyColumnName: key[keyColumnName]}).
Expand All @@ -199,20 +216,26 @@ func (d *Destination) remove(r sdk.Record, b *pgx.Batch) error {
// insert is an append-only operation that doesn't care about keys, but
// can error on constraints violations so should only be used when no table
// key or unique constraints are otherwise present.
func (d *Destination) insert(r sdk.Record, b *pgx.Batch) error {
tableName, err := d.getTableName(r.Metadata)
func (d *Destination) insert(ctx context.Context, r sdk.Record, b *pgx.Batch) error {
tableName, err := d.getTableName(r)
if err != nil {
return err
}

key, err := d.getKey(r)
if err != nil {
return err
}

payload, err := d.getPayload(r)
if err != nil {
return err
}

colArgs, valArgs := d.formatColumnsAndValues(key, payload)
sdk.Logger(ctx).Trace().
Str("table_name", tableName).
Msg("inserting record")
query, args, err := d.stmtBuilder.
Insert(tableName).
Columns(colArgs...).
Expand Down Expand Up @@ -322,20 +345,6 @@ func (d *Destination) formatColumnsAndValues(key, payload sdk.StructuredData) ([
return colArgs, valArgs
}

// return either the record's metadata value for table or the default configured
// value for table. Otherwise it will error since we require some table to be
// set to write into.
func (d *Destination) getTableName(metadata map[string]string) (string, error) {
tableName, ok := metadata[MetadataPostgresTable]
if !ok {
if d.config.Table == "" {
return "", fmt.Errorf("no table provided for default writes")
}
return d.config.Table, nil
}
return tableName, nil
}

// getKeyColumnName will return the name of the first item in the key or the
// connector-configured default name of the key column name.
func (d *Destination) getKeyColumnName(key sdk.StructuredData, defaultKeyName string) string {
Expand Down
43 changes: 42 additions & 1 deletion destination/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,52 @@

package destination

import (
"bytes"
"fmt"
"strings"
"text/template"

"github.com/Masterminds/sprig/v3"
sdk "github.com/conduitio/conduit-connector-sdk"
)

type TableFn func(sdk.Record) (string, error)

type Config struct {
// URL is the connection string for the Postgres database.
URL string `json:"url" validate:"required"`
// Table is used as the target table into which records are inserted.
Table string `json:"table"`
Table string `json:"table" default:"{{ index .Metadata \"opencdc.collection\" }}"`
// Key represents the column name for the key used to identify and update existing rows.
Key string `json:"key"`
}

// TableFunction returns a function that determines the table for each record individually.
// The function might be returning a static table name.
// If the table is neither static nor a template, an error is returned.
func (c Config) TableFunction() (f TableFn, err error) {
// Not a template, i.e. it's a static table name
if !strings.HasPrefix(c.Table, "{{") && !strings.HasSuffix(c.Table, "}}") {
return func(_ sdk.Record) (string, error) {
return c.Table, nil
}, nil
}

// Try to parse the table
t, err := template.New("table").Funcs(sprig.FuncMap()).Parse(c.Table)
if err != nil {
// The table is not a valid Go template.
return nil, fmt.Errorf("table is neither a valid static table nor a valid Go template: %w", err)
}

// The table is a valid template, return TableFn.
var buf bytes.Buffer
return func(r sdk.Record) (string, error) {
buf.Reset()
if err := t.Execute(&buf, r); err != nil {
return "", fmt.Errorf("failed to execute table template: %w", err)
}
return buf.String(), nil
}, nil
}
2 changes: 1 addition & 1 deletion destination/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions destination_test.go → destination_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ func TestDestination_Write(t *testing.T) {
tableName := test.SetupTestTable(ctx, t, conn)

d := NewDestination()
err := d.Configure(ctx, map[string]string{"url": test.RegularConnString})
err := d.Configure(
ctx,
map[string]string{
"url": test.RegularConnString,
"table": "{{ index .Metadata \"opencdc.collection\" }}",
},
)
is.NoErr(err)
err = d.Open(ctx)
is.NoErr(err)
Expand All @@ -49,7 +55,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationSnapshot,
Metadata: map[string]string{MetadataPostgresTable: tableName},
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5000},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -64,7 +70,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationCreate,
Metadata: map[string]string{MetadataPostgresTable: tableName},
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 5},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -79,7 +85,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataPostgresTable: tableName},
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 6},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -94,7 +100,7 @@ func TestDestination_Write(t *testing.T) {
record: sdk.Record{
Position: sdk.Position("foo"),
Operation: sdk.OperationUpdate,
Metadata: map[string]string{MetadataPostgresTable: tableName},
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Key: sdk.StructuredData{"id": 1},
Payload: sdk.Change{
After: sdk.StructuredData{
Expand All @@ -108,7 +114,7 @@ func TestDestination_Write(t *testing.T) {
name: "delete",
record: sdk.Record{
Position: sdk.Position("foo"),
Metadata: map[string]string{MetadataPostgresTable: tableName},
Metadata: map[string]string{MetadataOpenCDCCollection: tableName},
Operation: sdk.OperationDelete,
Key: sdk.StructuredData{"id": 4},
},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/conduitio/conduit-connector-postgres
go 1.21

require (
github.com/Masterminds/sprig/v3 v3.2.3
github.com/Masterminds/squirrel v1.5.4
github.com/conduitio/conduit-connector-sdk v0.8.0
github.com/golangci/golangci-lint v1.57.2
Expand All @@ -26,7 +27,6 @@ require (
github.com/Masterminds/goutils v1.1.1 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
github.com/Masterminds/semver/v3 v3.2.0 // indirect
github.com/Masterminds/sprig/v3 v3.2.3 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.2.0 // indirect
github.com/alecthomas/go-check-sumtype v0.1.4 // indirect
github.com/alexkohler/nakedret/v2 v2.0.4 // indirect
Expand Down
6 changes: 3 additions & 3 deletions source/paramgen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.