From 7b14e67a6e08f7a938feaa49af9cb084aaa5eccc Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 2 Jul 2024 20:09:57 +0200 Subject: [PATCH] etcd: Add integration tests for watching 'key' and 'prefix' --- internal/impl/etcd/client.go | 3 +- internal/impl/etcd/input.go | 3 +- internal/impl/etcd/integration_test.go | 203 +++++++++++++++++++++++++ 3 files changed, 207 insertions(+), 2 deletions(-) create mode 100644 internal/impl/etcd/integration_test.go diff --git a/internal/impl/etcd/client.go b/internal/impl/etcd/client.go index a1bdbe105..7e40bc8f6 100644 --- a/internal/impl/etcd/client.go +++ b/internal/impl/etcd/client.go @@ -5,8 +5,9 @@ import ( "errors" "strings" - "github.com/warpstreamlabs/bento/public/service" clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/warpstreamlabs/bento/public/service" ) func etcdClientFields() []*service.ConfigField { diff --git a/internal/impl/etcd/input.go b/internal/impl/etcd/input.go index fe7342392..0b1ad0574 100644 --- a/internal/impl/etcd/input.go +++ b/internal/impl/etcd/input.go @@ -3,8 +3,9 @@ package etcd import ( "context" - "github.com/warpstreamlabs/bento/public/service" clientv3 "go.etcd.io/etcd/client/v3" + + "github.com/warpstreamlabs/bento/public/service" ) func etcdWatchFields() []*service.ConfigField { diff --git a/internal/impl/etcd/integration_test.go b/internal/impl/etcd/integration_test.go new file mode 100644 index 000000000..f0feee3e5 --- /dev/null +++ b/internal/impl/etcd/integration_test.go @@ -0,0 +1,203 @@ +package etcd + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/ory/dockertest/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + clientv3 "go.etcd.io/etcd/client/v3" + + _ "github.com/warpstreamlabs/bento/public/components/pure" + "github.com/warpstreamlabs/bento/public/service" + "github.com/warpstreamlabs/bento/public/service/integration" +) + +var ( + etcdClient *clientv3.Client + etcdDockerAddress string +) + +func TestIntegrationEtcd(t *testing.T) { + integration.CheckSkip(t) + t.Parallel() + + pool, err := dockertest.NewPool("") + require.NoError(t, err) + pool.MaxWait = time.Second * 60 + + resource, err := pool.RunWithOptions(&dockertest.RunOptions{ + Repository: "bitnami/etcd", + Tag: "latest", + ExposedPorts: []string{"2379", "2380"}, + Env: []string{ + "ALLOW_NONE_AUTHENTICATION=yes", + "ETCD_ADVERTISE_CLIENT_URLS=http://localhost:2379", + }, + }) + require.NoError(t, err) + + defer t.Cleanup(func() { + assert.NoError(t, pool.Purge(resource)) + }) + + etcdDockerAddress = fmt.Sprintf("http://localhost:%s", resource.GetPort("2379/tcp")) + + _ = resource.Expire(900) + require.NoError(t, pool.Retry(func() (err error) { + defer func() { + if err != nil { + t.Logf("error: %v", err) + } + }() + + etcdClient, err = clientv3.New(clientv3.Config{ + Endpoints: []string{etcdDockerAddress}, + DialTimeout: 5 * time.Second, + }) + + if err != nil { + return err + } + + // Check the health of the etcd cluster + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + _, err = etcdClient.Get(ctx, "health") + if err != nil { + return err + } + + return nil + })) + + defer t.Cleanup(func() { + etcdClient.Close() + }) + + t.Run("watches_single_key", testWatchSingleKey) + t.Run("watches_all_keys", testWatchKeyPrefix) + +} + +func testWatchSingleKey(t *testing.T) { + template := fmt.Sprintf(` +etcd: + key: "foobar" + endpoints: + - %s`, etcdDockerAddress) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`)) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb.DeepCopy()) + outBatchMut.Unlock() + return nil + })) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + + go func() { + _ = streamOut.Run(context.Background()) + }() + + time.Sleep(time.Second) + for i := 0; i < 100; i++ { + key := "foobar" + value := fmt.Sprintf("bar-%d", i) + + if _, err := etcdClient.Put(context.Background(), key, value); err != nil { + t.Error(err) + } + } + + if _, err = etcdClient.Delete(context.Background(), "foobar"); err != nil { + t.Error(err) + } + + var tmpSize int + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + tmpSize = len(outBatches) + // 100 PUTs and 1 DELETE + return tmpSize == 101 + }, time.Second*10, time.Millisecond*100) + + require.NoError(t, streamOut.StopWithin(time.Second*10)) +} + +func testWatchKeyPrefix(t *testing.T) { + template := fmt.Sprintf(` +etcd: + key: "" + endpoints: + - %s + options: + with_prefix: true`, etcdDockerAddress) + + streamOutBuilder := service.NewStreamBuilder() + require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`)) + require.NoError(t, streamOutBuilder.AddInputYAML(template)) + require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`)) + + var outBatches []service.MessageBatch + var outBatchMut sync.Mutex + + require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error { + outBatchMut.Lock() + outBatches = append(outBatches, mb.DeepCopy()) + outBatchMut.Unlock() + return nil + })) + + streamOut, err := streamOutBuilder.Build() + require.NoError(t, err) + + go func() { + _ = streamOut.Run(context.Background()) + }() + + time.Sleep(time.Second) + prefixes := []string{"/foo", "/foo/bar"} + for i := 0; i < 100; i++ { + value := fmt.Sprintf("bar-%d", i) + + for _, prefix := range prefixes { + if _, err := etcdClient.Put(context.Background(), fmt.Sprintf("%s/%d", prefix, i), value); err != nil { + t.Error(err) + } + } + + } + + if _, err := etcdClient.Delete(context.Background(), "/foo", clientv3.WithPrefix()); err != nil { + t.Error(err) + } + + var tmpSize int + assert.Eventually(t, func() bool { + outBatchMut.Lock() + defer outBatchMut.Unlock() + tmpSize = len(outBatches) + t.Logf("length = %d", tmpSize) + // 200 PUTs and 1 DELETE + return tmpSize == 201 + }, time.Second*10, time.Millisecond*100) + + require.NoError(t, streamOut.StopWithin(time.Second*10)) + +}