Skip to content

Commit

Permalink
etcd-component: Add input for watching an etcd key or prefix (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
gregfurman committed Aug 18, 2024
1 parent 60879af commit d61e12d
Show file tree
Hide file tree
Showing 10 changed files with 1,294 additions and 1 deletion.
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ require (
github.com/xitongsys/parquet-go v1.6.2
github.com/xitongsys/parquet-go-source v0.0.0-20211228015320-b4f792c43cd0
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a
go.etcd.io/etcd/api/v3 v3.5.14
go.etcd.io/etcd/client/v3 v3.5.14
go.mongodb.org/mongo-driver v1.13.1
go.nanomsg.org/mangos/v3 v3.4.2
go.opentelemetry.io/otel v1.24.0
Expand Down Expand Up @@ -207,6 +209,8 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cockroachdb/apd/v3 v3.2.1 // indirect
github.com/containerd/continuity v0.3.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.3.2 // indirect
github.com/couchbase/gocbcore/v10 v10.5.1 // indirect
github.com/couchbase/gocbcoreps v0.1.3 // indirect
github.com/couchbase/goprotostellar v1.0.2 // indirect
Expand Down Expand Up @@ -319,6 +323,8 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.etcd.io/bbolt v1.3.10 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect
Expand Down
14 changes: 13 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,12 @@ github.com/colinmarc/hdfs v1.1.3/go.mod h1:0DumPviB681UcSuJErAbDIOx6SIaJWj463Tym
github.com/colinmarc/hdfs/v2 v2.1.1/go.mod h1:M3x+k8UKKmxtFu++uAZ0OtDU8jR3jnaZIAc6yK4Ue0c=
github.com/containerd/continuity v0.3.0 h1:nisirsYROK15TAMVukJOUyGJjz4BNQJBVsNvAXZJ/eg=
github.com/containerd/continuity v0.3.0/go.mod h1:wJEAIwKOm/pBZuBd0JmeTvnLquTB1Ag8espWhkykbPM=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/couchbase/gocb/v2 v2.9.1 h1:yB2ZhRLk782Y9sZlATaUwglZe9+2QpvFmItJXTX4stQ=
github.com/couchbase/gocb/v2 v2.9.1/go.mod h1:TMAeK34yUdcASdV4mGcYuwtkAWckRBYN5uvMCEgPfXo=
github.com/couchbase/gocbcore/v10 v10.5.1 h1:bwlV/zv/fSQLuO14M9k49K7yWgcWfjSgMyfRGhW1AyU=
Expand Down Expand Up @@ -465,6 +469,7 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU=
github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
Expand Down Expand Up @@ -1068,8 +1073,15 @@ github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaD
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/bbolt v1.3.10 h1:+BqfJTcCzTItrop8mq/lbzL8wSGtj94UO/3U31shqG0=
go.etcd.io/bbolt v1.3.10/go.mod h1:bK3UQLPJZly7IlNmV7uVHJDxfe5aK9Ll93e/74Y9oEQ=
go.etcd.io/etcd/api/v3 v3.5.14 h1:vHObSCxyB9zlF60w7qzAdTcGaglbJOpSj1Xj9+WGxq0=
go.etcd.io/etcd/api/v3 v3.5.14/go.mod h1:BmtWcRlQvwa1h3G2jvKYwIQy4PkHlDej5t7uLMUdJUU=
go.etcd.io/etcd/client/pkg/v3 v3.5.14 h1:SaNH6Y+rVEdxfpA2Jr5wkEvN6Zykme5+YnbCkxvuWxQ=
go.etcd.io/etcd/client/pkg/v3 v3.5.14/go.mod h1:8uMgAokyG1czCtIdsq+AGyYQMvpIKnSvPjFMunkgeZI=
go.etcd.io/etcd/client/v3 v3.5.14 h1:CWfRs4FDaDoSz81giL7zPpZH2Z35tbOrAJkkjMqOupg=
go.etcd.io/etcd/client/v3 v3.5.14/go.mod h1:k3XfdV/VIHy/97rqWjoUzrj9tk7GgJGH9J8L4dNXmAk=
go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g=
go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk=
go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo=
Expand Down
216 changes: 216 additions & 0 deletions internal/impl/etcd/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package etcd

import (
"context"
"errors"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/warpstreamlabs/bento/public/service"
)

func etcdClientFields() []*service.ConfigField {
return []*service.ConfigField{
service.NewURLListField(etcdEndpointsField).
Description("A set of URLs (schemes, hosts and ports only) that can be used to communicate with a logical etcd cluster. If multiple endpoints are provided, the Client will attempt to use them all in the event that one or more of them are unusable.").
Examples(
[]string{"etcd://:2379"},
[]string{"etcd://localhost:2379"},
[]string{"etcd://localhost:2379", "etcd://localhost:22379", "etcd://localhost:32379"},
),
service.NewObjectField(etcdAuthField,
service.NewBoolField(etcdAuthEnabledField).
Description("Whether to use password authentication").
Default(false),
service.NewStringField(etcdAuthUsernameField).
Description("The username to authenticate as.").
Default(""),
service.NewStringField(etcdAuthPasswordField).
Description("The password to authenticate with.").
Secret().
Default(""),
).
Description("Optional configuration of etcd authentication headers.").
Optional().
Advanced(),
service.NewDurationField(etcdDialTimeoutField).
Description("Timeout for failing to establish a connection.").
Optional().
Default("5s").
Advanced(),
service.NewDurationField(etcdKeepAliveTimeField).
Description("Time after which client pings the server to see if transport is alive.").
Optional().
Default("5s").
Advanced(),
service.NewDurationField(etcdKeepAliveTimeoutField).
Description("Time that the client waits for a response for the keep-alive probe. If the response is not received in this time, the connection is closed.").
Optional().
Default("1s").
Advanced(),
service.NewDurationField(etcdRequestTimeoutField).
Description("Timeout for a single request. This includes connection time, any redirects, and header wait time.").
Optional().
Default("1s").
Advanced(),
service.NewTLSToggledField(etcdTLSField).
Description("Custom TLS settings can be used to override system defaults.").
Advanced(),
service.NewDurationField(etcdAutoSyncIntervalField).
Description("The interval to update endpoints with its latest members. 0 disables auto-sync. By default auto-sync is disabled.").
Optional(),
service.NewIntField(etcdMaxCallSendMsgSizeField).
Description("The client-side request send limit in bytes. If 0, it defaults to 2.0 MiB (2 * 1024 * 1024).").
Optional().
Advanced(),
service.NewIntField(etcdMaxCallRecvMsgSizeField).
Description("The client-side response receive limit. If 0, it defaults to math.MaxInt32.").
Optional().
Advanced(),
service.NewBoolField(etcdRejectOldClusterField).
Description("When set, will refuse to create a client against an outdated cluster.").
Default(false).
Advanced(),
service.NewBoolField(etcdPermitWithoutStreamField).
Description("When set, will allow client to send keepalive pings to server without any active streams (RPCs).").
Default(false).
Advanced(),
service.NewIntField(etcdMaxUnaryRetriesField).
Description("The maximum number of retries for unary RPCs.").
Optional().
Advanced(),
service.NewDurationField(etcdBackoffWaitBetweenField).
Description("The wait time before retrying an RPC.").
Optional().
Advanced(),
service.NewFloatField(etcdBackoffJitterFractionField).
Description("The jitter fraction to randomize backoff wait time.").
Optional().
Advanced(),
}
}

func newEtcdClientFromConfig(ctx context.Context, cfg *clientv3.Config) (*clientv3.Client, error) {
if cfg == nil {
return nil, errors.New("etcd config cannot be nil")
}

cfg.Context = ctx

client, err := clientv3.New(*cfg)
if err != nil {
return nil, err
}

return client, nil
}

func newEtcdConfigFromParsed(parsedConf *service.ParsedConfig) (*clientv3.Config, error) {
var cfg clientv3.Config

endpointStrs, err := parsedConf.FieldStringList(etcdEndpointsField)
if err != nil {
return nil, err
}
if len(endpointStrs) == 0 {
return nil, errors.New("must specify at least one URL")
}
for _, u := range endpointStrs {
for _, splitURL := range strings.Split(u, ",") {
if trimmed := strings.TrimSpace(splitURL); trimmed != "" {
cfg.Endpoints = append(cfg.Endpoints, trimmed)
}
}
}

if cfg.DialTimeout, err = parsedConf.FieldDuration(etcdDialTimeoutField); err != nil {
return nil, err
}

if cfg.DialKeepAliveTime, err = parsedConf.FieldDuration(etcdKeepAliveTimeField); err != nil {
return nil, err
}

if cfg.DialKeepAliveTimeout, err = parsedConf.FieldDuration(etcdKeepAliveTimeoutField); err != nil {
return nil, err
}

if parsedConf.Contains(etcdAutoSyncIntervalField) {
if cfg.AutoSyncInterval, err = parsedConf.FieldDuration(etcdAutoSyncIntervalField); err != nil {
return nil, err
}
}

if parsedConf.Contains(etcdMaxCallSendMsgSizeField) {
if cfg.MaxCallSendMsgSize, err = parsedConf.FieldInt(etcdMaxCallSendMsgSizeField); err != nil {
return nil, err
}
}

if parsedConf.Contains(etcdMaxCallRecvMsgSizeField) {
if cfg.MaxCallRecvMsgSize, err = parsedConf.FieldInt(etcdMaxCallRecvMsgSizeField); err != nil {
return nil, err
}
}

if cfg.RejectOldCluster, err = parsedConf.FieldBool(etcdRejectOldClusterField); err != nil {
return nil, err
}

if cfg.PermitWithoutStream, err = parsedConf.FieldBool(etcdPermitWithoutStreamField); err != nil {
return nil, err
}

if parsedConf.Contains(etcdMaxUnaryRetriesField) {
if maxUnaryRetries, err := parsedConf.FieldInt(etcdMaxUnaryRetriesField); err != nil {
return nil, err
} else {
cfg.MaxUnaryRetries = uint(maxUnaryRetries)
}
}

if parsedConf.Contains(etcdBackoffWaitBetweenField) {
if cfg.BackoffWaitBetween, err = parsedConf.FieldDuration(etcdBackoffWaitBetweenField); err != nil {
return nil, err
}
}

if parsedConf.Contains(etcdBackoffJitterFractionField) {
if backoffJitterFraction, err := parsedConf.FieldFloat(etcdBackoffJitterFractionField); err != nil {
return nil, err
} else {
cfg.BackoffJitterFraction = float64(backoffJitterFraction)
}
}

tlsConf, tlsEnabled, err := parsedConf.FieldTLSToggled(etcdTLSField)
if err != nil {
return nil, err
}

if tlsEnabled {
cfg.TLS = tlsConf
}

if parsedConf.Contains(etcdAuthField) {
var authEnabled bool
authConf := parsedConf.Namespace(etcdAuthField)
if authEnabled, err = authConf.FieldBool(etcdAuthEnabledField); err != nil {
return nil, err
}

if authEnabled {
if cfg.Username, err = authConf.FieldString(etcdAuthUsernameField); err != nil {
return nil, err
}
if cfg.Password, err = authConf.FieldString(etcdAuthPasswordField); err != nil {
return nil, err
}
}

}

return &cfg, nil

}
47 changes: 47 additions & 0 deletions internal/impl/etcd/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package etcd

// etcd client configuration
const (
etcdEndpointsField = "endpoints"
etcdTLSField = "tls"
etcdOperationOptions = "options"

// Auth
etcdAuthField = "auth"
etcdAuthEnabledField = "enabled"
etcdAuthUsernameField = "username"
etcdAuthPasswordField = "password"

// Timeouts
etcdDialTimeoutField = "dial_timeout"
etcdKeepAliveTimeField = "keep_alive_time"
etcdKeepAliveTimeoutField = "keep_alive_timeout"
etcdRequestTimeoutField = "request_timeout"

etcdAutoSyncIntervalField = "auto_sync_interval"

etcdMaxCallSendMsgSizeField = "max_call_send_msg_size"
etcdMaxCallRecvMsgSizeField = "max_call_recv_msg_size"
etcdMaxUnaryRetriesField = "max_unary_retries"

etcdRejectOldClusterField = "reject_old_cluster"
etcdPermitWithoutStreamField = "permit_without_stream"

etcdBackoffWaitBetweenField = "backoff_wait_between"
etcdBackoffJitterFractionField = "backoff_jitter_fraction"
)

// etcd common options
const (
etcdKeyField = "key"
)

// etcd input configuration
const (
etcdWithPrefixField = "with_prefix"
etcdWatchWithProgressNotifyField = "with_progress_notify"
etcdWatchWithCreatedNotifyField = "with_created_notify"
etcdWatchWithFilterPut = "with_put_filter"
etcdWatchWithFilterDelete = "with_delete_filter"
etcdWatchWithRangeField = "with_range"
)
Loading

0 comments on commit d61e12d

Please sign in to comment.