Remove configuration parameter (#36)
voscob authored Dec 20, 2022
1 parent 53c93b8 commit 4e357e6
Showing 6 changed files with 2 additions and 108 deletions.
1 change: 0 additions & 1 deletion README.md
@@ -59,7 +59,6 @@ configuration.
| `table` | Name of the table that the connector should read. | **true** | `table_name` |
| `orderingColumn` | Column name that the connector will use for ordering rows. The column must contain unique values and be suitable for sorting; otherwise the snapshot won't work correctly. | **true** | `id` |
| `keyColumns` | Comma-separated list of column names to build the `sdk.Record.Key`. See more: [key handling](#key-handling). | false | `id,name` |
| `columns` | Comma-separated list of column names that should be included in each payload of the `sdk.Record`. By default includes all columns. | false | `id,name,age` |
| `batchSize` | Size of rows batch. Min is 1 and max is 100000. The default is 1000. | false | `100` |

#### Key handling
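To make the remaining options concrete, the sketch below builds the kind of raw configuration map the connector now accepts, using only the parameters still documented in the table above. The `url` key, its DSN value, and the table and column names are illustrative assumptions; they are not taken from the README excerpt.

```go
package main

import "fmt"

func main() {
	// Hypothetical raw source configuration after this change: the "columns"
	// key is no longer recognized, so every table column ends up in the payload.
	// The "url" key and all values below are placeholders for illustration only.
	cfg := map[string]string{
		"url":            "user:password@host:port/database",
		"table":          "customers",
		"orderingColumn": "id",
		"keyColumns":     "id,name",
		"batchSize":      "100",
	}

	fmt.Println(cfg)
}
```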
1 change: 0 additions & 1 deletion config/configuration.go
@@ -74,7 +74,6 @@ func getKeyName(fieldName string) string {
"Table": Table,
"KeyColumns": KeyColumns,
"OrderingColumn": OrderingColumn,
"Columns": Columns,
"BatchSize": BatchSize,
}[fieldName]
}
35 changes: 0 additions & 35 deletions config/source.go
@@ -17,16 +17,13 @@ package config
import (
"fmt"
"strconv"
"strings"
)

const (
// OrderingColumn is a config name for an ordering column.
OrderingColumn = "orderingColumn"
// Snapshot is the configuration name for the Snapshot field.
Snapshot = "snapshot"
// Columns is the config name for a list of columns, separated by commas.
Columns = "columns"
// BatchSize is the config name for a batch size.
BatchSize = "batchSize"

@@ -45,8 +42,6 @@ type Source struct {
// Snapshot is the configuration that determines whether the connector
// will take a snapshot of the entire table before starting cdc mode.
Snapshot bool
// Columns list of column names that should be included in each Record's payload.
Columns []string
// BatchSize is a size of rows batch.
BatchSize int `validate:"gte=1,lte=100000"`
}
@@ -74,17 +69,6 @@ func ParseSource(cfg map[string]string) (Source, error) {
sourceConfig.Snapshot = snapshot
}

if cfg[Columns] != "" {
columnsSl := strings.Split(strings.ReplaceAll(cfg[Columns], " ", ""), ",")
for i := range columnsSl {
if columnsSl[i] == "" {
return Source{}, fmt.Errorf("invalid %q", Columns)
}

sourceConfig.Columns = append(sourceConfig.Columns, columnsSl[i])
}
}

if cfg[BatchSize] != "" {
sourceConfig.BatchSize, err = strconv.Atoi(cfg[BatchSize])
if err != nil {
@@ -97,24 +81,5 @@
return Source{}, err
}

if len(sourceConfig.Columns) == 0 {
return sourceConfig, nil
}

columnsMap := make(map[string]struct{}, len(sourceConfig.Columns))
for i := 0; i < len(sourceConfig.Columns); i++ {
columnsMap[sourceConfig.Columns[i]] = struct{}{}
}

if _, ok := columnsMap[sourceConfig.OrderingColumn]; !ok {
return Source{}, fmt.Errorf("columns must include %q", OrderingColumn)
}

for i := range sourceConfig.KeyColumns {
if _, ok := columnsMap[sourceConfig.KeyColumns[i]]; !ok {
return Source{}, fmt.Errorf("columns must include all %q", KeyColumns)
}
}

return sourceConfig, nil
}
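With the columns handling gone, the checks left in `ParseSource` are the required fields and the `batchSize` bounds enforced by the `validate:"gte=1,lte=100000"` tag. Below is a minimal sketch of calling it; the import path `github.com/example/connector/config` is an assumption for illustration, since the real module path is not shown in this excerpt.

```go
package main

import (
	"fmt"

	"github.com/example/connector/config" // assumed import path, for illustration only
)

func main() {
	// A batchSize outside 1..100000 should fail validation; a value in range
	// would parse cleanly. All other values below are placeholders.
	_, err := config.ParseSource(map[string]string{
		config.URL:            "user:password@host:port/database", // placeholder DSN
		config.Table:          "customers",
		config.OrderingColumn: "id",
		config.BatchSize:      "200000", // out of range
	})
	fmt.Println(err) // expected to report that "batchSize" is out of range
}
```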
60 changes: 2 additions & 58 deletions config/source_test.go
@@ -124,7 +124,6 @@ func TestParseSource(t *testing.T) {
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id",
},
want: Source{
Configuration: Configuration{
@@ -134,7 +133,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id"},
},
},
{
@@ -143,7 +141,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id,name",
},
want: Source{
Configuration: Configuration{
@@ -153,7 +150,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
@@ -162,7 +158,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id, name",
},
want: Source{
Configuration: Configuration{
@@ -172,7 +167,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
@@ -181,7 +175,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id,name ",
},
want: Source{
Configuration: Configuration{
@@ -191,7 +184,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
@@ -200,7 +192,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: " id,name",
},
want: Source{
Configuration: Configuration{
@@ -210,7 +201,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
@@ -219,7 +209,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id ,name",
},
want: Source{
Configuration: Configuration{
@@ -229,7 +218,6 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
@@ -238,7 +226,6 @@
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id, name",
},
want: Source{
Configuration: Configuration{
@@ -248,15 +235,13 @@
OrderingColumn: "id",
Snapshot: defaultSnapshot,
BatchSize: defaultBatchSize,
Columns: []string{"id", "name"},
},
},
{
name: "failure_required_orderingColumn",
in: map[string]string{
URL: testURL,
Table: testTable,
Columns: "id,name,age",
URL: testURL,
Table: testTable,
},
err: fmt.Errorf("%q must be set", OrderingColumn),
},
@@ -280,27 +265,6 @@
},
err: fmt.Errorf(`invalid %q: strconv.Atoi: parsing "a": invalid syntax`, BatchSize),
},
{
name: "failure_missed_orderingColumn_in_columns",
in: map[string]string{
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "name,age",
},
err: fmt.Errorf("columns must include %q", OrderingColumn),
},
{
name: "failure_missed_keyColumn_in_columns",
in: map[string]string{
URL: testURL,
Table: testTable,
OrderingColumn: "id",
KeyColumns: "name",
Columns: "id,age",
},
err: fmt.Errorf("columns must include all %q", KeyColumns),
},
{
name: "failure_batchSize_is_too_big",
in: map[string]string{
@@ -331,26 +295,6 @@
},
err: fmt.Errorf("%w", fmt.Errorf("%q is out of range", BatchSize)),
},
{
name: "failure_columns_ends_with_comma",
in: map[string]string{
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: "id,name,",
},
err: fmt.Errorf("invalid %q", Columns),
},
{
name: "failure_columns_starts_with_comma",
in: map[string]string{
URL: testURL,
Table: testTable,
OrderingColumn: "id",
Columns: ",id,name",
},
err: fmt.Errorf("invalid %q", Columns),
},
}

for _, tt := range tests {
7 changes: 0 additions & 7 deletions source/iterator/iterator.go
@@ -43,9 +43,6 @@ type Iterator struct {
keyColumns []string
// name of the column that iterator will use for sorting data
orderingColumn string
// list of table's columns for record payload.
// if empty - will get all columns
columns []string
// size of batch
batchSize int
}
@@ -209,10 +206,6 @@ func (iter *Iterator) loadRows(ctx context.Context) error {
OrderBy(iter.orderingColumn).
Limit(iter.batchSize)

if len(iter.columns) > 0 {
sb.Select(iter.columns...)
}

if iter.position.LastProcessedValue != nil {
sb.Where(sb.GreaterThan(iter.orderingColumn, iter.position.LastProcessedValue))
}
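The chained `OrderBy`/`Limit`/`Where(sb.GreaterThan(...))` calls in `loadRows` match the API of a fluent builder such as `github.com/huandu/go-sqlbuilder`; treating that as an assumption (the actual import is outside this excerpt), here is a standalone sketch of the keyset-style query the iterator now always builds, selecting every column instead of a configured subset.

```go
package main

import (
	"fmt"

	sqlbuilder "github.com/huandu/go-sqlbuilder" // assumed builder; the real import is not shown in this diff
)

func main() {
	const (
		table          = "customers" // placeholder table name
		orderingColumn = "id"
		batchSize      = 1000
	)
	var lastProcessedValue interface{} = 42 // position carried over from the previous batch, if any

	// With the columns option removed, the query always selects every column.
	sb := sqlbuilder.NewSelectBuilder()
	sb.Select("*").
		From(table).
		OrderBy(orderingColumn).
		Limit(batchSize)

	// Resume after the last processed ordering value, mirroring loadRows above.
	if lastProcessedValue != nil {
		sb.Where(sb.GreaterThan(orderingColumn, lastProcessedValue))
	}

	query, args := sb.Build()
	fmt.Println(query, args) // e.g. a SELECT ... WHERE id > ? ORDER BY id LIMIT ... statement plus its arguments
}
```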
6 changes: 0 additions & 6 deletions source/source.go
@@ -75,12 +75,6 @@ func (s *Source) Parameters() map[string]sdk.Parameter {
Required: false,
Description: "Whether the connector will take a snapshot of the entire table before starting cdc mode.",
},
config.Columns: {
Default: "",
Required: false,
Description: "Comma-separated list of column names that should be included in each payload of the " +
"sdk.Record. By default includes all columns.",
},
config.BatchSize: {
Default: "1000",
Required: false,
