diff --git a/Makefile b/Makefile index fdaa3e4..470f68d 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,8 @@ VERSION=$(shell git describe --tags --dirty --always) .PHONY: build build: - go build -ldflags "-X 'github.com/conduitio/conduit-connector-kafka.version=${VERSION}'" -o conduit-connector-kafka cmd/connector/main.go + sed -i '/specification:/,/version:/ s/version: .*/version: '"${VERSION}"'/' connector.yaml + go build -o conduit-connector-kafka cmd/connector/main.go .PHONY: test-kafka test-kafka: diff --git a/common/config.go b/common/config.go index 424ae6b..cc67863 100644 --- a/common/config.go +++ b/common/config.go @@ -43,13 +43,13 @@ type Config struct { } // Validate executes manual validations beyond what is defined in struct tags. -func (c Config) Validate() error { +func (c Config) Validate(ctx context.Context) error { var multierr []error - if err := c.ConfigSASL.Validate(); err != nil { + if err := c.ConfigSASL.Validate(ctx); err != nil { multierr = append(multierr, err) } - if err := c.ConfigTLS.Validate(); err != nil { + if err := c.ConfigTLS.Validate(ctx); err != nil { multierr = append(multierr, err) } diff --git a/common/config_test.go b/common/config_test.go index e0501a3..5c6cce2 100644 --- a/common/config_test.go +++ b/common/config_test.go @@ -76,7 +76,7 @@ func TestConfig_Validate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) - err := tc.cfg.Validate() + err := tc.cfg.Validate(context.Background()) is.True(err != nil) if actualErr, ok := tc.wantErr.(error); ok { is.True(errors.Is(err, actualErr)) diff --git a/common/sasl.go b/common/sasl.go index 96ea795..01f4954 100644 --- a/common/sasl.go +++ b/common/sasl.go @@ -15,6 +15,7 @@ package common import ( + "context" "errors" "fmt" @@ -36,7 +37,7 @@ type ConfigSASL struct { } // Validate executes manual validations beyond what is defined in struct tags. -func (c ConfigSASL) Validate() error { +func (c ConfigSASL) Validate(context.Context) error { var multierr []error if _, err := c.sasl(); err != nil { diff --git a/common/tls.go b/common/tls.go index c0f68ff..28297b3 100644 --- a/common/tls.go +++ b/common/tls.go @@ -15,6 +15,7 @@ package common import ( + "context" "crypto/tls" "crypto/x509" "fmt" @@ -36,7 +37,7 @@ type ConfigTLS struct { } // Validate executes manual validations beyond what is defined in struct tags. -func (c ConfigTLS) Validate() error { +func (c ConfigTLS) Validate(context.Context) error { _, err := c.tls() return err } diff --git a/connector.go b/connector.go index beb91a9..c8bc059 100644 --- a/connector.go +++ b/connector.go @@ -12,16 +12,23 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:generate specgen + // Package kafka contains implementations for Kafka source and destination // connectors for Conduit. package kafka import ( + _ "embed" + sdk "github.com/conduitio/conduit-connector-sdk" ) +//go:embed connector.yaml +var specs string + var Connector = sdk.Connector{ - NewSpecification: Specification, + NewSpecification: sdk.YAMLSpecification(specs), NewSource: NewSource, NewDestination: NewDestination, } diff --git a/connector.yaml b/connector.yaml new file mode 100644 index 0000000..67b6ae6 --- /dev/null +++ b/connector.yaml @@ -0,0 +1,321 @@ +version: "1.0" +specification: + name: kafka + summary: A Kafka source and destination plugin for Conduit, written in Go. + description: |- + This is the description of this connector. + + You can use _markdown_ ..... + version: v0.11.1-25-g8c62739-dirty + author: Meroxa, Inc. Something + source: + parameters: + - name: servers + description: |- + Servers is a list of Kafka bootstrap servers, which will be used to + discover all the servers in a cluster. + type: string + default: "" + validations: + - type: required + value: "" + - name: topics + description: Topics is a comma separated list of Kafka topics to read from. + type: string + default: "" + validations: + - type: required + value: "" + - name: caCert + description: CACert is the Kafka broker's certificate. + type: string + default: "" + validations: [] + - name: clientCert + description: ClientCert is the Kafka client's certificate. + type: string + default: "" + validations: [] + - name: clientID + description: |- + ClientID is a unique identifier for client connections established by + this connector. + type: string + default: conduit-connector-kafka + validations: [] + - name: clientKey + description: ClientKey is the Kafka client's private key. + type: string + default: "" + validations: [] + - name: groupID + description: GroupID defines the consumer group id. + type: string + default: "" + validations: [] + - name: insecureSkipVerify + description: |- + InsecureSkipVerify defines whether to validate the broker's certificate + chain and host name. If 'true', accepts any certificate presented by the + server and any host name in that certificate. + type: bool + default: "" + validations: [] + - name: readFromBeginning + description: |- + ReadFromBeginning determines from whence the consumer group should begin + consuming when it finds a partition without a committed offset. If this + options is set to true it will start with the first message in that + partition. + type: bool + default: "" + validations: [] + - name: retryGroupJoinErrors + description: RetryGroupJoinErrors determines whether the connector will continually retry on group join errors. + type: bool + default: "true" + validations: [] + - name: saslMechanism + description: |- + Mechanism configures the connector to use SASL authentication. If + empty, no authentication will be performed. + type: string + default: "" + validations: + - type: inclusion + value: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + - name: saslPassword + description: Password sets up the password used with SASL authentication. + type: string + default: "" + validations: [] + - name: saslUsername + description: Username sets up the username used with SASL authentication. + type: string + default: "" + validations: [] + - name: sdk.batch.delay + description: Maximum delay before an incomplete batch is read from the source. + type: duration + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.batch.size + description: Maximum size of batch before it gets read from the source. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.schema.context.enabled + description: |- + Specifies whether to use a schema context name. If set to false, no schema context name will + be used, and schemas will be saved with the subject name specified in the connector + (not safe because of name conflicts). + type: bool + default: "true" + validations: [] + - name: sdk.schema.context.name + description: |- + Schema context name to be used. Used as a prefix for all schema subject names. + If empty, defaults to the connector ID. + type: string + default: "" + validations: [] + - name: sdk.schema.extract.key.enabled + description: Whether to extract and encode the record key with a schema. + type: bool + default: "false" + validations: [] + - name: sdk.schema.extract.key.subject + description: |- + The subject of the key schema. If the record metadata contains the field + "opencdc.collection" it is prepended to the subject name and separated + with a dot. + type: string + default: key + validations: [] + - name: sdk.schema.extract.payload.enabled + description: Whether to extract and encode the record payload with a schema. + type: bool + default: "false" + validations: [] + - name: sdk.schema.extract.payload.subject + description: |- + The subject of the payload schema. If the record metadata contains the + field "opencdc.collection" it is prepended to the subject name and + separated with a dot. + type: string + default: payload + validations: [] + - name: sdk.schema.extract.type + description: The type of the payload schema. + type: string + default: avro + validations: + - type: inclusion + value: avro + - name: tls.enabled + description: TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster. + type: bool + default: "" + validations: [] + destination: + parameters: + - name: servers + description: |- + Servers is a list of Kafka bootstrap servers, which will be used to + discover all the servers in a cluster. + type: string + default: "" + validations: + - type: required + value: "" + - name: acks + description: |- + Acks defines the number of acknowledges from partition replicas required + before receiving a response to a produce request. + None = fire and forget, one = wait for the leader to acknowledge the + writes, all = wait for the full ISR to acknowledge the writes. + type: string + default: all + validations: + - type: inclusion + value: none,one,all + - name: batchBytes + description: |- + BatchBytes limits the maximum size of a request in bytes before being + sent to a partition. This mirrors Kafka's max.message.bytes. + type: int + default: "1000012" + validations: [] + - name: caCert + description: CACert is the Kafka broker's certificate. + type: string + default: "" + validations: [] + - name: clientCert + description: ClientCert is the Kafka client's certificate. + type: string + default: "" + validations: [] + - name: clientID + description: |- + ClientID is a unique identifier for client connections established by + this connector. + type: string + default: conduit-connector-kafka + validations: [] + - name: clientKey + description: ClientKey is the Kafka client's private key. + type: string + default: "" + validations: [] + - name: compression + description: Compression set the compression codec to be used to compress messages. + type: string + default: snappy + validations: + - type: inclusion + value: none,gzip,snappy,lz4,zstd + - name: deliveryTimeout + description: DeliveryTimeout for write operation performed by the Writer. + type: duration + default: "" + validations: [] + - name: insecureSkipVerify + description: |- + InsecureSkipVerify defines whether to validate the broker's certificate + chain and host name. If 'true', accepts any certificate presented by the + server and any host name in that certificate. + type: bool + default: "" + validations: [] + - name: saslMechanism + description: |- + Mechanism configures the connector to use SASL authentication. If + empty, no authentication will be performed. + type: string + default: "" + validations: + - type: inclusion + value: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + - name: saslPassword + description: Password sets up the password used with SASL authentication. + type: string + default: "" + validations: [] + - name: saslUsername + description: Username sets up the username used with SASL authentication. + type: string + default: "" + validations: [] + - name: sdk.batch.delay + description: Maximum delay before an incomplete batch is written to the destination. + type: duration + default: "0" + validations: [] + - name: sdk.batch.size + description: Maximum size of batch before it gets written to the destination. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.rate.burst + description: |- + Allow bursts of at most X records (0 or less means that bursts are not + limited). Only takes effect if a rate limit per second is set. Note that + if `sdk.batch.size` is bigger than `sdk.rate.burst`, the effective batch + size will be equal to `sdk.rate.burst`. + type: int + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.rate.perSecond + description: Maximum number of records written per second (0 means no rate limit). + type: float + default: "0" + validations: + - type: greater-than + value: "-1" + - name: sdk.record.format + description: |- + The format of the output record. See the Conduit documentation for a full + list of supported formats (https://conduit.io/docs/using/connectors/configuration-parameters/output-format). + type: string + default: opencdc/json + validations: [] + - name: sdk.record.format.options + description: |- + Options to configure the chosen output record format. Options are normally + key=value pairs separated with comma (e.g. opt1=val2,opt2=val2), except + for the `template` record format, where options are a Go template. + type: string + default: "" + validations: [] + - name: sdk.schema.extract.key.enabled + description: Whether to extract and decode the record key with a schema. + type: bool + default: "true" + validations: [] + - name: sdk.schema.extract.payload.enabled + description: Whether to extract and decode the record payload with a schema. + type: bool + default: "true" + validations: [] + - name: tls.enabled + description: TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster. + type: bool + default: "" + validations: [] + - name: topic + description: |- + Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template) + that will be executed for each record to determine the topic. By default, + the topic is the value of the `opencdc.collection` metadata field. + type: string + default: '{{ index .Metadata "opencdc.collection" }}' + validations: [] diff --git a/destination.go b/destination.go index a7b0bda..c335d47 100644 --- a/destination.go +++ b/destination.go @@ -19,7 +19,6 @@ import ( "fmt" "strings" - "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/destination" sdk "github.com/conduitio/conduit-connector-sdk" @@ -33,35 +32,22 @@ type Destination struct { } func NewDestination() sdk.Destination { - return sdk.DestinationWithMiddleware(&Destination{}, sdk.DefaultDestinationMiddleware()...) + return sdk.DestinationWithMiddleware(&Destination{}) } -func (d *Destination) Parameters() config.Parameters { - return destination.Config{}.Parameters() +func (d *Destination) Config() sdk.DestinationConfig { + return &d.config } -func (d *Destination) Configure(ctx context.Context, cfg config.Config) error { - err := sdk.Util.ParseConfig(ctx, cfg, &d.config, NewDestination().Parameters()) - if err != nil { - return err - } - err = d.config.Validate() - if err != nil { - return err - } - - recordFormat := cfg[sdk.DestinationWithRecordFormatConfig{}.RecordFormatParameterName()] - if recordFormat != "" { - recordFormatType, _, _ := strings.Cut(recordFormat, "/") +func (d *Destination) Open(ctx context.Context) error { + recordFormat := d.config.RecordFormat + if recordFormat != nil && *recordFormat != "" { + recordFormatType, _, _ := strings.Cut(*recordFormat, "/") if recordFormatType == (sdk.DebeziumConverter{}.Name()) { d.config = d.config.WithKafkaConnectKeyFormat() } } - return nil -} - -func (d *Destination) Open(ctx context.Context) error { err := d.config.TryDial(ctx) if err != nil { return fmt.Errorf("failed to dial broker: %w", err) diff --git a/destination/config.go b/destination/config.go index 0aacdfb..16b0ec6 100644 --- a/destination/config.go +++ b/destination/config.go @@ -12,11 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate paramgen -output=paramgen.go Config - package destination import ( + "context" "errors" "fmt" "regexp" @@ -27,6 +26,7 @@ import ( "github.com/Masterminds/sprig/v3" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/common" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/twmb/franz-go/pkg/kgo" ) @@ -36,6 +36,7 @@ var ( ) type Config struct { + sdk.DefaultDestinationMiddleware common.Config // Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template) @@ -100,10 +101,10 @@ func (c Config) CompressionCodecs() []kgo.CompressionCodec { } // Validate executes manual validations beyond what is defined in struct tags. -func (c Config) Validate() error { +func (c Config) Validate(ctx context.Context) error { var multierr []error - err := c.Config.Validate() + err := c.Config.Validate(ctx) if err != nil { multierr = append(multierr, err) } diff --git a/destination/config_test.go b/destination/config_test.go index fe5a2bb..1beb196 100644 --- a/destination/config_test.go +++ b/destination/config_test.go @@ -15,6 +15,7 @@ package destination import ( + "context" "strings" "testing" @@ -72,7 +73,7 @@ func TestConfig_Validate(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { is := is.New(t) - err := tc.config.Validate() + err := tc.config.Validate(context.Background()) if tc.wantErr != "" && err == nil { t.Errorf("expected error, got nil") diff --git a/destination/paramgen.go b/destination/paramgen.go deleted file mode 100644 index 611edf0..0000000 --- a/destination/paramgen.go +++ /dev/null @@ -1,129 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package destination - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - ConfigAcks = "acks" - ConfigBatchBytes = "batchBytes" - ConfigCaCert = "caCert" - ConfigClientCert = "clientCert" - ConfigClientID = "clientID" - ConfigClientKey = "clientKey" - ConfigCompression = "compression" - ConfigDeliveryTimeout = "deliveryTimeout" - ConfigInsecureSkipVerify = "insecureSkipVerify" - ConfigSaslMechanism = "saslMechanism" - ConfigSaslPassword = "saslPassword" - ConfigSaslUsername = "saslUsername" - ConfigServers = "servers" - ConfigTlsEnabled = "tls.enabled" - ConfigTopic = "topic" -) - -func (Config) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - ConfigAcks: { - Default: "all", - Description: "Acks defines the number of acknowledges from partition replicas required\nbefore receiving a response to a produce request.\nNone = fire and forget, one = wait for the leader to acknowledge the\nwrites, all = wait for the full ISR to acknowledge the writes.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"none", "one", "all"}}, - }, - }, - ConfigBatchBytes: { - Default: "1000012", - Description: "BatchBytes limits the maximum size of a request in bytes before being\nsent to a partition. This mirrors Kafka's max.message.bytes.", - Type: config.ParameterTypeInt, - Validations: []config.Validation{}, - }, - ConfigCaCert: { - Default: "", - Description: "CACert is the Kafka broker's certificate.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientCert: { - Default: "", - Description: "ClientCert is the Kafka client's certificate.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientID: { - Default: "conduit-connector-kafka", - Description: "ClientID is a unique identifier for client connections established by\nthis connector.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientKey: { - Default: "", - Description: "ClientKey is the Kafka client's private key.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigCompression: { - Default: "snappy", - Description: "Compression set the compression codec to be used to compress messages.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"none", "gzip", "snappy", "lz4", "zstd"}}, - }, - }, - ConfigDeliveryTimeout: { - Default: "", - Description: "DeliveryTimeout for write operation performed by the Writer.", - Type: config.ParameterTypeDuration, - Validations: []config.Validation{}, - }, - ConfigInsecureSkipVerify: { - Default: "", - Description: "InsecureSkipVerify defines whether to validate the broker's certificate\nchain and host name. If 'true', accepts any certificate presented by the\nserver and any host name in that certificate.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigSaslMechanism: { - Default: "", - Description: "Mechanism configures the connector to use SASL authentication. If\nempty, no authentication will be performed.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, - }, - }, - ConfigSaslPassword: { - Default: "", - Description: "Password sets up the password used with SASL authentication.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigSaslUsername: { - Default: "", - Description: "Username sets up the username used with SASL authentication.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigServers: { - Default: "", - Description: "Servers is a list of Kafka bootstrap servers, which will be used to\ndiscover all the servers in a cluster.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - ConfigTlsEnabled: { - Default: "", - Description: "TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigTopic: { - Default: "{{ index .Metadata \"opencdc.collection\" }}", - Description: "Topic is the Kafka topic. It can contain a [Go template](https://pkg.go.dev/text/template)\nthat will be executed for each record to determine the topic. By default,\nthe topic is the value of the `opencdc.collection` metadata field.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - } -} diff --git a/destination_integration_test.go b/destination_integration_test.go index 8175726..e473ab8 100644 --- a/destination_integration_test.go +++ b/destination_integration_test.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit-connector-kafka/destination" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" ) @@ -49,7 +50,10 @@ func testDestinationIntegrationWrite(t *testing.T, cfg map[string]string) { is.NoErr(err) }() - err := underTest.Configure(ctx, cfg) + err := sdk.Util.ParseConfig(ctx, cfg, underTest.Config(), Connector.NewSpecification().DestinationParams) + is.NoErr(err) + + err = underTest.Config().Validate(ctx) is.NoErr(err) err = underTest.Open(ctx) diff --git a/go.mod b/go.mod index 142ce79..27f5ffe 100644 --- a/go.mod +++ b/go.mod @@ -4,8 +4,8 @@ go 1.23.2 require ( github.com/Masterminds/sprig/v3 v3.3.0 - github.com/conduitio/conduit-commons v0.5.0 - github.com/conduitio/conduit-connector-sdk v0.12.0 + github.com/conduitio/conduit-commons v0.5.1-0.20241212191338-65107596b166 + github.com/conduitio/conduit-connector-sdk v0.12.1-0.20241223110702-9d9a69d4f885 github.com/goccy/go-json v0.10.4 github.com/golangci/golangci-lint v1.62.2 github.com/google/go-cmp v0.6.0 @@ -54,6 +54,9 @@ require ( github.com/chavacava/garif v0.1.0 // indirect github.com/ckaznocha/intrange v0.2.1 // indirect github.com/conduitio/conduit-connector-protocol v0.9.0 // indirect + github.com/conduitio/evolviconf v0.0.0-20241105144321-27c16bddeb38 // indirect + github.com/conduitio/evolviconf/evolviyaml v0.0.0-20241105144803-b3ba81765197 // indirect + github.com/conduitio/yaml/v3 v3.3.0 // indirect github.com/curioswitch/go-reassign v0.2.0 // indirect github.com/daixiang0/gci v0.13.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -160,6 +163,9 @@ require ( github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect github.com/sagikazarmark/locafero v0.6.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect + github.com/samber/lo v1.44.0 // indirect + github.com/samber/slog-common v0.17.0 // indirect + github.com/samber/slog-zerolog/v2 v2.7.0 // indirect github.com/sanposhiho/wastedassign/v2 v2.0.7 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/sashamelentyev/interfacebloat v1.1.0 // indirect @@ -206,19 +212,19 @@ require ( go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.29.0 // indirect - golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f // indirect golang.org/x/exp/typeparams v0.0.0-20241108190413-2d47ceb2692f // indirect golang.org/x/mod v0.22.0 // indirect - golang.org/x/net v0.31.0 // indirect - golang.org/x/sync v0.9.0 // indirect - golang.org/x/sys v0.27.0 // indirect - golang.org/x/text v0.20.0 // indirect + golang.org/x/net v0.32.0 // indirect + golang.org/x/sync v0.10.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.8.0 // indirect - golang.org/x/tools v0.27.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f // indirect - google.golang.org/grpc v1.68.0 // indirect - google.golang.org/protobuf v1.35.1 // indirect + golang.org/x/tools v0.28.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 // indirect + google.golang.org/grpc v1.69.2 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 8c6bdba..6d3664e 100644 --- a/go.sum +++ b/go.sum @@ -76,12 +76,18 @@ github.com/chavacava/garif v0.1.0 h1:2JHa3hbYf5D9dsgseMKAmc/MZ109otzgNFk5s87H9Pc github.com/chavacava/garif v0.1.0/go.mod h1:XMyYCkEL58DF0oyW4qDjjnPWONs2HBqYKI+UIPD+Gww= github.com/ckaznocha/intrange v0.2.1 h1:M07spnNEQoALOJhwrImSrJLaxwuiQK+hA2DeajBlwYk= github.com/ckaznocha/intrange v0.2.1/go.mod h1:7NEhVyf8fzZO5Ds7CRaqPEm52Ut83hsTiL5zbER/HYk= -github.com/conduitio/conduit-commons v0.5.0 h1:28UIuOIo+6WvBZ4EU54KfPhSf44I1/Y65zQ9dC0Ps1E= -github.com/conduitio/conduit-commons v0.5.0/go.mod h1:xyT6XpGvj79gdtsn3qaD2KxadhsAYS+mmBOdln08Wio= +github.com/conduitio/conduit-commons v0.5.1-0.20241212191338-65107596b166 h1:YeT83g1/YSEEyYmqyUzHa8xzRAhPauF3Pzwf1+Kx0Ys= +github.com/conduitio/conduit-commons v0.5.1-0.20241212191338-65107596b166/go.mod h1:RNlFZZ+Dx9LjobEGfyHwtKp34Emh/0RVzeFMTEQ6vFk= github.com/conduitio/conduit-connector-protocol v0.9.0 h1:7MailxYxAsr376Nz8WStVYSXnlf86bjtzpA/d/66if0= github.com/conduitio/conduit-connector-protocol v0.9.0/go.mod h1:lF7RUjr9ZMj1rtNubaryHw4mPfjj4DGYDW+wvvRwBkM= -github.com/conduitio/conduit-connector-sdk v0.12.0 h1:WD/ZQhEAJMkvkq0KIyVCGeU8ni2ASMyPpBbAWZQ+lKo= -github.com/conduitio/conduit-connector-sdk v0.12.0/go.mod h1:keZ4eZ4q+7GFEz+Q8G97wvPrrdnBoxh+Bmxl9P9pZW0= +github.com/conduitio/conduit-connector-sdk v0.12.1-0.20241223110702-9d9a69d4f885 h1:R/GqFK0Gi4ZAJwAywvYK1irFMNG7TgzjYXXXXTm9XP0= +github.com/conduitio/conduit-connector-sdk v0.12.1-0.20241223110702-9d9a69d4f885/go.mod h1:Ex9puhGrJxjRCTp1SUFZXpsxa2Edqo8SpVFrZaeFP2E= +github.com/conduitio/evolviconf v0.0.0-20241105144321-27c16bddeb38 h1:hUvQ2irc5CVELscW0kSuTTTqjI/uBqtbCTTbUxDLv70= +github.com/conduitio/evolviconf v0.0.0-20241105144321-27c16bddeb38/go.mod h1:xhvEztHqNrIpDFYfbdxZaCpw4E8iM8R0R2mhoOHUfbM= +github.com/conduitio/evolviconf/evolviyaml v0.0.0-20241105144803-b3ba81765197 h1:XlsNXamx9GdCanxvAENHl5qwp0gICa9AsHI2OBn2lUE= +github.com/conduitio/evolviconf/evolviyaml v0.0.0-20241105144803-b3ba81765197/go.mod h1:22+FHPuroT5pPZpg0fuhE8ACIMCl1S+HsAFN1CM3Vho= +github.com/conduitio/yaml/v3 v3.3.0 h1:kbbaOSHcuH39gP4+rgbJGl6DSbLZcJgEaBvkEXJlCsI= +github.com/conduitio/yaml/v3 v3.3.0/go.mod h1:JNgFMOX1t8W4YJuRZOh6GggVtSMsgP9XgTw+7dIenpc= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/curioswitch/go-reassign v0.2.0 h1:G9UZyOcpk/d7Gd6mqYgd8XYWFMw/znxwGDUstnC9DIo= @@ -115,6 +121,8 @@ github.com/go-critic/go-critic v0.11.5 h1:TkDTOn5v7EEngMxu8KbuFqFR43USaaH8XRJLz1 github.com/go-critic/go-critic v0.11.5/go.mod h1:wu6U7ny9PiaHaZHcvMDmdysMqvDem162Rh3zWTrqk8M= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= @@ -177,8 +185,8 @@ github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/pprof v0.0.0-20241017200806-017d972448fc h1:NGyrhhFhwvRAZg02jnYVg3GBQy0qGBKmFQJwaPmpmxs= -github.com/google/pprof v0.0.0-20241017200806-017d972448fc/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= +github.com/google/pprof v0.0.0-20241101162523-b92577c0c142 h1:sAGdeJj0bnMgUNVeUpp6AYlVdCt3/GdI3pGRqsNSQLs= +github.com/google/pprof v0.0.0-20241101162523-b92577c0c142/go.mod h1:vavhavw2zAxS5dIdcRluK6cSGGPlZynqzFM8NdvU144= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gordonklaus/ineffassign v0.1.0 h1:y2Gd/9I7MdY1oEIt+n+rowjBNDcLQq3RsH5hwJd0f9s= @@ -216,8 +224,8 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jgautheron/goconst v1.7.1 h1:VpdAG7Ca7yvvJk5n8dMwQhfEZJh95kl/Hl9S1OI5Jkk= github.com/jgautheron/goconst v1.7.1/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4= -github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg= -github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/jingyugao/rowserrcheck v1.1.1 h1:zibz55j/MJtLsjP1OF4bSdgXxwL1b+Vn7Tjzq7gFzUs= github.com/jingyugao/rowserrcheck v1.1.1/go.mod h1:4yvlZSDb3IyDTUZJUmpZfm2Hwok+Dtp+nu2qOq+er9c= github.com/jjti/go-spancheck v0.6.2 h1:iYtoxqPMzHUPp7St+5yA8+cONdyXD3ug6KK15n7Pklk= @@ -311,8 +319,8 @@ github.com/oklog/run v1.1.0 h1:GEenZ1cK0+q0+wsJew9qUg/DyD8k3JzYsZAi5gYi2mA= github.com/oklog/run v1.1.0/go.mod h1:sVPdnTZT1zYwAJeCMu2Th4T21pA3FPOQRfWjQlk7DVU= github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= -github.com/onsi/ginkgo/v2 v2.20.2 h1:7NVCeyIWROIAheY21RLS+3j2bb52W0W82tkberYytp4= -github.com/onsi/ginkgo/v2 v2.20.2/go.mod h1:K9gyxPIlb+aIvnZ8bd9Ak+YP18w3APlR+5coaZoE2ag= +github.com/onsi/ginkgo/v2 v2.21.0 h1:7rg/4f3rB88pb5obDgNZrNHrQ4e6WpjonchcpuBRnZM= +github.com/onsi/ginkgo/v2 v2.21.0/go.mod h1:7Du3c42kxCUegi0IImZ1wUQzMBVecgIHjR1C+NkhLQo= github.com/onsi/gomega v1.34.2 h1:pNCwDkzrsv7MS9kpaQvVb1aVLahQXyJ/Tv5oAZMI3i8= github.com/onsi/gomega v1.34.2/go.mod h1:v1xfxRgk0KIsG+QOdm7p8UosrOzPYRo60fd3B/1Dukc= github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw= @@ -371,6 +379,12 @@ github.com/sagikazarmark/locafero v0.6.0 h1:ON7AQg37yzcRPU69mt7gwhFEBwxI6P9T4Qu3 github.com/sagikazarmark/locafero v0.6.0/go.mod h1:77OmuIc6VTraTXKXIs/uvUxKGUXjE1GbemJYHqdNjX0= github.com/sagikazarmark/slog-shim v0.1.0 h1:diDBnUNK9N/354PgrxMywXnAwEr1QZcOr6gto+ugjYE= github.com/sagikazarmark/slog-shim v0.1.0/go.mod h1:SrcSrq8aKtyuqEI1uvTDTK1arOWRIczQRv+GVI1AkeQ= +github.com/samber/lo v1.44.0 h1:5il56KxRE+GHsm1IR+sZ/6J42NODigFiqCWpSc2dybA= +github.com/samber/lo v1.44.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/samber/slog-common v0.17.0 h1:HdRnk7QQTa9ByHlLPK3llCBo8ZSX3F/ZyeqVI5dfMtI= +github.com/samber/slog-common v0.17.0/go.mod h1:mZSJhinB4aqHziR0SKPqpVZjJ0JO35JfH+dDIWqaCBk= +github.com/samber/slog-zerolog/v2 v2.7.0 h1:VWJNhvoR3bf+SDEO89BmahAnz6w5l+NGbPBcnMUqO2g= +github.com/samber/slog-zerolog/v2 v2.7.0/go.mod h1:vGzG7VhveVOnyHEpr7LpIuw28QxEOfV/dQxphJRB4iY= github.com/sanposhiho/wastedassign/v2 v2.0.7 h1:J+6nrY4VW+gC9xFzUc+XjPD3g3wF3je/NsJFwFK7Uxc= github.com/sanposhiho/wastedassign/v2 v2.0.7/go.mod h1:KyZ0MWTwxxBmfwn33zh3k1dmsbF2ud9pAAGfoLfjhtI= github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 h1:lZUw3E0/J3roVtGQ+SCrUrg3ON6NgVqpn3+iol9aGu4= @@ -484,6 +498,16 @@ go-simpler.org/musttag v0.13.0 h1:Q/YAW0AHvaoaIbsPj3bvEI5/QFP7w696IMUpnKXQfCE= go-simpler.org/musttag v0.13.0/go.mod h1:FTzIGeK6OkKlUDVpj0iQUXZLUO1Js9+mvykDQy9C5yM= go-simpler.org/sloglint v0.7.2 h1:Wc9Em/Zeuu7JYpl+oKoYOsQSy2X560aVueCW/m6IijY= go-simpler.org/sloglint v0.7.2/go.mod h1:US+9C80ppl7VsThQclkM7BkCHQAzuz8kHLsW3ppuluo= +go.opentelemetry.io/otel v1.32.0 h1:WnBN+Xjcteh0zdk01SVqV55d/m62NJLJdIyb4y/WO5U= +go.opentelemetry.io/otel v1.32.0/go.mod h1:00DCVSB0RQcnzlwyTfqtxSm+DRr9hpYrHjNGiBHVQIg= +go.opentelemetry.io/otel/metric v1.32.0 h1:xV2umtmNcThh2/a/aCP+h64Xx5wsj8qqnkYZktzNa0M= +go.opentelemetry.io/otel/metric v1.32.0/go.mod h1:jH7CIbbK6SH2V2wE16W05BHCtIDzauciCRLoc/SyMv8= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= +go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= +go.opentelemetry.io/otel/trace v1.32.0 h1:WIC9mYrXf8TmY/EXuULKc8hR17vE+Hjv2cssQDe03fM= +go.opentelemetry.io/otel/trace v1.32.0/go.mod h1:+i4rkvCraA+tG6AzwloGaCtkx53Fa+L+V8e9a7YvhT8= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= @@ -500,10 +524,10 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= -golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= -golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= -golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= +golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= golang.org/x/exp/typeparams v0.0.0-20220428152302-39d4317da171/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20230203172020-98cc5a0785f9/go.mod h1:AbB0pIl9nAr9wVwH+Z2ZpaocVmF5I4GyWCDIsVjR0bk= golang.org/x/exp/typeparams v0.0.0-20241108190413-2d47ceb2692f h1:WTyX8eCCyfdqiPYkRGm0MqElSfYFH3yR1+rl/mct9sA= @@ -537,8 +561,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= -golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= +golang.org/x/net v0.32.0 h1:ZqPmj8Kzc+Y6e0+skZsuACbx+wzMgo5MQsJh9Qd6aYI= +golang.org/x/net v0.32.0/go.mod h1:CwU0IoeOlnQQWJ6ioyFrfRuomB8GKF6KbYXZVyeXNfs= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -548,8 +572,8 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= -golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -579,8 +603,8 @@ golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= -golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -599,8 +623,8 @@ golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= -golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -624,18 +648,18 @@ golang.org/x/tools v0.5.0/go.mod h1:N+Kgy78s5I24c24dU8OfWNEotWjutIs8SnJvn5IDq+k= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58= golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg= -golang.org/x/tools v0.27.0 h1:qEKojBykQkQ4EynWy4S8Weg69NumxKdn40Fce3uc/8o= -golang.org/x/tools v0.27.0/go.mod h1:sUi0ZgbwW9ZPAq26Ekut+weQPR5eIM6GQLQ1Yjm1H0Q= +golang.org/x/tools v0.28.0 h1:WuB6qZ4RPCQo5aP3WdKZS7i595EdWqWR8vqJTlwTVK8= +golang.org/x/tools v0.28.0/go.mod h1:dcIOrVd3mfQKTgrDVQHqCPMWy6lnhfhtX3hLXYVLfRw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f h1:cUMEy+8oS78BWIH9OWazBkzbr090Od9tWBNtZHkOhf0= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240930140551-af27646dc61f/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= -google.golang.org/grpc v1.68.0 h1:aHQeeJbo8zAkAa3pRzrVjZlbz6uSfeOXlJNQM0RAbz0= -google.golang.org/grpc v1.68.0/go.mod h1:fmSPC5AsjSBCK54MyHRx48kpOti1/jRfOlwEWywNjWA= -google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA= -google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28 h1:XVhgTWWV3kGQlwJHR3upFWZeTsei6Oks1apkZSeonIE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20241104194629-dd2ea8efbc28/go.mod h1:GX3210XPVPUjJbTUbvwI8f2IpZDMZuPJWDzDuebbviI= +google.golang.org/grpc v1.69.2 h1:U3S9QEtbXC0bYNvRtcoklF3xGtLViumSYxWykJS+7AU= +google.golang.org/grpc v1.69.2/go.mod h1:vyjdE6jLBI76dgpDojsFGNaHlxdjXN9ghpnd2o7JGZ4= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/source.go b/source.go index c257bb0..ce39dee 100644 --- a/source.go +++ b/source.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "github.com/conduitio/conduit-commons/config" "github.com/conduitio/conduit-commons/lang" "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" @@ -38,32 +37,21 @@ type Source struct { } func NewSource() sdk.Source { - return sdk.SourceWithMiddleware( - &Source{}, - sdk.DefaultSourceMiddleware( - // disable schema extraction by default, because the source produces raw data - sdk.SourceWithSchemaExtractionConfig{ - PayloadEnabled: lang.Ptr((false)), - KeyEnabled: lang.Ptr(false), + return sdk.SourceWithMiddleware(&Source{ + config: source.Config{ + DefaultSourceMiddleware: sdk.DefaultSourceMiddleware{ + SourceWithSchemaExtraction: sdk.SourceWithSchemaExtraction{ + PayloadEnabled: lang.Ptr(false), + KeyEnabled: lang.Ptr(false), + }, }, - )..., - ) + }, + }) } -func (s *Source) Parameters() config.Parameters { - return source.Config{}.Parameters() -} - -func (s *Source) Configure(ctx context.Context, cfg config.Config) error { - err := sdk.Util.ParseConfig(ctx, cfg, &s.config, NewSource().Parameters()) - if err != nil { - return err - } - err = s.config.Validate(ctx) - if err != nil { - return err - } - return nil +// Config returns the currently active configuration of the source. +func (s *Source) Config() sdk.SourceConfig { + return &s.config } func (s *Source) Open(ctx context.Context, sdkPos opencdc.Position) error { diff --git a/source/config.go b/source/config.go index 1e47fa2..2ef1282 100644 --- a/source/config.go +++ b/source/config.go @@ -12,26 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -//go:generate paramgen -output=paramgen.go Config - package source import ( "context" - "errors" - "fmt" "github.com/conduitio/conduit-connector-kafka/common" sdk "github.com/conduitio/conduit-connector-sdk" ) type Config struct { + sdk.DefaultSourceMiddleware common.Config // Topics is a comma separated list of Kafka topics to read from. - Topics []string `json:"topics"` - // Topic {WARN will be deprecated soon} the kafka topic to read from. - Topic string `json:"topic"` + Topics []string `json:"topics" validate:"required"` // ReadFromBeginning determines from whence the consumer group should begin // consuming when it finds a partition without a committed offset. If this // options is set to true it will start with the first message in that @@ -43,25 +38,7 @@ type Config struct { RetryGroupJoinErrors bool `json:"retryGroupJoinErrors" default:"true"` } -// Validate executes manual validations beyond what is defined in struct tags. func (c *Config) Validate(ctx context.Context) error { - var multierr []error - err := c.Config.Validate() - if err != nil { - multierr = append(multierr, err) - } - // validate and set the topics. - if len(c.Topic) == 0 && len(c.Topics) == 0 { - multierr = append(multierr, fmt.Errorf("required parameter missing: %q", "topics")) - } - if len(c.Topic) > 0 && len(c.Topics) > 0 { - multierr = append(multierr, fmt.Errorf(`can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`)) - } - if len(c.Topic) > 0 && len(c.Topics) == 0 { - sdk.Logger(ctx).Warn().Msg(`"topic" parameter is deprecated and will be removed, please use "topics" instead.`) - // add the topic value to the topics slice. - c.Topics = make([]string, 1) - c.Topics[0] = c.Topic - } - return errors.Join(multierr...) + // custom validation can be added here + return c.DefaultSourceMiddleware.Validate(ctx) } diff --git a/source/config_test.go b/source/config_test.go index 983c578..1eb666e 100644 --- a/source/config_test.go +++ b/source/config_test.go @@ -32,27 +32,6 @@ func TestConfig_ValidateTopics(t *testing.T) { wantErr string }{ { - name: `one of "topic" and "topics" should be provided.`, - cfg: Config{ - Topics: []string{}, - Topic: "", - }, - wantErr: `required parameter missing: "topics"`, - }, { - name: "invalid, only provide one.", - cfg: Config{ - Topics: []string{"topic2"}, - Topic: "topic1", - }, - wantErr: `can't provide both "topic" and "topics" parameters, "topic" is deprecated and will be removed, use the "topics" parameter instead`, - }, { - name: "valid with warning, will be deprecated soon", - cfg: Config{ - Topics: []string{}, - Topic: "topic1", - }, - wantErr: "", - }, { name: "valid", cfg: Config{ Topics: []string{"topic1"}, diff --git a/source/paramgen.go b/source/paramgen.go deleted file mode 100644 index ce88ce5..0000000 --- a/source/paramgen.go +++ /dev/null @@ -1,125 +0,0 @@ -// Code generated by paramgen. DO NOT EDIT. -// Source: github.com/ConduitIO/conduit-commons/tree/main/paramgen - -package source - -import ( - "github.com/conduitio/conduit-commons/config" -) - -const ( - ConfigCaCert = "caCert" - ConfigClientCert = "clientCert" - ConfigClientID = "clientID" - ConfigClientKey = "clientKey" - ConfigGroupID = "groupID" - ConfigInsecureSkipVerify = "insecureSkipVerify" - ConfigReadFromBeginning = "readFromBeginning" - ConfigRetryGroupJoinErrors = "retryGroupJoinErrors" - ConfigSaslMechanism = "saslMechanism" - ConfigSaslPassword = "saslPassword" - ConfigSaslUsername = "saslUsername" - ConfigServers = "servers" - ConfigTlsEnabled = "tls.enabled" - ConfigTopic = "topic" - ConfigTopics = "topics" -) - -func (Config) Parameters() map[string]config.Parameter { - return map[string]config.Parameter{ - ConfigCaCert: { - Default: "", - Description: "CACert is the Kafka broker's certificate.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientCert: { - Default: "", - Description: "ClientCert is the Kafka client's certificate.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientID: { - Default: "conduit-connector-kafka", - Description: "ClientID is a unique identifier for client connections established by\nthis connector.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigClientKey: { - Default: "", - Description: "ClientKey is the Kafka client's private key.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigGroupID: { - Default: "", - Description: "GroupID defines the consumer group id.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigInsecureSkipVerify: { - Default: "", - Description: "InsecureSkipVerify defines whether to validate the broker's certificate\nchain and host name. If 'true', accepts any certificate presented by the\nserver and any host name in that certificate.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigReadFromBeginning: { - Default: "", - Description: "ReadFromBeginning determines from whence the consumer group should begin\nconsuming when it finds a partition without a committed offset. If this\noptions is set to true it will start with the first message in that\npartition.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigRetryGroupJoinErrors: { - Default: "true", - Description: "RetryGroupJoinErrors determines whether the connector will continually retry on group join errors.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigSaslMechanism: { - Default: "", - Description: "Mechanism configures the connector to use SASL authentication. If\nempty, no authentication will be performed.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationInclusion{List: []string{"PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"}}, - }, - }, - ConfigSaslPassword: { - Default: "", - Description: "Password sets up the password used with SASL authentication.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigSaslUsername: { - Default: "", - Description: "Username sets up the username used with SASL authentication.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigServers: { - Default: "", - Description: "Servers is a list of Kafka bootstrap servers, which will be used to\ndiscover all the servers in a cluster.", - Type: config.ParameterTypeString, - Validations: []config.Validation{ - config.ValidationRequired{}, - }, - }, - ConfigTlsEnabled: { - Default: "", - Description: "TLSEnabled defines whether TLS is needed to communicate with the Kafka cluster.", - Type: config.ParameterTypeBool, - Validations: []config.Validation{}, - }, - ConfigTopic: { - Default: "", - Description: "Topic {WARN will be deprecated soon} the kafka topic to read from.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - ConfigTopics: { - Default: "", - Description: "Topics is a comma separated list of Kafka topics to read from.", - Type: config.ParameterTypeString, - Validations: []config.Validation{}, - }, - } -} diff --git a/source_integration_test.go b/source_integration_test.go index 8b8955a..9a643f9 100644 --- a/source_integration_test.go +++ b/source_integration_test.go @@ -21,6 +21,7 @@ import ( "github.com/conduitio/conduit-commons/opencdc" "github.com/conduitio/conduit-connector-kafka/source" "github.com/conduitio/conduit-connector-kafka/test" + sdk "github.com/conduitio/conduit-connector-sdk" "github.com/matryer/is" "github.com/twmb/franz-go/pkg/kgo" ) @@ -81,8 +82,12 @@ func testSourceIntegrationRead( is.NoErr(err) }() - err := underTest.Configure(ctx, cfgMap) + err := sdk.Util.ParseConfig(ctx, cfgMap, underTest.Config(), Connector.NewSpecification().SourceParams) is.NoErr(err) + + err = underTest.Config().Validate(ctx) + is.NoErr(err) + err = underTest.Open(ctx, startFrom) is.NoErr(err) diff --git a/spec.go b/spec.go deleted file mode 100644 index 192627a..0000000 --- a/spec.go +++ /dev/null @@ -1,35 +0,0 @@ -// Copyright © 2022 Meroxa, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package kafka - -import ( - sdk "github.com/conduitio/conduit-connector-sdk" -) - -// version is set during the build process (i.e. the Makefile). -// Default version matches default from runtime/debug. -var version = "(devel)" - -// Specification returns the Kafka plugin's specification. -// Any changes here must also be reflected in the ReadMe. -func Specification() sdk.Specification { - return sdk.Specification{ - Name: "kafka", - Summary: "A Kafka source and destination plugin for Conduit, written in Go.", - Description: "A Kafka source and destination plugin for Conduit, written in Go.", - Version: version, - Author: "Meroxa, Inc.", - } -} diff --git a/test/docker-compose-kafka.yml b/test/docker-compose-kafka.yml index 531f9a7..9f1924c 100644 --- a/test/docker-compose-kafka.yml +++ b/test/docker-compose-kafka.yml @@ -1,4 +1,3 @@ -version: '3.4' services: kafka: image: apache/kafka-native:3.8.0 diff --git a/test/docker-compose-redpanda.yml b/test/docker-compose-redpanda.yml index e14b2e6..d523da7 100644 --- a/test/docker-compose-redpanda.yml +++ b/test/docker-compose-redpanda.yml @@ -1,4 +1,3 @@ -version: '3.7' services: redpanda: image: docker.redpanda.com/redpandadata/redpanda:v23.1.4 diff --git a/tools.go b/tools.go index d2ee24b..65dc5ae 100644 --- a/tools.go +++ b/tools.go @@ -17,7 +17,7 @@ package main import ( - _ "github.com/conduitio/conduit-commons/paramgen" + _ "github.com/conduitio/conduit-connector-sdk/specgen" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "go.uber.org/mock/mockgen" )