Skip to content

Commit

Permalink
Clean up tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tang8330 committed Sep 20, 2024
1 parent 74bb569 commit ae06903
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 56 deletions.
6 changes: 3 additions & 3 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (c Config) TopicConfigs() ([]*kafkalib.TopicConfig, error) {
return c.Pubsub.TopicConfigs, nil
}

return nil, fmt.Errorf("unsupported queue: %v", c.Queue)
return nil, fmt.Errorf("unsupported queue: %q", c.Queue)
}

type Mode string
Expand Down Expand Up @@ -200,7 +200,7 @@ func readFileToConfig(pathToConfig string) (*Config, error) {

func (c Config) ValidateRedshift() error {
if c.Output != constants.Redshift {
return fmt.Errorf("output is not Redshift, output: %v", c.Output)
return fmt.Errorf("output is not Redshift, output: %q", c.Output)
}

if c.Redshift == nil {
Expand All @@ -224,7 +224,7 @@ func (c Config) ValidateRedshift() error {
// The actual output source (like Snowflake) and CDC parser will be loaded and checked by other funcs.
func (c Config) Validate() error {
if c.FlushSizeKb <= 0 {
return fmt.Errorf("flush size pool has to be a positive number, current value: %v", c.FlushSizeKb)
return fmt.Errorf("flush size pool has to be a positive number, current value: %d", c.FlushSizeKb)
}

if !numbers.BetweenEq(FlushIntervalSecondsMin, FlushIntervalSecondsMax, c.FlushIntervalSeconds) {
Expand Down
100 changes: 47 additions & 53 deletions lib/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"io"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -457,11 +456,7 @@ bigquery:
}

func TestConfig_Validate(t *testing.T) {
pubsub := Pubsub{
ProjectID: "foo",
PathToCredentials: "bar",
}

pubsub := Pubsub{ProjectID: "foo", PathToCredentials: "bar"}
cfg := Config{
Pubsub: &pubsub,
FlushIntervalSeconds: 5,
Expand Down Expand Up @@ -508,48 +503,51 @@ func TestConfig_Validate(t *testing.T) {
cfg.BufferRows = defaultBufferPoolSize + 1
assert.Nil(t, cfg.Validate())
}

// Test the various flush error settings.
for i := 0; i < bufferPoolSizeMin; i++ {
// Reset buffer rows.
cfg.BufferRows = 500
cfg.FlushIntervalSeconds = i
assert.ErrorContains(t, cfg.Validate(), "flush interval is outside of our range")

// Reset Flush
cfg.FlushIntervalSeconds = 20
cfg.BufferRows = uint(i)
assert.ErrorContains(t, cfg.Validate(), "buffer pool is too small")
{
// Invalid flush settings
for i := 0; i < bufferPoolSizeMin; i++ {
// Reset buffer rows.
cfg.BufferRows = 500
cfg.FlushIntervalSeconds = i
assert.ErrorContains(t, cfg.Validate(), "flush interval is outside of our range")

// Reset Flush
cfg.FlushIntervalSeconds = 20
cfg.BufferRows = uint(i)
assert.ErrorContains(t, cfg.Validate(), "buffer pool is too small")
}
}

cfg.BufferRows = 500
cfg.FlushIntervalSeconds = 600
assert.Nil(t, cfg.Validate())

// Now that we have a valid output, let's test with S3.
cfg.Output = constants.S3
assert.ErrorContains(t, cfg.Validate(), "s3 settings are nil")
cfg.S3 = &S3Settings{
Bucket: "foo",
AwsSecretAccessKey: "foo",
AwsAccessKeyID: "bar",
OutputFormat: constants.ParquetFormat,
}

assert.Nil(t, cfg.Validate())

// Now let's change to history mode and see.
cfg.Mode = History
pubsub.TopicConfigs[0].DropDeletedColumns = true
assert.ErrorContains(t, cfg.Validate(), "dropDeletedColumns is not supported in history mode")

pubsub.TopicConfigs[0].DropDeletedColumns = false
pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = false
assert.ErrorContains(t, cfg.Validate(), "includeDatabaseUpdatedAt is required in history mode")
{
// S3
cfg.Output = constants.S3
assert.ErrorContains(t, cfg.Validate(), "s3 settings are nil")
cfg.S3 = &S3Settings{
Bucket: "foo",
AwsSecretAccessKey: "foo",
AwsAccessKeyID: "bar",
OutputFormat: constants.ParquetFormat,
}

pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = true
assert.NoError(t, cfg.Validate())
// End history mode
assert.Nil(t, cfg.Validate())
}
{
// Now let's change to history mode and see.
cfg.Mode = History
pubsub.TopicConfigs[0].DropDeletedColumns = true
assert.ErrorContains(t, cfg.Validate(), "dropDeletedColumns is not supported in history mode")

pubsub.TopicConfigs[0].DropDeletedColumns = false
pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = false
assert.ErrorContains(t, cfg.Validate(), "includeDatabaseUpdatedAt is required in history mode")

pubsub.TopicConfigs[0].IncludeDatabaseUpdatedAt = true
assert.NoError(t, cfg.Validate())
}

for _, num := range []int{-500, -300, -5, 0} {
cfg.FlushSizeKb = num
Expand All @@ -558,18 +556,14 @@ func TestConfig_Validate(t *testing.T) {
}

func TestCfg_KafkaBootstrapServers(t *testing.T) {
kafka := Kafka{
BootstrapServer: "localhost:9092",
{
// Single broker
kafka := Kafka{BootstrapServer: "localhost:9092"}
assert.Equal(t, []string{"localhost:9092"}, kafka.BootstrapServers())
}

assert.Equal(t, []string{"localhost:9092"}, strings.Split(kafka.BootstrapServer, ","))

kafkaWithMultipleBrokers := Kafka{
BootstrapServer: "a:9092,b:9093,c:9094",
{
// Multiple brokers
kafkaWithMultipleBrokers := Kafka{BootstrapServer: "a:9092,b:9093,c:9094"}
assert.Equal(t, []string{"a:9092", "b:9093", "c:9094"}, kafkaWithMultipleBrokers.BootstrapServers())
}

var brokers []string
brokers = append(brokers, strings.Split(kafkaWithMultipleBrokers.BootstrapServer, ",")...)

assert.Equal(t, []string{"a:9092", "b:9093", "c:9094"}, brokers)
}
1 change: 1 addition & 0 deletions lib/kafkalib/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type TopicConfig struct {
BigQueryPartitionSettings *partition.BigQuerySettings `yaml:"bigQueryPartitionSettings,omitempty"`
AdditionalMergePredicates []partition.MergePredicates `yaml:"additionalMergePredicates,omitempty"`
ColumnsToHash []string `yaml:"columnsToHash,omitempty"`
PrimaryKeysOverride []string `yaml:"primaryKeysOverride,omitempty"`

// Internal metadata
opsToSkipMap map[string]bool `yaml:"-"`
Expand Down

0 comments on commit ae06903

Please sign in to comment.