From c3838ba19b3067095668b57ce3a5c541dfe8e969 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 2 Jul 2024 20:07:26 +0200 Subject: [PATCH 1/3] etcd: Create new etcd watcher input --- go.mod | 6 + go.sum | 14 +- internal/impl/etcd/client.go | 216 ++++++++++++++++++++++++++++ internal/impl/etcd/config.go | 47 +++++++ internal/impl/etcd/input.go | 225 ++++++++++++++++++++++++++++++ internal/impl/etcd/util.go | 36 +++++ public/components/all/package.go | 1 + public/components/etcd/package.go | 6 + 8 files changed, 550 insertions(+), 1 deletion(-) create mode 100644 internal/impl/etcd/client.go create mode 100644 internal/impl/etcd/config.go create mode 100644 internal/impl/etcd/input.go create mode 100644 internal/impl/etcd/util.go create mode 100644 public/components/etcd/package.go diff --git a/go.mod b/go.mod index 69f32d143..81d83565e 100644 --- a/go.mod +++ b/go.mod @@ -129,6 +129,8 @@ require ( github.com/xitongsys/parquet-go v1.6.2 github.com/xitongsys/parquet-go-source v0.0.0-20211228015320-b4f792c43cd0 github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a + go.etcd.io/etcd/api/v3 v3.5.14 + go.etcd.io/etcd/client/v3 v3.5.14 go.mongodb.org/mongo-driver v1.13.1 go.nanomsg.org/mangos/v3 v3.4.2 go.opentelemetry.io/otel v1.24.0 @@ -207,6 +209,8 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cockroachdb/apd/v3 v3.2.1 // indirect github.com/containerd/continuity v0.3.0 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/couchbase/gocbcore/v10 v10.5.1 // indirect github.com/couchbase/gocbcoreps v0.1.3 // indirect github.com/couchbase/goprotostellar v1.0.2 // indirect @@ -319,6 +323,8 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.etcd.io/bbolt v1.3.10 // indirect + go.etcd.io/etcd/client/pkg/v3 
v3.5.14 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect diff --git a/go.sum b/go.sum index 64126e666..cfc060e55 100644 --- a/go.sum +++ b/go.sum @@ -321,8 +321,12 @@ github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463Tym github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c= github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg= github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= +github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= +github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ= github.com/couchbase/gocb/v2 v2.9.1/go.mod h1:TMAeK34yUdcASdV4mGcYuwtkAWckRBYN5uvMCEgPfXo= github.com/couchbase/gocbcore/v10 v10.5.1 h1:bwlV/zv/fSQLuO14M9k49K7yWgcWfjSgMyfRGhW1AyU= @@ -465,6 +469,7 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU= github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= +github.com/godbus/dbus/v5 v5.0.4/go.mod 
h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= @@ -1068,8 +1073,15 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8= go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M= -go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU= go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4= +go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0= +go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ= +go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0= +go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU= +go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ= +go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI= +go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg= +go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk= go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo= diff --git a/internal/impl/etcd/client.go b/internal/impl/etcd/client.go new file mode 100644 index 000000000..7e40bc8f6 --- /dev/null +++ b/internal/impl/etcd/client.go @@ -0,0 +1,216 @@ +package etcd + +import ( + "context" 
+ "errors" + "strings" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/warpstreamlabs/bento/public/service" +) + +func etcdClientFields() []*service.ConfigField { + return []*service.ConfigField{ + service.NewURLListField(etcdEndpointsField). + Description("A set of URLs (schemes, hosts and ports only) that can be used to communicate with a logical etcd cluster. If multiple endpoints are provided, the Client will attempt to use them all in the event that one or more of them are unusable."). + Examples( + []string{"etcd://:2379"}, + []string{"etcd://localhost:2379"}, + []string{"etcd://localhost:2379", "etcd://localhost:22379", "etcd://localhost:32379"}, + ), + service.NewObjectField(etcdAuthField, + service.NewBoolField(etcdAuthEnabledField). + Description("Whether to use password authentication"). + Default(false), + service.NewStringField(etcdAuthUsernameField). + Description("The username to authenticate as."). + Default(""), + service.NewStringField(etcdAuthPasswordField). + Description("The password to authenticate with."). + Secret(). + Default(""), + ). + Description("Optional configuration of etcd authentication headers."). + Optional(). + Advanced(), + service.NewDurationField(etcdDialTimeoutField). + Description("Timeout for failing to establish a connection."). + Optional(). + Default("5s"). + Advanced(), + service.NewDurationField(etcdKeepAliveTimeField). + Description("Time after which client pings the server to see if transport is alive."). + Optional(). + Default("5s"). + Advanced(), + service.NewDurationField(etcdKeepAliveTimeoutField). + Description("Time that the client waits for a response for the keep-alive probe. If the response is not received in this time, the connection is closed."). + Optional(). + Default("1s"). + Advanced(), + service.NewDurationField(etcdRequestTimeoutField). + Description("Timeout for a single request. This includes connection time, any redirects, and header wait time."). + Optional(). + Default("1s"). 
+ Advanced(), + service.NewTLSToggledField(etcdTLSField). + Description("Custom TLS settings can be used to override system defaults."). + Advanced(), + service.NewDurationField(etcdAutoSyncIntervalField). + Description("The interval to update endpoints with its latest members. 0 disables auto-sync. By default auto-sync is disabled."). + Optional(), + service.NewIntField(etcdMaxCallSendMsgSizeField). + Description("The client-side request send limit in bytes. If 0, it defaults to 2.0 MiB (2 * 1024 * 1024)."). + Optional(). + Advanced(), + service.NewIntField(etcdMaxCallRecvMsgSizeField). + Description("The client-side response receive limit. If 0, it defaults to math.MaxInt32."). + Optional(). + Advanced(), + service.NewBoolField(etcdRejectOldClusterField). + Description("When set, will refuse to create a client against an outdated cluster."). + Default(false). + Advanced(), + service.NewBoolField(etcdPermitWithoutStreamField). + Description("When set, will allow client to send keepalive pings to server without any active streams (RPCs)."). + Default(false). + Advanced(), + service.NewIntField(etcdMaxUnaryRetriesField). + Description("The maximum number of retries for unary RPCs."). + Optional(). + Advanced(), + service.NewDurationField(etcdBackoffWaitBetweenField). + Description("The wait time before retrying an RPC."). + Optional(). + Advanced(), + service.NewFloatField(etcdBackoffJitterFractionField). + Description("The jitter fraction to randomize backoff wait time."). + Optional(). 
+ Advanced(), + } +} + +func newEtcdClientFromConfig(ctx context.Context, cfg *clientv3.Config) (*clientv3.Client, error) { + if cfg == nil { + return nil, errors.New("etcd config cannot be nil") + } + + cfg.Context = ctx + + client, err := clientv3.New(*cfg) + if err != nil { + return nil, err + } + + return client, nil +} + +func newEtcdConfigFromParsed(parsedConf *service.ParsedConfig) (*clientv3.Config, error) { + var cfg clientv3.Config + + endpointStrs, err := parsedConf.FieldStringList(etcdEndpointsField) + if err != nil { + return nil, err + } + if len(endpointStrs) == 0 { + return nil, errors.New("must specify at least one URL") + } + for _, u := range endpointStrs { + for _, splitURL := range strings.Split(u, ",") { + if trimmed := strings.TrimSpace(splitURL); trimmed != "" { + cfg.Endpoints = append(cfg.Endpoints, trimmed) + } + } + } + + if cfg.DialTimeout, err = parsedConf.FieldDuration(etcdDialTimeoutField); err != nil { + return nil, err + } + + if cfg.DialKeepAliveTime, err = parsedConf.FieldDuration(etcdKeepAliveTimeField); err != nil { + return nil, err + } + + if cfg.DialKeepAliveTimeout, err = parsedConf.FieldDuration(etcdKeepAliveTimeoutField); err != nil { + return nil, err + } + + if parsedConf.Contains(etcdAutoSyncIntervalField) { + if cfg.AutoSyncInterval, err = parsedConf.FieldDuration(etcdAutoSyncIntervalField); err != nil { + return nil, err + } + } + + if parsedConf.Contains(etcdMaxCallSendMsgSizeField) { + if cfg.MaxCallSendMsgSize, err = parsedConf.FieldInt(etcdMaxCallSendMsgSizeField); err != nil { + return nil, err + } + } + + if parsedConf.Contains(etcdMaxCallRecvMsgSizeField) { + if cfg.MaxCallRecvMsgSize, err = parsedConf.FieldInt(etcdMaxCallRecvMsgSizeField); err != nil { + return nil, err + } + } + + if cfg.RejectOldCluster, err = parsedConf.FieldBool(etcdRejectOldClusterField); err != nil { + return nil, err + } + + if cfg.PermitWithoutStream, err = parsedConf.FieldBool(etcdPermitWithoutStreamField); err != nil { + return nil, 
err + } + + if parsedConf.Contains(etcdMaxUnaryRetriesField) { + if maxUnaryRetries, err := parsedConf.FieldInt(etcdMaxUnaryRetriesField); err != nil { + return nil, err + } else { + cfg.MaxUnaryRetries = uint(maxUnaryRetries) + } + } + + if parsedConf.Contains(etcdBackoffWaitBetweenField) { + if cfg.BackoffWaitBetween, err = parsedConf.FieldDuration(etcdBackoffWaitBetweenField); err != nil { + return nil, err + } + } + + if parsedConf.Contains(etcdBackoffJitterFractionField) { + if backoffJitterFraction, err := parsedConf.FieldFloat(etcdBackoffJitterFractionField); err != nil { + return nil, err + } else { + cfg.BackoffJitterFraction = float64(backoffJitterFraction) + } + } + + tlsConf, tlsEnabled, err := parsedConf.FieldTLSToggled(etcdTLSField) + if err != nil { + return nil, err + } + + if tlsEnabled { + cfg.TLS = tlsConf + } + + if parsedConf.Contains(etcdAuthField) { + var authEnabled bool + authConf := parsedConf.Namespace(etcdAuthField) + if authEnabled, err = authConf.FieldBool(etcdAuthEnabledField); err != nil { + return nil, err + } + + if authEnabled { + if cfg.Username, err = authConf.FieldString(etcdAuthUsernameField); err != nil { + return nil, err + } + if cfg.Password, err = authConf.FieldString(etcdAuthPasswordField); err != nil { + return nil, err + } + } + + } + + return &cfg, nil + +} diff --git a/internal/impl/etcd/config.go b/internal/impl/etcd/config.go new file mode 100644 index 000000000..99f0f210a --- /dev/null +++ b/internal/impl/etcd/config.go @@ -0,0 +1,47 @@ +package etcd + +// etcd client configuration +const ( + etcdEndpointsField = "endpoints" + etcdTLSField = "tls" + etcdOperationOptions = "options" + + // Auth + etcdAuthField = "auth" + etcdAuthEnabledField = "enabled" + etcdAuthUsernameField = "username" + etcdAuthPasswordField = "password" + + // Timeouts + etcdDialTimeoutField = "dial_timeout" + etcdKeepAliveTimeField = "keep_alive_time" + etcdKeepAliveTimeoutField = "keep_alive_timeout" + etcdRequestTimeoutField = 
"request_timeout" + + etcdAutoSyncIntervalField = "auto_sync_interval" + + etcdMaxCallSendMsgSizeField = "max_call_send_msg_size" + etcdMaxCallRecvMsgSizeField = "max_call_recv_msg_size" + etcdMaxUnaryRetriesField = "max_unary_retries" + + etcdRejectOldClusterField = "reject_old_cluster" + etcdPermitWithoutStreamField = "permit_without_stream" + + etcdBackoffWaitBetweenField = "backoff_wait_between" + etcdBackoffJitterFractionField = "backoff_jitter_fraction" +) + +// etcd common options +const ( + etcdKeyField = "key" +) + +// etcd input configuration +const ( + etcdWithPrefixField = "with_prefix" + etcdWatchWithProgressNotifyField = "with_progress_notify" + etcdWatchWithCreatedNotifyField = "with_created_notify" + etcdWatchWithFilterPut = "with_put_filter" + etcdWatchWithFilterDelete = "with_delete_filter" + etcdWatchWithRangeField = "with_range" +) diff --git a/internal/impl/etcd/input.go b/internal/impl/etcd/input.go new file mode 100644 index 000000000..e7e7e6f9b --- /dev/null +++ b/internal/impl/etcd/input.go @@ -0,0 +1,225 @@ +package etcd + +import ( + "context" + + clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/warpstreamlabs/bento/public/service" +) + +func etcdWatchFields() []*service.ConfigField { + return []*service.ConfigField{ + service.NewStringField(etcdKeyField). + Description("The key or prefix being watched. For prefix watching, options.with_prefix should be `true`"), + service.NewObjectField(etcdOperationOptions, + service.NewBoolField(etcdWithPrefixField). + Description("Whether to watch for events on a prefix."). + Default(false), + service.NewBoolField(etcdWatchWithProgressNotifyField). + Description("Whether to send periodic progress updates every 10 minutes when there is no incoming events."). + Default(false), + service.NewBoolField(etcdWatchWithFilterPut). + Description("Whether to discard PUT events from the watcher."). + Default(false), + service.NewBoolField(etcdWatchWithFilterDelete). 
+ Description("Whether to discard DELETE events from the watcher."). + Default(false), + service.NewBoolField(etcdWatchWithCreatedNotifyField). + Description("Whether to send CREATED notify events to the watcher."). + Default(false), + service.NewStringField(etcdWatchWithRangeField). + Description("Will cause the watcher to return a range of lexicographically sorted keys to return in the form `[key, end)` where `end` is the passed parameter."). + Default(""), + ).Description("Collection of options to configure an etcd watcher."), + } +} + +var description = ` +Watches an etcd key or prefix for new events. + +From the [etcd docs](https://etcd.io/docs/v3.5/learning/api/#watch-api): +> The Watch API provides an event-based interface for asynchronously monitoring changes to keys. +> An etcd watch waits for changes to keys by continuously watching from a given revision, +> either current or historical, and streams key updates back to the client. +> +> Watches are long-running requests and use gRPC streams to stream event data. +> A watch stream is bi-directional; the client writes to the stream to establish watches and reads to receive watch events. +> A single watch stream can multiplex many distinct watches by tagging events with per-watch identifiers. +> This multiplexing helps reducing the memory footprint and connection overhead on the core etcd cluster. + +### Events + +Each event object is flattened and returned as an array, with each individual event in the form: + +` + "```json" + ` +{ + "key": "", + "value": "", + "type": "<'PUT' or 'DELETE'>", + "version": "", + "mod_revision": "", + "create_revision": "", + "lease": "" +} +` + "```" + ` + +Where a ` + "`key`" + ` or ` + "`value`" + ` is only a string if it is valid UTF-8. +` + +func etcdConfigSpec() *service.ConfigSpec { + spec := service.NewConfigSpec(). + Beta(). + Categories("Services"). + Version("1.2.0"). + Summary("Configures an etcd event watcher on a key or prefix."). + Description(description). 
+ Fields(etcdClientFields()...). + Fields(etcdWatchFields()...). + Field(service.NewAutoRetryNacksToggleField()) + + return spec +} + +func newEtcdWatchInput(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) { + reader, err := newEtcdWatchInputFromConfig(conf, mgr) + if err != nil { + return nil, err + } + + return service.AutoRetryNacksToggled(conf, reader) +} + +func init() { + err := service.RegisterInput("etcd", etcdConfigSpec(), newEtcdWatchInput) + if err != nil { + panic(err) + } +} + +type etcdWatchInput struct { + watchKey string // TODO: Potentially allow multiple keys/prefixes to be watched (multiplexing) + + client *clientv3.Client + clientConf *clientv3.Config + + watchCh clientv3.WatchChan + watchOptions []clientv3.OpOption +} + +func getWatchOptionsFromConfig(parsedConf *service.ParsedConfig) ([]clientv3.OpOption, error) { + var opts []clientv3.OpOption + + shouldAddToWatchOptions := func(should bool, option clientv3.OpOption) { + if should { + opts = append(opts, option) + } + } + + withPrefix, err := parsedConf.FieldBool(etcdOperationOptions, etcdWithPrefixField) + if err != nil { + return nil, err + } + shouldAddToWatchOptions(withPrefix, clientv3.WithPrefix()) + + withProgressNotify, err := parsedConf.FieldBool(etcdOperationOptions, etcdWatchWithProgressNotifyField) + if err != nil { + return nil, err + } + shouldAddToWatchOptions(withProgressNotify, clientv3.WithProgressNotify()) + + withCreatedNotify, err := parsedConf.FieldBool(etcdOperationOptions, etcdWatchWithCreatedNotifyField) + if err != nil { + return nil, err + } + shouldAddToWatchOptions(withCreatedNotify, clientv3.WithCreatedNotify()) + + withFilterPut, err := parsedConf.FieldBool(etcdOperationOptions, etcdWatchWithFilterPut) + if err != nil { + return nil, err + } + shouldAddToWatchOptions(withFilterPut, clientv3.WithFilterPut()) + + withFilterDelete, err := parsedConf.FieldBool(etcdOperationOptions, etcdWatchWithFilterDelete) + if err != nil { + return nil, 
err + } + shouldAddToWatchOptions(withFilterDelete, clientv3.WithFilterDelete()) + + withRange, err := parsedConf.FieldString(etcdOperationOptions, etcdWatchWithRangeField) + if err != nil { + return nil, err + } + shouldAddToWatchOptions(withRange != "", clientv3.WithRange(withRange)) + + return opts, nil +} + +func newEtcdWatchInputFromConfig(parsedConf *service.ParsedConfig, mgr *service.Resources) (*etcdWatchInput, error) { + config, err := newEtcdConfigFromParsed(parsedConf) + if err != nil { + return nil, err + } + + watchKey, err := parsedConf.FieldString(etcdKeyField) + if err != nil { + return nil, err + } + + opts, err := getWatchOptionsFromConfig(parsedConf) + if err != nil { + return nil, err + } + + return &etcdWatchInput{ + clientConf: config, + watchKey: watchKey, + watchOptions: opts, + }, nil +} + +func (e *etcdWatchInput) Connect(ctx context.Context) error { + client, err := newEtcdClientFromConfig(ctx, e.clientConf) + if err != nil { + return err + } + + e.client = client + e.watchCh = clientv3.NewWatcher(e.client).Watch(ctx, e.watchKey, e.watchOptions...) 
+ + return nil +} + +func (e *etcdWatchInput) Read(ctx context.Context) (*service.Message, service.AckFunc, error) { + select { + case resp, open := <-e.watchCh: + if err := resp.Err(); err != nil { + return nil, nil, err + } + + if resp.Canceled { + return nil, nil, service.ErrEndOfInput + } + + if !open { + return nil, nil, service.ErrNotConnected + } + + msg := service.NewMessage(nil) + msg.SetStructured(etcdEventsToMap(resp.Events)) + + return msg, func(ctx context.Context, err error) error { + return err + }, nil + case <-ctx.Done(): + return nil, nil, ctx.Err() + } +} + +func (e *etcdWatchInput) Close(ctx context.Context) error { + if e.client != nil { + return e.client.Close() + } + + return nil +} diff --git a/internal/impl/etcd/util.go b/internal/impl/etcd/util.go new file mode 100644 index 000000000..84fd41afd --- /dev/null +++ b/internal/impl/etcd/util.go @@ -0,0 +1,36 @@ +package etcd + +import ( + "unicode/utf8" + + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func etcdEventsToMap(events []*clientv3.Event) []map[string]any { + eventsMap := make([]map[string]any, len(events)) + + for i, e := range events { + event := map[string]any{ + "key": e.Kv.Key, + "value": e.Kv.Value, + "type": mvccpb.Event_EventType_name[int32(e.Type)], + "version": e.Kv.Version, + "mod_revision": e.Kv.ModRevision, + "create_revision": e.Kv.CreateRevision, + "lease": e.Kv.Lease, + } + + if utf8.Valid(e.Kv.Key) { + event["key"] = string(e.Kv.Key) + } + + if utf8.Valid(e.Kv.Value) { + event["value"] = string(e.Kv.Value) + } + + eventsMap[i] = event + } + + return eventsMap +} diff --git a/public/components/all/package.go b/public/components/all/package.go index aabc6b204..c46897a4c 100644 --- a/public/components/all/package.go +++ b/public/components/all/package.go @@ -20,6 +20,7 @@ import ( _ "github.com/warpstreamlabs/bento/public/components/dgraph" _ "github.com/warpstreamlabs/bento/public/components/discord" _ 
"github.com/warpstreamlabs/bento/public/components/elasticsearch" + _ "github.com/warpstreamlabs/bento/public/components/etcd" _ "github.com/warpstreamlabs/bento/public/components/gcp" _ "github.com/warpstreamlabs/bento/public/components/hdfs" _ "github.com/warpstreamlabs/bento/public/components/influxdb" diff --git a/public/components/etcd/package.go b/public/components/etcd/package.go new file mode 100644 index 000000000..0f7467f37 --- /dev/null +++ b/public/components/etcd/package.go @@ -0,0 +1,6 @@ +package etcd + +import ( + // Bring in the internal plugin definitions. + _ "github.com/warpstreamlabs/bento/internal/impl/etcd" +) From 61d4f99fdd8668f27c16bcc8700f3ea47bd1d9c3 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 2 Jul 2024 20:10:29 +0200 Subject: [PATCH 2/3] etcd: Generate etcd docs for website --- website/docs/components/inputs/etcd.md | 486 +++++++++++++++++++++++++ 1 file changed, 486 insertions(+) create mode 100644 website/docs/components/inputs/etcd.md diff --git a/website/docs/components/inputs/etcd.md b/website/docs/components/inputs/etcd.md new file mode 100644 index 000000000..c8a795217 --- /dev/null +++ b/website/docs/components/inputs/etcd.md @@ -0,0 +1,486 @@ +--- +title: etcd +slug: etcd +type: input +status: beta +categories: ["Services"] +--- + + + +import Tabs from '@theme/Tabs'; +import TabItem from '@theme/TabItem'; + +:::caution BETA +This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found. +::: +Configures an etcd event watcher on a key or prefix. + +Introduced in version 1.2.0. 
+ + + + + + +```yml +# Common config fields, showing default values +input: + label: "" + etcd: + endpoints: [] # No default (required) + auto_sync_interval: "" # No default (optional) + key: "" # No default (required) + options: + with_prefix: false + with_progress_notify: false + with_put_filter: false + with_delete_filter: false + with_created_notify: false + with_range: "" + auto_replay_nacks: true +``` + + + + +```yml +# All config fields, showing default values +input: + label: "" + etcd: + endpoints: [] # No default (required) + auth: + enabled: false + username: "" + password: "" + dial_timeout: 5s + keep_alive_time: 5s + keep_alive_timeout: 1s + request_timeout: 1s + tls: + enabled: false + skip_cert_verify: false + enable_renegotiation: false + root_cas: "" + root_cas_file: "" + client_certs: [] + auto_sync_interval: "" # No default (optional) + max_call_send_msg_size: 0 # No default (optional) + max_call_recv_msg_size: 0 # No default (optional) + reject_old_cluster: false + permit_without_stream: false + max_unary_retries: 0 # No default (optional) + backoff_wait_between: "" # No default (optional) + backoff_jitter_fraction: 0 # No default (optional) + key: "" # No default (required) + options: + with_prefix: false + with_progress_notify: false + with_put_filter: false + with_delete_filter: false + with_created_notify: false + with_range: "" + auto_replay_nacks: true +``` + + + + +Watches an etcd key or prefix for new events. + +From the [etcd docs](https://etcd.io/docs/v3.5/learning/api/#watch-api): +> The Watch API provides an event-based interface for asynchronously monitoring changes to keys. +> An etcd watch waits for changes to keys by continuously watching from a given revision, +> either current or historical, and streams key updates back to the client. +> +> Watches are long-running requests and use gRPC streams to stream event data. 
+> A watch stream is bi-directional; the client writes to the stream to establish watches and reads to receive watch events. +> A single watch stream can multiplex many distinct watches by tagging events with per-watch identifiers. +> This multiplexing helps reducing the memory footprint and connection overhead on the core etcd cluster. + +### Events + +Each event object is flattened and returned as an array, with each individual event in the form: + +```json +{ + "key": "", + "value": "", + "type": "<'PUT' or 'DELETE'>", + "version": "", + "mod_revision": "", + "create_revision": "", + "lease": "" +} +``` + +Where a `key` or `value` is only a string if it is valid UTF-8. + + +## Fields + +### `endpoints` + +A set of URLs (schemes, hosts and ports only) that can be used to communicate with a logical etcd cluster. If multiple endpoints are provided, the Client will attempt to use them all in the event that one or more of them are unusable. + + +Type: `array` + +```yml +# Examples + +endpoints: + - etcd://:2379 + +endpoints: + - etcd://localhost:2379 + +endpoints: + - etcd://localhost:2379 + - etcd://localhost:22379 + - etcd://localhost:32379 +``` + +### `auth` + +Optional configuration of etcd authentication headers. + + +Type: `object` + +### `auth.enabled` + +Whether to use password authentication + + +Type: `bool` +Default: `false` + +### `auth.username` + +The username to authenticate as. + + +Type: `string` +Default: `""` + +### `auth.password` + +The password to authenticate with. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +### `dial_timeout` + +Timeout for failing to establish a connection. + + +Type: `string` +Default: `"5s"` + +### `keep_alive_time` + +Time after which client pings the server to see if transport is alive. 
+ + +Type: `string` +Default: `"5s"` + +### `keep_alive_timeout` + +Time that the client waits for a response for the keep-alive probe. If the response is not received in this time, the connection is closed. + + +Type: `string` +Default: `"1s"` + +### `request_timeout` + +Timeout for a single request. This includes connection time, any redirects, and header wait time. + + +Type: `string` +Default: `"1s"` + +### `tls` + +Custom TLS settings can be used to override system defaults. + + +Type: `object` + +### `tls.enabled` + +Whether custom TLS settings are enabled. + + +Type: `bool` +Default: `false` + +### `tls.skip_cert_verify` + +Whether to skip server side certificate verification. + + +Type: `bool` +Default: `false` + +### `tls.enable_renegotiation` + +Whether to allow the remote server to repeatedly request renegotiation. Enable this option if you're seeing the error message `local error: tls: no renegotiation`. + + +Type: `bool` +Default: `false` +Requires version 1.0.0 or newer + +### `tls.root_cas` + +An optional root certificate authority to use. This is a string, representing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +```yml +# Examples + +root_cas: |- + -----BEGIN CERTIFICATE----- + ... + -----END CERTIFICATE----- +``` + +### `tls.root_cas_file` + +An optional path of a root certificate authority file to use. This is a file, often with a .pem extension, containing a certificate chain from the parent trusted root certificate, to possible intermediate signing certificates, to the host certificate. 
+ + +Type: `string` +Default: `""` + +```yml +# Examples + +root_cas_file: ./root_cas.pem +``` + +### `tls.client_certs` + +A list of client certificates to use. For each certificate either the fields `cert` and `key`, or `cert_file` and `key_file` should be specified, but not both. + + +Type: `array` +Default: `[]` + +```yml +# Examples + +client_certs: + - cert: foo + key: bar + +client_certs: + - cert_file: ./example.pem + key_file: ./example.key +``` + +### `tls.client_certs[].cert` + +A plain text certificate to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].key` + +A plain text certificate key to use. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +### `tls.client_certs[].cert_file` + +The path of a certificate to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].key_file` + +The path of a certificate key to use. + + +Type: `string` +Default: `""` + +### `tls.client_certs[].password` + +A plain text password for when the private key is password encrypted in PKCS#1 or PKCS#8 format. The obsolete `pbeWithMD5AndDES-CBC` algorithm is not supported for the PKCS#8 format. Warning: Since it does not authenticate the ciphertext, it is vulnerable to padding oracle attacks that can let an attacker recover the plaintext. +:::warning Secret +This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). +::: + + +Type: `string` +Default: `""` + +```yml +# Examples + +password: foo + +password: ${KEY_PASSWORD} +``` + +### `auto_sync_interval` + +The interval to update endpoints with its latest members. 0 disables auto-sync. By default auto-sync is disabled. + + +Type: `string` + +### `max_call_send_msg_size` + +The client-side request send limit in bytes. 
If 0, it defaults to 2.0 MiB (2 * 1024 * 1024). + + +Type: `int` + +### `max_call_recv_msg_size` + +The client-side response receive limit. If 0, it defaults to math.MaxInt32. + + +Type: `int` + +### `reject_old_cluster` + +When set, will refuse to create a client against an outdated cluster. + + +Type: `bool` +Default: `false` + +### `permit_without_stream` + +When set, will allow client to send keepalive pings to server without any active streams (RPCs). + + +Type: `bool` +Default: `false` + +### `max_unary_retries` + +The maximum number of retries for unary RPCs. + + +Type: `int` + +### `backoff_wait_between` + +The wait time before retrying an RPC. + + +Type: `string` + +### `backoff_jitter_fraction` + +The jitter fraction to randomize backoff wait time. + + +Type: `float` + +### `key` + +The key or prefix being watched. For prefix watching, options.with_prefix should be `true` + + +Type: `string` + +### `options` + +Collection of options to configure an etcd watcher. + + +Type: `object` + +### `options.with_prefix` + +Whether to watch for events on a prefix. + + +Type: `bool` +Default: `false` + +### `options.with_progress_notify` + +Whether to send periodic progress updates every 10 minutes when there is no incoming events. + + +Type: `bool` +Default: `false` + +### `options.with_put_filter` + +Whether to discard PUT events from the watcher. + + +Type: `bool` +Default: `false` + +### `options.with_delete_filter` + +Whether to discard DELETE events from the watcher. + + +Type: `bool` +Default: `false` + +### `options.with_created_notify` + +Whether to send CREATED notify events to the watcher. + + +Type: `bool` +Default: `false` + +### `options.with_range` + +Will cause the watcher to return a range of lexicographically sorted keys to return in the form `[key, end)` where `end` is the passed parameter. 
+
+
+Type: `string`
+Default: `""`
+
+### `auto_replay_nacks`
+
+Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.
+
+
+Type: `bool`
+Default: `true`
+
+

From a7fa38d40221df9a5d021d029c3431ce4f0e294f Mon Sep 17 00:00:00 2001
From: Greg Furman
Date: Tue, 2 Jul 2024 20:09:57 +0200
Subject: [PATCH 3/3] etcd: Add integration tests for watching 'key' and 'prefix'

---
 internal/impl/etcd/integration_test.go | 297 +++++++++++++++++++++++++
 1 file changed, 297 insertions(+)
 create mode 100644 internal/impl/etcd/integration_test.go

diff --git a/internal/impl/etcd/integration_test.go b/internal/impl/etcd/integration_test.go
new file mode 100644
index 000000000..24811a96f
--- /dev/null
+++ b/internal/impl/etcd/integration_test.go
@@ -0,0 +1,297 @@
+package etcd
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/ory/dockertest/v3"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	clientv3 "go.etcd.io/etcd/client/v3"
+
+	_ "github.com/warpstreamlabs/bento/public/components/pure"
+	"github.com/warpstreamlabs/bento/public/service"
+	"github.com/warpstreamlabs/bento/public/service/integration"
+)
+
+// setupEtcd starts a throwaway etcd container via dockertest, waits until a
+// client can complete a request against it, and returns that client together
+// with the endpoint address it was dialled with. The client is closed and the
+// container purged through t.Cleanup.
+func setupEtcd(t testing.TB) (*clientv3.Client, string) {
+	t.Helper()
+
+	pool, err := dockertest.NewPool("")
+	require.NoError(t, err)
+	pool.MaxWait = time.Second * 60
+
+	resource, err := pool.RunWithOptions(&dockertest.RunOptions{
+		Repository: "bitnami/etcd",
+		Tag:        "latest",
+		Env: []string{
+			"ALLOW_NONE_AUTHENTICATION=yes",
+			"ETCD_ADVERTISE_CLIENT_URLS=http://localhost:2379",
+		},
+	})
+	require.NoError(t, err)
+
+	// Register the purge straight away so the container is also removed when
+	// the readiness check below fails the test.
+	t.Cleanup(func() {
+		assert.NoError(t, pool.Purge(resource))
+	})
+
+	// Best-effort safety net in case the purge never runs: ask Docker to
+	// expire the container after 900 seconds.
+	_ = resource.Expire(900)
+
+	etcdDockerAddress := fmt.Sprintf("http://localhost:%s", resource.GetPort("2379/tcp"))
+
+	var etcdClient *clientv3.Client
+	require.NoError(t, pool.Retry(func() (err error) {
+		defer func() {
+			if err != nil {
+				t.Logf("error: %v", err)
+			}
+		}()
+
+		client, err := clientv3.New(clientv3.Config{
+			Endpoints:   []string{etcdDockerAddress},
+			DialTimeout: 5 * time.Second,
+		})
+		if err != nil {
+			return err
+		}
+
+		// Check the health of the etcd cluster
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		defer cancel()
+		if _, err = client.Get(ctx, "health"); err != nil {
+			// Close the client of a failed attempt; otherwise every retry
+			// would leak a connection.
+			_ = client.Close()
+			return err
+		}
+
+		etcdClient = client
+		return nil
+	}))
+
+	// Cleanups run last-in-first-out, so the client is closed before the
+	// container it talks to is purged.
+	t.Cleanup(func() {
+		assert.NoError(t, etcdClient.Close())
+	})
+
+	return etcdClient, etcdDockerAddress
+}
+
+// TestIntegrationEtcd exercises the etcd input against real etcd containers.
+// integration.CheckSkip decides whether the test runs at all.
+func TestIntegrationEtcd(t *testing.T) {
+	integration.CheckSkip(t)
+	t.Parallel()
+
+	// Each subtest starts and uses its own etcd Docker container.
+	t.Run("watches_single_key", func(t *testing.T) {
+		client, address := setupEtcd(t)
+		testWatchSingleKey(t, address, client)
+	})
+
+	t.Run("watches_all_keys", func(t *testing.T) {
+		client, address := setupEtcd(t)
+		testWatchKeyPrefix(t, address, client)
+	})
+}
+
+// testWatchSingleKey runs a stream whose etcd input watches the key "foobar",
+// issues 100 PUTs followed by one DELETE on that key, and checks that the
+// stream delivers one batch per event, in order, with the expected payloads.
+func testWatchSingleKey(t *testing.T, etcdDockerAddress string, etcdClient *clientv3.Client) {
+	template := fmt.Sprintf(`
+etcd:
+  key: "foobar"
+  endpoints:
+    - %s`, etcdDockerAddress)
+
+	streamOutBuilder := service.NewStreamBuilder()
+	require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
+	require.NoError(t, streamOutBuilder.AddInputYAML(template))
+	require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`))
+
+	var outBatches []service.MessageBatch
+	var outBatchMut sync.Mutex
+
+	require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error {
+		outBatchMut.Lock()
+		outBatches = append(outBatches, mb.DeepCopy())
+		outBatchMut.Unlock()
+		return nil
+	}))
+
+	streamOut, err := streamOutBuilder.Build()
+	require.NoError(t, err)
+
+	// Cancelling the run context on return stops the stream even when an
+	// assertion below aborts the test before StopWithin is reached.
+	runCtx, cancelRun := context.WithCancel(context.Background())
+	defer cancelRun()
+	go func() {
+		_ = streamOut.Run(runCtx)
+	}()
+
+	// Presumably gives the input time to establish its watch before any
+	// events are produced.
+	time.Sleep(time.Second)
+
+	key := "foobar"
+	for i := 0; i < 100; i++ {
+		_, putErr := etcdClient.Put(context.Background(), key, fmt.Sprintf("bar-%d", i))
+		require.NoError(t, putErr)
+	}
+
+	_, err = etcdClient.Delete(context.Background(), key)
+	require.NoError(t, err)
+
+	// require rather than assert: the indexing below is only safe once exactly
+	// 100 PUTs and 1 DELETE have arrived.
+	require.Eventually(t, func() bool {
+		outBatchMut.Lock()
+		defer outBatchMut.Unlock()
+		return len(outBatches) == 101
+	}, time.Second*10, time.Millisecond*100)
+
+	require.NoError(t, streamOut.StopWithin(time.Second*10))
+
+	// The stream is stopped; take the lock anyway so the reads below are race-free.
+	outBatchMut.Lock()
+	defer outBatchMut.Unlock()
+
+	for i := 0; i < len(outBatches)-1; i++ {
+		require.NotEmpty(t, outBatches[i])
+		putEvent, err := outBatches[i][0].AsBytes()
+		require.NoError(t, err)
+		expected := fmt.Sprintf(`[{"create_revision":2,"key":"foobar","lease":0,"mod_revision":%d,"type":"PUT","value":"bar-%d","version":%d}]`, i+2, i, i+1)
+		require.Equal(t, expected, string(putEvent))
+	}
+
+	lastBatch := outBatches[len(outBatches)-1]
+	require.NotEmpty(t, lastBatch)
+	finalDeleteEvent, err := lastBatch[0].AsBytes()
+	require.NoError(t, err)
+	require.Equal(t, `[{"create_revision":0,"key":"foobar","lease":0,"mod_revision":102,"type":"DELETE","value":"","version":0}]`, string(finalDeleteEvent))
+}
+
+// testWatchKeyPrefix runs a stream whose etcd input watches the empty key with
+// with_prefix enabled, issues 100 PUTs to each of "/foo" and "/foo/bar"
+// followed by a single prefix DELETE of "/foo", and checks that the stream
+// delivers 200 PUT batches plus one batch carrying both DELETE events.
+func testWatchKeyPrefix(t *testing.T, etcdDockerAddress string, etcdClient *clientv3.Client) {
+	template := fmt.Sprintf(`
+etcd:
+  key: ""
+  endpoints:
+    - %s
+  options:
+    with_prefix: true`, etcdDockerAddress)
+
+	streamOutBuilder := service.NewStreamBuilder()
+	require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
+	require.NoError(t, streamOutBuilder.AddInputYAML(template))
+	require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`))
+
+	var outBatches []service.MessageBatch
+	var outBatchMut sync.Mutex
+
+	require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error {
+		outBatchMut.Lock()
+		outBatches = append(outBatches, mb.DeepCopy())
+		outBatchMut.Unlock()
+		return nil
+	}))
+
+	streamOut, err := streamOutBuilder.Build()
+	require.NoError(t, err)
+
+	// Cancelling the run context on return stops the stream even when an
+	// assertion below aborts the test before StopWithin is reached.
+	runCtx, cancelRun := context.WithCancel(context.Background())
+	defer cancelRun()
+	go func() {
+		_ = streamOut.Run(runCtx)
+	}()
+
+	// Presumably gives the input time to establish its watch before any
+	// events are produced.
+	time.Sleep(time.Second)
+
+	keys := []string{"/foo", "/foo/bar"}
+	for _, key := range keys {
+		for i := 0; i < 100; i++ {
+			_, putErr := etcdClient.Put(context.Background(), key, fmt.Sprintf("bar-%d", i))
+			require.NoError(t, putErr)
+		}
+	}
+
+	_, err = etcdClient.Delete(context.Background(), "/foo", clientv3.WithPrefix())
+	require.NoError(t, err)
+
+	// require rather than assert: the fixed indices below are only safe once
+	// exactly 200 PUTs and 1 DELETE have arrived.
+	require.Eventually(t, func() bool {
+		outBatchMut.Lock()
+		defer outBatchMut.Unlock()
+		return len(outBatches) == 201
+	}, time.Second*10, time.Millisecond*100)
+
+	require.NoError(t, streamOut.StopWithin(time.Second*10))
+
+	// The stream is stopped; take the lock anyway so the reads below are race-free.
+	outBatchMut.Lock()
+	defer outBatchMut.Unlock()
+
+	// First 100 with key /foo
+	for i := 0; i < 100; i++ {
+		require.NotEmpty(t, outBatches[i])
+		putEvent, err := outBatches[i][0].AsBytes()
+		require.NoError(t, err)
+
+		tmpl := `[{"create_revision":2,"key":"/foo","lease":0,"mod_revision":%d,"type":"PUT","value":"bar-%d","version":%d}]`
+
+		expectedVersion := i + 1
+		expectedModRevision := i + 2
+
+		expected := fmt.Sprintf(tmpl, expectedModRevision, i, expectedVersion)
+		require.Equal(t, expected, string(putEvent))
+	}
+
+	// Next 100 with key /foo/bar
+	for i := 0; i < 100; i++ {
+		offset := i + 100
+		require.NotEmpty(t, outBatches[offset])
+		putEvent, err := outBatches[offset][0].AsBytes()
+		require.NoError(t, err)
+
+		tmpl := `[{"create_revision":102,"key":"/foo/bar","lease":0,"mod_revision":%d,"type":"PUT","value":"bar-%d","version":%d}]`
+
+		expectedVersion := i + 1
+		expectedModRevision := offset + 2
+
+		expected := fmt.Sprintf(tmpl, expectedModRevision, i, expectedVersion)
+		require.Equal(t, expected, string(putEvent))
+	}
+
+	lastBatch := outBatches[len(outBatches)-1]
+	require.NotEmpty(t, lastBatch)
+	finalDeleteEvent, err := lastBatch[0].AsBytes()
+	require.NoError(t, err)
+
+	expectedDeleteEvent := "[" + `{"create_revision":0,"key":"/foo","lease":0,"mod_revision":202,"type":"DELETE","value":"","version":0}` + "," +
+		`{"create_revision":0,"key":"/foo/bar","lease":0,"mod_revision":202,"type":"DELETE","value":"","version":0}` + "]"
+
+	require.Equal(t, expectedDeleteEvent, string(finalDeleteEvent))
+}