Destination (#5)
* Add a general config with a validator

* Add columntypes

* Add the destination

* Move the error from the  to the  file

* Add the unit test for the columntypes

* Add unit tests for destination and writer

* Update the comment below the  error

* Remove a repository package

* Use the  method instead of a switch in the  method

* Update the Writer interface

* Remove the  config field

* Update the destination unit test

* Add Yalantis to the copyright header

* Add a destination integration test

* Update the  with the destination info

* remove comment

* refactor readme

* Update the destination unit test

* Add more types to the destination integration test

* Add the explanation why the connector does not contain an sql-builder

* Update

* Add the keyColumn config

* Move the keyColumn configuration field to the destination config

* Add logs into the destination

* Remove unnecessary constants from the writer.go

* Add a few failed unit tests for the Write method

* Update the Destination Configure unit test

* Remove Parallel signals from the integration test

* Extend the table with configuration options with sdk.rate parameters

* Update the

* Move  methods into the  package

* Fix the db in the Destination

* Wrap config errors

* Move variables to consts in unit-tests

* Rename parse configuration function

* Update comments

* Rename test constants

* Rename test constants

* Update Destination Parameters

* Update the names of columns in the destination integration test

* Add engines destination test (#14)

* Add engines destination test

* Update comment line

* Add the Source (#8)

* Add a source config

* Implement the Source methods

* Update comments of the config fields

* Update the Source

* Wrap config errors

* Remove unnecessary iterator parameter

* Add tests to the Source. (#9)

* Fix the db in the Destination

* Add tests for the Source

* Add the acceptance test (#10)

* Add the acceptance test

* Wrap config errors

* Remove the unnecessary link to the source from the method's description of the acceptance test

* Update the comment of the randString Source method

* Add the information about the Source (#11)

* Add the information about the Source

* Add the example of the position

* Add some updates

* Update the info about the iterator

* Add the information about the CDC

* Rename a counter variable in the acceptance test

* Extend the test case to test the parsing of column fields

* Update the logic for comma-separated fields, refactor tests (#12)

* Update the logic for comma-separated fields, refactor tests

* Remove unnecessary strings.Trim calls

* Update readme.md

* Add info about Log engines to the

* Add the check engines test case

* Rename test constants

* Update readme.md

* Update column names in the integration test

* Update Source Parameters

* Update the names of columns in the acceptance test

* Update readme.md

* Update the check engines integration test

* Update readme.md

Co-authored-by: Yurii Voskoboinikov <voscob@gmail.com>
maksenius and voscob authored Nov 9, 2022
1 parent 09cacba commit f273094
Showing 29 changed files with 4,137 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .golangci.goheader.template
@@ -1,4 +1,4 @@
Copyright © {{ YEAR }} Meroxa, Inc.
Copyright © {{ YEAR }} Meroxa, Inc. & Yalantis

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
4 changes: 4 additions & 0 deletions Makefile
@@ -14,3 +14,7 @@ lint:
dep:
	go mod download
	go mod tidy

mockgen:
	mockgen -package mock -source destination/destination.go -destination destination/mock/destination.go
	mockgen -package mock -source source/source.go -destination source/mock/source.go
95 changes: 93 additions & 2 deletions README.md
@@ -2,5 +2,96 @@

## General

ClickHouse connector is one of [Conduit](https://github.com/ConduitIO/conduit) plugins. It provides both, a source and
a destination ClickHouse connector.
ClickHouse connector is one of [Conduit](https://github.com/ConduitIO/conduit) plugins. It provides both a Source and a
Destination ClickHouse connector.

The connector uses the [Golang SQL database driver](https://github.com/ClickHouse/clickhouse-go) for Yandex ClickHouse.

## Prerequisites

- [Go](https://go.dev/) 1.18
- (optional) [golangci-lint](https://github.com/golangci/golangci-lint) 1.49.0

## How to build it

Run `make build`.

## Testing

Run `make test` to run all unit and integration tests. To run the integration tests, set the `CLICKHOUSE_URL`
environment variable to the ClickHouse database URL.

## Source

The ClickHouse Source Connector allows you to move data from a ClickHouse table to Conduit Destination connectors.

It supports all engines of the [MergeTree](https://clickhouse.com/docs/en/engines/table-engines/#mergetree)
and [Log](https://clickhouse.com/docs/en/engines/table-engines/#log) families.

The iterator reads existing rows from the configured table in batches, using a `WHERE` clause and an `ORDER BY` clause:

```
SELECT {{config.columns}}
FROM {{config.table}}
WHERE {{config.orderingColumn}} > {{position}}
ORDER BY {{config.orderingColumn}};
```
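
As a rough illustration of how such a query could be assembled from the configuration, here is a minimal sketch. The `buildSelectQuery` helper, its parameters, and the `?` placeholder for the position value are assumptions made for this example; they are not taken from the connector's code.

```go
package main

import (
	"fmt"
	"strings"
)

// buildSelectQuery is a hypothetical helper (not the connector's code).
// It renders the SELECT statement from the template above. An empty columns
// slice selects all columns; hasPosition is false on the very first read,
// when there is no position to filter by yet.
func buildSelectQuery(table, orderingColumn string, columns []string, hasPosition bool) string {
	cols := "*"
	if len(columns) > 0 {
		cols = strings.Join(columns, ", ")
	}

	var sb strings.Builder
	fmt.Fprintf(&sb, "SELECT %s FROM %s", cols, table)
	if hasPosition {
		// The position value is meant to be bound as a query argument
		// rather than interpolated into the SQL text.
		fmt.Fprintf(&sb, " WHERE %s > ?", orderingColumn)
	}
	fmt.Fprintf(&sb, " ORDER BY %s", orderingColumn)

	return sb.String()
}

func main() {
	fmt.Println(buildSelectQuery("users", "id", []string{"id", "name"}, true))
	fmt.Println(buildSelectQuery("users", "id", nil, false))
}
```

Table and column names come from the connector configuration here, so a real implementation would likely also want to validate or quote them before building the statement.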

### CDC

When all existing data has been read, the connector only detects newly inserted rows, that is, rows whose
`orderingColumn` value is greater than the current position.

### Position

The position contains the `orderingColumn` value of the last processed row. This value is used in the `WHERE` clause of
the `SELECT` query.
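
One way to picture the position is as a small JSON document that wraps the last processed value. The sketch below assumes a hypothetical `position` type and a `lastProcessedValue` JSON key; the actual layout used by the connector may differ.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// position is a hypothetical layout for the Source position; the field name
// and JSON key are assumptions made for this sketch.
type position struct {
	// LastProcessedValue holds the orderingColumn value of the last processed row.
	LastProcessedValue any `json:"lastProcessedValue"`
}

// marshal encodes the position so it can be stored alongside a record.
func (p position) marshal() ([]byte, error) {
	return json.Marshal(p)
}

// parsePosition decodes a stored position. Empty input yields a zero
// position, which here means "start from the beginning of the table".
func parsePosition(raw []byte) (position, error) {
	var p position
	if len(raw) == 0 {
		return p, nil
	}
	if err := json.Unmarshal(raw, &p); err != nil {
		return position{}, fmt.Errorf("unmarshal position: %w", err)
	}

	return p, nil
}

func main() {
	raw, err := position{LastProcessedValue: 42}.marshal()
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw))

	p, err := parsePosition(raw)
	if err != nil {
		panic(err)
	}
	// JSON numbers decode into float64 when the target type is any.
	fmt.Printf("%v (%T)\n", p.LastProcessedValue, p.LastProcessedValue)
}
```

Note the type change on the round trip: an integer value comes back as `float64`, which is one reason a real implementation might store the value in a typed field instead.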

### Table Name

The connector adds the `clickhouse.table` key to the metadata of each record. Its value is the table name from the
configuration.

### Configuration Options

| name | description | required | example |
|------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------------------------------|
| `url` | [DSN](https://github.com/ClickHouse/clickhouse-go#dsn) to connect to the database. | **true** | `http://username:password@host1:8123/database` |
| `table` | Name of the table that the connector should read. | **true** | `table_name` |
| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. Column names are the keys of the `sdk.Record.Key` map, and the values are taken from the rows. | **true** | `id,name` |
| `orderingColumn` | Column name that the connector will use for ordering rows. The column must contain unique values and be suitable for sorting, otherwise the snapshot won't work correctly. | **true** | `id` |
| `columns` | Comma-separated list of column names that should be included in each payload of the `sdk.Record`. By default includes all columns. | false | `id,name,age` |
| `batchSize` | Number of rows in a batch. Min is 1 and max is 100000. The default is 1000. | false | `100` |
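
To make the rules in the table concrete, the following sketch parses a comma-separated column list and validates `batchSize` against the documented bounds and default. The helper names (`splitColumns`, `parseBatchSize`) are hypothetical and chosen for this example only; trimming whitespace around column names is also an assumption.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Bounds and default taken from the configuration table above.
const (
	defaultBatchSize = 1000
	minBatchSize     = 1
	maxBatchSize     = 100000
)

// splitColumns is a hypothetical helper that turns "id, name" into
// []string{"id", "name"}, skipping empty entries.
func splitColumns(raw string) []string {
	var columns []string
	for _, column := range strings.Split(raw, ",") {
		if column = strings.TrimSpace(column); column != "" {
			columns = append(columns, column)
		}
	}

	return columns
}

// parseBatchSize is a hypothetical helper that applies the default when the
// value is not set and rejects values outside the documented range.
func parseBatchSize(raw string) (int, error) {
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return defaultBatchSize, nil
	}

	size, err := strconv.Atoi(raw)
	if err != nil {
		return 0, fmt.Errorf("parse batchSize: %w", err)
	}
	if size < minBatchSize || size > maxBatchSize {
		return 0, fmt.Errorf("batchSize must be between %d and %d, got %d",
			minBatchSize, maxBatchSize, size)
	}

	return size, nil
}

func main() {
	fmt.Println(splitColumns("id, name"))

	size, err := parseBatchSize("100")
	fmt.Println(size, err)

	_, err = parseBatchSize("100001")
	fmt.Println(err)
}
```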

## Destination

The ClickHouse Destination allows you to move data from any Conduit Source to a ClickHouse table. It takes
an `sdk.Record` and parses it into a valid SQL
query. [Log family engines](https://clickhouse.com/docs/en/engines/table-engines/#log) do not support data changes, so
`OperationUpdate` and `OperationDelete` operations on such tables return the following
error: `Table engine {{table_engine}} doesn't support mutations.`

### Table Name

If a record contains a `clickhouse.table` property in its metadata, the Destination writes to that table. Otherwise, it
falls back to the table configured in the connector. Thus, a Destination can support multiple tables in a single
connector, as long as the user has proper access to those tables.
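
The fallback described above can be sketched in a few lines. `resolveTable` is a hypothetical helper written for this example; treating an empty metadata value the same as a missing one is an assumption.

```go
package main

import "fmt"

// metadataTableKey is the metadata key named in this README.
const metadataTableKey = "clickhouse.table"

// resolveTable is a hypothetical helper: it prefers the table from the record
// metadata and falls back to the table configured in the connector.
func resolveTable(metadata map[string]string, configured string) string {
	// Assumption: an empty value is treated like a missing key.
	if table, ok := metadata[metadataTableKey]; ok && table != "" {
		return table
	}

	return configured
}

func main() {
	fmt.Println(resolveTable(map[string]string{metadataTableKey: "events"}, "table_name"))
	fmt.Println(resolveTable(nil, "table_name"))
}
```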

### Configuration Options

| name | description | required | example |
|----------------------|------------------------------------------------------------------------------------|----------|------------------------------------------------|
| `url` | [DSN](https://github.com/ClickHouse/clickhouse-go#dsn) to connect to the database. | **true** | `http://username:password@host1:8123/database` |
| `table` | Name of the table that the connector should write to. | **true** | `table_name` |
| `keyColumns` | Comma-separated list of column names for [key handling](#key-handling). | false | `id,name` |
| `sdk.rate.perSecond` | Maximum times the Write function can be called per second (0 means no rate limit). | false | `200` |
| `sdk.rate.burst` | Allow bursts of at most X writes (0 means that bursts are not allowed). | false | `10` |

#### Key Handling

If `sdk.Record.Key` is empty, the key is built from the `sdk.Record.Payload` fields named in the comma-separated
`keyColumns` list (for update operations only).
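
A minimal sketch of that fallback, using plain maps in place of the SDK types. `keyFromPayload` is a hypothetical helper, and returning an error when a key column is missing from the payload is an assumption made for this example.

```go
package main

import "fmt"

// keyFromPayload is a hypothetical helper: it builds a record key from the
// payload by picking the values of the configured key columns.
func keyFromPayload(payload map[string]any, keyColumns []string) (map[string]any, error) {
	key := make(map[string]any, len(keyColumns))
	for _, column := range keyColumns {
		value, ok := payload[column]
		if !ok {
			// Assumption: a missing key column is reported as an error.
			return nil, fmt.Errorf("key column %q not found in payload", column)
		}
		key[column] = value
	}

	return key, nil
}

func main() {
	payload := map[string]any{"id": 1, "name": "John", "age": 30}

	key, err := keyFromPayload(payload, []string{"id", "name"})
	if err != nil {
		panic(err)
	}
	fmt.Println(key)
}
```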

## Known limitations

Creating a Source or Destination connector will fail if the table does not exist or if the user does not have permission
to work with the specified table.
142 changes: 142 additions & 0 deletions acceptance_test.go
@@ -0,0 +1,142 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clickhouse

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"sync/atomic"
	"testing"

	"github.com/conduitio-labs/conduit-connector-clickhouse/config"
	sdk "github.com/conduitio/conduit-connector-sdk"
	"github.com/google/uuid"
	"github.com/jmoiron/sqlx"
	"github.com/matryer/is"
)

type driver struct {
	sdk.ConfigurableAcceptanceTestDriver

	id int32
}

// GenerateRecord generates a random sdk.Record.
func (d *driver) GenerateRecord(t *testing.T, operation sdk.Operation) sdk.Record {
	// Use the value returned by the atomic increment; reading d.id directly
	// afterwards would race with concurrent callers.
	id := atomic.AddInt32(&d.id, 1)

	return sdk.Record{
		Position:  nil,
		Operation: operation,
		Metadata: map[string]string{
			"clickhouse.table": d.Config.SourceConfig[config.Table],
		},
		Key: sdk.StructuredData{
			"Int32Type": id,
		},
		Payload: sdk.Change{After: sdk.RawData(
			fmt.Sprintf(`{"Int32Type":%d,"StringType":"%s"}`, id, uuid.NewString()),
		)},
	}
}

func TestAcceptance(t *testing.T) {
	cfg := prepareConfig(t)

	is := is.New(t)

	sdk.AcceptanceTest(t, &driver{
		ConfigurableAcceptanceTestDriver: sdk.ConfigurableAcceptanceTestDriver{
			Config: sdk.ConfigurableAcceptanceTestDriverConfig{
				Connector:         Connector,
				SourceConfig:      cfg,
				DestinationConfig: cfg,
				BeforeTest: func(t *testing.T) {
					err := createTable(cfg[config.URL], cfg[config.Table])
					is.NoErr(err)
				},
				AfterTest: func(t *testing.T) {
					err := dropTables(cfg[config.URL], cfg[config.Table])
					is.NoErr(err)
				},
			},
		},
	})
}

// prepareConfig reads the connection URL from the CLICKHOUSE_URL environment variable
// and prepares the configuration map. The test is skipped if the variable is not set.
func prepareConfig(t *testing.T) map[string]string {
	url := os.Getenv("CLICKHOUSE_URL")
	if url == "" {
		t.Skip("CLICKHOUSE_URL env var must be set")

		return nil
	}

	return map[string]string{
		config.URL:            url,
		config.Table:          fmt.Sprintf("CONDUIT_TEST_%s", randString(6)),
		config.KeyColumns:     "Int32Type",
		config.OrderingColumn: "Int32Type",
	}
}

// createTable creates test table.
func createTable(url, table string) error {
	db, err := sqlx.Open("clickhouse", url)
	if err != nil {
		return fmt.Errorf("open connection: %w", err)
	}
	defer db.Close()

	_, err = db.Exec(fmt.Sprintf(`
	CREATE TABLE %s
	(
		Int32Type Int32,
		StringType String
	) ENGINE ReplacingMergeTree() PRIMARY KEY Int32Type;`, table))
	if err != nil {
		return fmt.Errorf("execute create table query: %w", err)
	}

	return nil
}

// dropTables drops the test table.
func dropTables(url, table string) error {
	db, err := sqlx.Open("clickhouse", url)
	if err != nil {
		return fmt.Errorf("open connection: %w", err)
	}
	defer db.Close()

	_, err = db.Exec(fmt.Sprintf("DROP TABLE %s", table))
	if err != nil {
		return fmt.Errorf("execute drop table query: %w", err)
	}

	return nil
}

// randString generates a random hex string of length 2*n
// (n random bytes, hex-encoded).
func randString(n int) string {
	b := make([]byte, n)
	rand.Read(b) //nolint:errcheck // does not actually fail

	return hex.EncodeToString(b)
}
2 changes: 1 addition & 1 deletion cmd/connector/main.go
@@ -1,4 +1,4 @@
// Copyright © 2022 Meroxa, Inc.
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
65 changes: 65 additions & 0 deletions config/configuration.go
@@ -0,0 +1,65 @@
// Copyright © 2022 Meroxa, Inc. & Yalantis
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
	"fmt"
	"strings"
)

const (
	// URL is the configuration name of the url.
	URL = "url"
	// Table is the configuration name of the table.
	Table = "table"
	// KeyColumns is the configuration name of key column names (for the Destination),
	// or of the names of the columns to build the record.Key (for the Source), separated by commas.
	KeyColumns = "keyColumns"
)

// Configuration is the general configuration needed to connect to a ClickHouse database.
type Configuration struct {
	// URL is the configuration of the connection string to connect to ClickHouse database.
	URL string `json:"url" validate:"required"`
	// Table is the configuration of the table name.
	Table string `json:"table" validate:"required"`
}

// parseConfiguration parses and validates a general configuration.
func parseConfiguration(cfg map[string]string) (Configuration, error) {
	config := Configuration{
		URL:   strings.TrimSpace(cfg[URL]),
		Table: strings.TrimSpace(cfg[Table]),
	}

	err := validate(config)
	if err != nil {
		return Configuration{}, fmt.Errorf("validate general config: %w", err)
	}

	return config, nil
}

// getKeyName returns a configuration key name by struct field name.
// It returns an empty string for unknown field names.
func getKeyName(fieldName string) string {
	return map[string]string{
		"URL":            URL,
		"Table":          Table,
		"KeyColumns":     KeyColumns,
		"OrderingColumn": OrderingColumn,
		"Columns":        Columns,
		"BatchSize":      BatchSize,
	}[fieldName]
}