Skip to content

Commit

Permalink
Update destination keyColumns logic, rename general config to config …
Browse files Browse the repository at this point in the history
…in errors (#27)
  • Loading branch information
voscob authored Nov 30, 2022
1 parent c100a0a commit c45dfa6
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 13 deletions.
6 changes: 3 additions & 3 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ const (
KeyColumns = "keyColumns"
)

// Configuration is the general configurations needed to connect to ClickHouse database.
// Configuration is the configuration needed to connect to ClickHouse database.
type Configuration struct {
// URL is the configuration of the connection string to connect to ClickHouse database.
URL string `validate:"required"`
Expand All @@ -39,7 +39,7 @@ type Configuration struct {
KeyColumns []string
}

// parses a general configuration.
// parses a configuration.
func parseConfiguration(cfg map[string]string) (Configuration, error) {
config := Configuration{
URL: strings.TrimSpace(cfg[URL]),
Expand All @@ -48,7 +48,7 @@ func parseConfiguration(cfg map[string]string) (Configuration, error) {

err := validate(config)
if err != nil {
return Configuration{}, fmt.Errorf("validate general config: %w", err)
return Configuration{}, fmt.Errorf("validate config: %w", err)
}

if cfg[KeyColumns] == "" {
Expand Down
8 changes: 4 additions & 4 deletions config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
testTable = "test_table"
)

func TestParseGeneral(t *testing.T) {
func TestParseConfiguration(t *testing.T) {
t.Parallel()

tests := []struct {
Expand Down Expand Up @@ -170,19 +170,19 @@ func TestParseGeneral(t *testing.T) {
in: map[string]string{
Table: testTable,
},
err: fmt.Errorf("validate general config: %w", fmt.Errorf("%q must be set", URL)),
err: fmt.Errorf("validate config: %w", fmt.Errorf("%q must be set", URL)),
},
{
name: "failure_required_table",
in: map[string]string{
URL: testURL,
},
err: fmt.Errorf("validate general config: %w", fmt.Errorf("%q must be set", Table)),
err: fmt.Errorf("validate config: %w", fmt.Errorf("%q must be set", Table)),
},
{
name: "failure_required_url_and_table",
in: map[string]string{},
err: fmt.Errorf("validate general config: %w",
err: fmt.Errorf("validate config: %w",
multierr.Combine(fmt.Errorf("%q must be set", URL), fmt.Errorf("%q must be set", Table))),
},
}
Expand Down
2 changes: 1 addition & 1 deletion config/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Destination struct {
func ParseDestination(cfg map[string]string) (Destination, error) {
config, err := parseConfiguration(cfg)
if err != nil {
return Destination{}, fmt.Errorf("parse general config: %w", err)
return Destination{}, fmt.Errorf("parse config: %w", err)
}

destinationConfig := Destination{
Expand Down
2 changes: 1 addition & 1 deletion config/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Source struct {
func ParseSource(cfg map[string]string) (Source, error) {
config, err := parseConfiguration(cfg)
if err != nil {
return Source{}, fmt.Errorf("parse general config: %w", err)
return Source{}, fmt.Errorf("parse source config: %w", err)
}

sourceConfig := Source{
Expand Down
2 changes: 1 addition & 1 deletion config/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func validate(s interface{}) error {
validationErr := Get().Struct(s)
if validationErr != nil {
if _, ok := validationErr.(*v.InvalidValidationError); ok {
return fmt.Errorf("validate general config struct: %w", validationErr)
return fmt.Errorf("validate struct: %w", validationErr)
}

for _, e := range validationErr.(v.ValidationErrors) {
Expand Down
2 changes: 1 addition & 1 deletion destination/destination_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestDestination_Configure_Fail(t *testing.T) {
config.URL: testURL,
})
is.Equal(err.Error(),
`parse destination config: parse general config: validate general config: "table" must be set`)
`parse destination config: parse config: validate config: "table" must be set`)
}

func TestDestination_Write_Success(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions destination/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,12 @@ func (w *Writer) Update(ctx context.Context, record sdk.Record) error {
key = make(sdk.StructuredData)

for i := range w.keyColumns {
if val, ok := payload[w.keyColumns[i]]; ok {
key[w.keyColumns[i]] = val
val, ok := payload[w.keyColumns[i]]
if !ok {
return fmt.Errorf("key column %q not found", w.keyColumns[i])
}

key[w.keyColumns[i]] = val
}
}

Expand Down

0 comments on commit c45dfa6

Please sign in to comment.