From d50ee303491c8ca89b036c09b8afb6f4e6f13b7c Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 26 Sep 2024 22:38:46 -0700 Subject: [PATCH 1/2] Clean up. --- clients/mssql/store.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/mssql/store.go b/clients/mssql/store.go index 66704e93e..f76809e0a 100644 --- a/clients/mssql/store.go +++ b/clients/mssql/store.go @@ -13,7 +13,6 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" "github.com/artie-labs/transfer/lib/optimization" "github.com/artie-labs/transfer/lib/sql" - "github.com/artie-labs/transfer/lib/typing" ) type Store struct { @@ -32,6 +31,10 @@ func getSchema(schema string) string { } func (s *Store) Dialect() sql.Dialect { + return s.dialect() +} + +func (s *Store) dialect() dialect.MSSQLDialect { return dialect.MSSQLDialect{} } @@ -59,12 +62,7 @@ func (s *Store) Sweep() error { return err } - mssqlDialect, err := typing.AssertType[dialect.MSSQLDialect](s.Dialect()) - if err != nil { - return err - } - - return shared.Sweep(s, tcs, mssqlDialect.BuildSweepQuery) + return shared.Sweep(s, tcs, s.dialect().BuildSweepQuery) } func (s *Store) Dedupe(_ sql.TableIdentifier, _ []string, _ bool) error { From 09d8b3b023ef1dc61eb5db515697bd5aadb134db Mon Sep 17 00:00:00 2001 From: Robin Tang Date: Thu, 26 Sep 2024 22:43:21 -0700 Subject: [PATCH 2/2] Minor clean up. --- lib/cdc/format/format.go | 7 +------ lib/cdc/format/format_test.go | 23 ++++++++++++++++++++--- lib/cdc/mongo/debezium.go | 8 ++++---- lib/cdc/relational/debezium.go | 8 ++++---- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/lib/cdc/format/format.go b/lib/cdc/format/format.go index d8a9b3ffb..76a5b37fb 100644 --- a/lib/cdc/format/format.go +++ b/lib/cdc/format/format.go @@ -9,13 +9,8 @@ import ( "github.com/artie-labs/transfer/lib/logger" ) -var ( - r relational.Debezium - m mongo.Debezium -) - func GetFormatParser(label, topic string) cdc.Format { - for _, validFormat := range []cdc.Format{&r, &m} { + for _, validFormat := range []cdc.Format{relational.Debezium{}, mongo.Debezium{}} { for _, fmtLabel := range validFormat.Labels() { if fmtLabel == label { slog.Info("Loaded CDC Format parser...", diff --git a/lib/cdc/format/format_test.go b/lib/cdc/format/format_test.go index 93e737d7f..9b80b0144 100644 --- a/lib/cdc/format/format_test.go +++ b/lib/cdc/format/format_test.go @@ -7,13 +7,30 @@ import ( "github.com/stretchr/testify/assert" + "github.com/artie-labs/transfer/lib/cdc/mongo" + "github.com/artie-labs/transfer/lib/cdc/relational" "github.com/artie-labs/transfer/lib/config/constants" + "github.com/artie-labs/transfer/lib/typing" ) func TestGetFormatParser(t *testing.T) { - validFormats := []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat, constants.DBZMongoFormat} - for _, validFormat := range validFormats { - assert.NotNil(t, GetFormatParser(validFormat, "topicA")) + { + // Relational + for _, format := range []string{constants.DBZPostgresAltFormat, constants.DBZPostgresFormat} { + formatParser := GetFormatParser(format, "topicA") + assert.NotNil(t, formatParser) + + _, err := typing.AssertType[relational.Debezium](formatParser) + assert.NoError(t, err) + } + } + { + // Mongo + formatParser := GetFormatParser(constants.DBZMongoFormat, "topicA") + assert.NotNil(t, formatParser) + + _, err := typing.AssertType[mongo.Debezium](formatParser) + assert.NoError(t, err) } } diff --git a/lib/cdc/mongo/debezium.go b/lib/cdc/mongo/debezium.go index c5f07b451..8cd4cbcdc 100644 --- a/lib/cdc/mongo/debezium.go +++ b/lib/cdc/mongo/debezium.go @@ -18,9 +18,9 @@ import ( "go.mongodb.org/mongo-driver/bson" ) -type Debezium string +type Debezium struct{} -func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { +func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var schemaEventPayload SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") @@ -70,11 +70,11 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { return &schemaEventPayload, nil } -func (d *Debezium) Labels() []string { +func (Debezium) Labels() []string { return []string{constants.DBZMongoFormat} } -func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { +func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { kvMap, err := debezium.ParsePartitionKey(key, tc.CDCKeyFormat) if err != nil { return nil, err diff --git a/lib/cdc/relational/debezium.go b/lib/cdc/relational/debezium.go index 23dd68c29..b7864bded 100644 --- a/lib/cdc/relational/debezium.go +++ b/lib/cdc/relational/debezium.go @@ -11,9 +11,9 @@ import ( "github.com/artie-labs/transfer/lib/kafkalib" ) -type Debezium string +type Debezium struct{} -func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { +func (Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { var event util.SchemaEventPayload if len(bytes) == 0 { return nil, fmt.Errorf("empty message") @@ -26,7 +26,7 @@ func (d *Debezium) GetEventFromBytes(bytes []byte) (cdc.Event, error) { return &event, nil } -func (d *Debezium) Labels() []string { +func (Debezium) Labels() []string { return []string{ constants.DBZPostgresFormat, constants.DBZPostgresAltFormat, @@ -35,6 +35,6 @@ func (d *Debezium) Labels() []string { } } -func (d *Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { +func (Debezium) GetPrimaryKey(key []byte, tc kafkalib.TopicConfig) (map[string]any, error) { return debezium.ParsePartitionKey(key, tc.CDCKeyFormat) }