From 27ec948504c002325f2c7f0a88b2115ea6541788 Mon Sep 17 00:00:00 2001
From: Robin Tang
Date: Fri, 20 Sep 2024 16:32:33 -0700
Subject: [PATCH] Adding `PrimaryKeysOverride` to TopicConfig + clean up (#922)

---
 lib/config/config.go          |   6 +-
 lib/config/config_test.go     | 100 ++++++++++++++++------------------
 lib/kafkalib/topic.go         |   1 +
 processes/consumer/process.go |   3 +-
 4 files changed, 52 insertions(+), 58 deletions(-)

diff --git a/lib/config/config.go b/lib/config/config.go
index 8816fd20d..b16b710d0 100644
--- a/lib/config/config.go
+++ b/lib/config/config.go
@@ -107,7 +107,7 @@ func (c Config) TopicConfigs() ([]*kafkalib.TopicConfig, error) {
 		return c.Pubsub.TopicConfigs, nil
 	}
 
-	return nil, fmt.Errorf("unsupported queue: %v", c.Queue)
+	return nil, fmt.Errorf("unsupported queue: %q", c.Queue)
 }
 
 type Mode string
@@ -200,7 +200,7 @@ func readFileToConfig(pathToConfig string) (*Config, error) {
 
 func (c Config) ValidateRedshift() error {
 	if c.Output != constants.Redshift {
-		return fmt.Errorf("output is not Redshift, output: %v", c.Output)
+		return fmt.Errorf("output is not Redshift, output: %q", c.Output)
 	}
 
 	if c.Redshift == nil {
@@ -224,7 +224,7 @@ func (c Config) ValidateRedshift() error {
 // The actual output source (like Snowflake) and CDC parser will be loaded and checked by other funcs.
 func (c Config) Validate() error {
 	if c.FlushSizeKb <= 0 {
-		return fmt.Errorf("flush size pool has to be a positive number, current value: %v", c.FlushSizeKb)
+		return fmt.Errorf("flush size pool has to be a positive number, current value: %d", c.FlushSizeKb)
 	}
 
 	if !numbers.BetweenEq(FlushIntervalSecondsMin, FlushIntervalSecondsMax, c.FlushIntervalSeconds) {
diff --git a/lib/config/config_test.go b/lib/config/config_test.go
index ce989a5bf..cd2716003 100644
--- a/lib/config/config_test.go
+++ b/lib/config/config_test.go
@@ -5,7 +5,6 @@ import (
 	"io"
 	"os"
 	"path/filepath"
-	"strings"
 	"testing"
 	"time"
 
@@ -457,11 +456,7 @@ bigquery:
 }
 
 func TestConfig_Validate(t *testing.T) {
-	pubsub := Pubsub{
-		ProjectID:         "foo",
-		PathToCredentials: "bar",
-	}
-
+	pubsub := Pubsub{ProjectID: "foo", PathToCredentials: "bar"}
 	cfg := Config{
 		Pubsub:               &pubsub,
 		FlushIntervalSeconds: 5,
@@ -508,48 +503,51 @@ func TestConfig_Validate(t *testing.T) {
 		cfg.BufferRows = defaultBufferPoolSize + 1
 		assert.Nil(t, cfg.Validate())
 	}
-
-	// Test the various flush error settings.
-	for i := 0; i < bufferPoolSizeMin; i++ {
-		// Reset buffer rows.
-		cfg.BufferRows = 500
-		cfg.FlushIntervalSeconds = i
-		assert.ErrorContains(t, cfg.Validate(), "flush interval is outside of our range")
-
-		// Reset Flush
-		cfg.FlushIntervalSeconds = 20
-		cfg.BufferRows = uint(i)
-		assert.ErrorContains(t, cfg.Validate(), "buffer pool is too small")
+	{
+		// Invalid flush settings
+		for i := 0; i < bufferPoolSizeMin; i++ {
+			// Reset buffer rows.
+			cfg.BufferRows = 500
+			cfg.FlushIntervalSeconds = i
+			assert.ErrorContains(t, cfg.Validate(), "flush interval is outside of our range")
+
+			// Reset Flush
+			cfg.FlushIntervalSeconds = 20
+			cfg.BufferRows = uint(i)
+			assert.ErrorContains(t, cfg.Validate(), "buffer pool is too small")
+		}
 	}
 
 	cfg.BufferRows = 500
 	cfg.FlushIntervalSeconds = 600
 	assert.Nil(t, cfg.Validate())
 
-	// Now that we have a valid output, let's test with S3.
-	cfg.Output = constants.S3
-	assert.ErrorContains(t, cfg.Validate(), "s3 settings are nil")
-	cfg.S3 = &S3Settings{
-		Bucket:             "foo",
-		AwsSecretAccessKey: "foo",
-		AwsAccessKeyID:     "bar",
-		OutputFormat:       constants.ParquetFormat,
-	}
-
-	assert.Nil(t, cfg.Validate())
-
-	// Now let's change to history mode and see.
-	cfg.Mode = History
-	pubsub.TopicConfigs[0].DropDeletedColumns = true
-	assert.ErrorContains(t, cfg.Validate(), "dropDeletedColumns is not supported in history mode")
-
-	pubsub.TopicConfigs[0].DropDeletedColumns = false
-	pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = false
-	assert.ErrorContains(t, cfg.Validate(), "includeDatabaseUpdatedAt is required in history mode")
+	{
+		// S3
+		cfg.Output = constants.S3
+		assert.ErrorContains(t, cfg.Validate(), "s3 settings are nil")
+		cfg.S3 = &S3Settings{
+			Bucket:             "foo",
+			AwsSecretAccessKey: "foo",
+			AwsAccessKeyID:     "bar",
+			OutputFormat:       constants.ParquetFormat,
+		}
 
-	pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = true
-	assert.NoError(t, cfg.Validate())
-	// End history mode
+		assert.Nil(t, cfg.Validate())
+	}
+	{
+		// Now let's change to history mode and see.
+		cfg.Mode = History
+		pubsub.TopicConfigs[0].DropDeletedColumns = true
+		assert.ErrorContains(t, cfg.Validate(), "dropDeletedColumns is not supported in history mode")
+
+		pubsub.TopicConfigs[0].DropDeletedColumns = false
+		pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = false
+		assert.ErrorContains(t, cfg.Validate(), "includeDatabaseUpdatedAt is required in history mode")
+
+		pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = true
+		assert.NoError(t, cfg.Validate())
+	}
 
 	for _, num := range []int{-500, -300, -5, 0} {
 		cfg.FlushSizeKb = num
@@ -558,18 +556,14 @@ func TestConfig_Validate(t *testing.T) {
 }
 
 func TestCfg_KafkaBootstrapServers(t *testing.T) {
-	kafka := Kafka{
-		BootstrapServer: "localhost:9092",
+	{
+		// Single broker
+		kafka := Kafka{BootstrapServer: "localhost:9092"}
+		assert.Equal(t, []string{"localhost:9092"}, kafka.BootstrapServers())
 	}
-
-	assert.Equal(t, []string{"localhost:9092"}, strings.Split(kafka.BootstrapServer, ","))
-
-	kafkaWithMultipleBrokers := Kafka{
-		BootstrapServer: "a:9092,b:9093,c:9094",
+	{
+		// Multiple brokers
+		kafkaWithMultipleBrokers := Kafka{BootstrapServer: "a:9092,b:9093,c:9094"}
+		assert.Equal(t, []string{"a:9092", "b:9093", "c:9094"}, kafkaWithMultipleBrokers.BootstrapServers())
 	}
-
-	var brokers []string
-	brokers = append(brokers, strings.Split(kafkaWithMultipleBrokers.BootstrapServer, ",")...)
- - assert.Equal(t, []string{"a:9092", "b:9093", "c:9094"}, brokers) } diff --git a/lib/kafkalib/topic.go b/lib/kafkalib/topic.go index a2151fae8..503bf64bc 100644 --- a/lib/kafkalib/topic.go +++ b/lib/kafkalib/topic.go @@ -40,6 +40,7 @@ type TopicConfig struct { BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"` AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"` ColumnsToHash []string `yaml:"columnsToHash,omitempty"` + PrimaryKeysOverride []string `yaml:"primaryKeysOverride,omitempty"` // Internal metadata opsToSkipMap map[string]bool `yaml:"-"` diff --git a/processes/consumer/process.go b/processes/consumer/process.go index b2bf96f73..2df7cabbf 100644 --- a/processes/consumer/process.go +++ b/processes/consumer/process.go @@ -39,7 +39,7 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo topicConfig, isOk := p.TopicToConfigFormatMap.GetTopicFmt(p.Msg.Topic()) if !isOk { tags["what"] = "failed_topic_lookup" - return "", fmt.Errorf("failed to get topic name: %s", p.Msg.Topic()) + return "", fmt.Errorf("failed to get topic name: %q", p.Msg.Topic()) } tags["database"] = topicConfig.tc.Database @@ -65,7 +65,6 @@ func (p processArgs) process(ctx context.Context, cfg config.Config, inMemDB *mo } // Table name is only available after event has been cast tags["table"] = evt.Table - if topicConfig.tc.ShouldSkip(_event.Operation()) { // Check to see if we should skip first // This way, we can emit a specific tag to be more clear
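The headline change above adds `primaryKeysOverride` to `kafkalib.TopicConfig`, so a topic config can supply its own primary-key list through YAML. Below is a minimal sketch of how the new tag is expected to deserialize, assuming `gopkg.in/yaml.v3` and a stand-in struct trimmed to the two tagged fields visible in this diff (the real struct lives in `lib/kafkalib/topic.go`; the column names are purely illustrative):

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v3"
)

// Trimmed stand-in for kafkalib.TopicConfig, keeping only the two
// tagged fields visible in this diff; the real struct has many more.
type topicConfig struct {
	ColumnsToHash       []string `yaml:"columnsToHash,omitempty"`
	PrimaryKeysOverride []string `yaml:"primaryKeysOverride,omitempty"`
}

func main() {
	// Hypothetical topic config snippet; the column names are made up.
	raw := []byte(`
columnsToHash:
  - email
primaryKeysOverride:
  - order_id
  - tenant_id
`)

	var tc topicConfig
	if err := yaml.Unmarshal(raw, &tc); err != nil {
		log.Fatal(err)
	}

	// With omitempty, omitting the key leaves the slice nil, so
	// callers can treat "no override" as len(tc.PrimaryKeysOverride) == 0.
	fmt.Println(tc.PrimaryKeysOverride) // [order_id tenant_id]
}
```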
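The clean-up half of the patch swaps `%v`/`%s` for `%q` in the error messages of `config.go` and `process.go`. `%q` wraps the interpolated value in quotes, so an empty or whitespace-only queue or topic name is visible in the message instead of silently disappearing. A quick illustration:

```go
package main

import "fmt"

func main() {
	queue := "" // e.g. a queue value that was never set

	// %v renders the empty string invisibly...
	fmt.Printf("unsupported queue: %v\n", queue) // unsupported queue:
	// ...while %q makes it unmistakable.
	fmt.Printf("unsupported queue: %q\n", queue) // unsupported queue: ""
}
```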