Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the logic of a record.Key formatting #20

Merged
merged 3 commits into from
Nov 16, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,19 @@ configuration.
|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------------------------------|
| `url` | [DSN](https://github.com/ClickHouse/clickhouse-go#dsn) to connect to the database. | **true** | `http://username:password@host1:8123/database` |
| `table` | Name of the table that the connector should read. | **true** | `table_name` |
| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. Column names are the keys of the `sdk.Record.Key` map, and the values are taken from the rows. | **true** | `id,name` |
| `orderingColumn` | Column name that the connector will use for ordering rows. Column must contain unique values and suitable for sorting, otherwise the snapshot won't work correctly. | **true** | `id` |
| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. See more: [key handling](#key-handling). | false | `id,name` |
| `columns` | Comma-separated list of column names that should be included in each payload of the `sdk.Record`. By default includes all columns. | false | `id,name,age` |
| `batchSize` | Size of rows batch. Min is 1 and max is 100000. The default is 1000. | false | `100` |

#### Key handling

List items are the keys of the `sdk.Record.Key` map, and the values are taken from the row's data.
hariso marked this conversation as resolved.
Show resolved Hide resolved

The `keyColumns` is an optional field. If the field is empty, the system makes a request to the database and uses the
received list of primary keys of the specified table. If the table does not contain primary keys, the system uses the
value of the `orderingColumn` field as the `keyColumns` value.
hariso marked this conversation as resolved.
Show resolved Hide resolved

## Destination

The ClickHouse Destination allows you to move data from any Conduit Source to a ClickHouse table. It takes
Expand Down
19 changes: 17 additions & 2 deletions config/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ const (
// Configuration is the general configurations needed to connect to ClickHouse database.
type Configuration struct {
// URL is the configuration of the connection string to connect to ClickHouse database.
URL string `json:"url" validate:"required"`
URL string `validate:"required"`
// Table is the configuration of the table name.
Table string `json:"table" validate:"required"`
Table string `validate:"required"`
// KeyColumns is the configuration of key column names, separated by commas.
KeyColumns []string
}

// parses a general configuration.
Expand All @@ -49,6 +51,19 @@ func parseConfiguration(cfg map[string]string) (Configuration, error) {
return Configuration{}, fmt.Errorf("validate general config: %w", err)
}

if cfg[KeyColumns] == "" {
return config, nil
}

keyColumns := strings.Split(strings.ReplaceAll(cfg[KeyColumns], " ", ""), ",")
for i := range keyColumns {
if keyColumns[i] == "" {
return Configuration{}, fmt.Errorf("invalid %q", KeyColumns)
}

config.KeyColumns = append(config.KeyColumns, keyColumns[i])
}

return config, nil
}

Expand Down
118 changes: 118 additions & 0 deletions config/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,124 @@ func TestParseGeneral(t *testing.T) {
Table: testTable,
},
},
{
name: "success_keyColumns_has_one_key",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id"},
},
},
{
name: "success_keyColumns_has_two_keys",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id,name",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "success_keyColumns_space_between_keys",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id, name",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "success_keyColumns_ends_with_space",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id,name ",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "success_keyColumns_starts_with_space",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: " id,name",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "success_keyColumns_space_between_keys_before_comma",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id ,name",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "success_keyColumns_two_spaces",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id, name",
},
want: Configuration{
URL: testURL,
Table: testTable,
KeyColumns: []string{"id", "name"},
},
},
{
name: "failure_keyColumns_ends_with_comma",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: "id,name,",
},
err: fmt.Errorf("invalid %q", KeyColumns),
},
{
name: "failure_keyColumns_starts_with_comma",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: ",id,name",
},
err: fmt.Errorf("invalid %q", KeyColumns),
},
{
name: "failure_invalid_keyColumns",
in: map[string]string{
URL: testURL,
Table: testTable,
KeyColumns: ",",
},
err: fmt.Errorf("invalid %q", KeyColumns),
},
{
name: "failure_required_url",
in: map[string]string{
Expand Down
17 changes: 0 additions & 17 deletions config/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@ package config

import (
"fmt"
"strings"
)

// Destination is a destination configuration needed to connect to ClickHouse database.
type Destination struct {
Configuration

// KeyColumns is the configuration of key column names, separated by commas.
KeyColumns []string `json:"keyColumns"`
}

// ParseDestination parses a destination configuration.
Expand All @@ -38,18 +34,5 @@ func ParseDestination(cfg map[string]string) (Destination, error) {
Configuration: config,
}

if cfg[KeyColumns] == "" {
return destinationConfig, nil
}

keyColumns := strings.Split(strings.ReplaceAll(cfg[KeyColumns], " ", ""), ",")
for i := range keyColumns {
if keyColumns[i] == "" {
return Destination{}, fmt.Errorf("invalid %q", KeyColumns)
}

destinationConfig.KeyColumns = append(destinationConfig.KeyColumns, keyColumns[i])
}

return destinationConfig, nil
}
Loading