Skip to content

Commit

Permalink
etcd: Add integration tests for watching 'key' and 'prefix'
Browse files Browse the repository at this point in the history
  • Loading branch information
gregfurman committed Jul 5, 2024
1 parent c5a58d6 commit 7b14e67
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 2 deletions.
3 changes: 2 additions & 1 deletion internal/impl/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"errors"
"strings"

"github.com/warpstreamlabs/bento/public/service"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/warpstreamlabs/bento/public/service"
)

func etcdClientFields() []*service.ConfigField {
Expand Down
3 changes: 2 additions & 1 deletion internal/impl/etcd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package etcd
import (
"context"

"github.com/warpstreamlabs/bento/public/service"
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/warpstreamlabs/bento/public/service"
)

func etcdWatchFields() []*service.ConfigField {
Expand Down
203 changes: 203 additions & 0 deletions internal/impl/etcd/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package etcd

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

clientv3 "go.etcd.io/etcd/client/v3"

_ "github.com/warpstreamlabs/bento/public/components/pure"
"github.com/warpstreamlabs/bento/public/service"
"github.com/warpstreamlabs/bento/public/service/integration"
)

var (
etcdClient *clientv3.Client
etcdDockerAddress string
)

func TestIntegrationEtcd(t *testing.T) {
integration.CheckSkip(t)
t.Parallel()

pool, err := dockertest.NewPool("")
require.NoError(t, err)
pool.MaxWait = time.Second * 60

resource, err := pool.RunWithOptions(&dockertest.RunOptions{
Repository: "bitnami/etcd",
Tag: "latest",
ExposedPorts: []string{"2379", "2380"},
Env: []string{
"ALLOW_NONE_AUTHENTICATION=yes",
"ETCD_ADVERTISE_CLIENT_URLS=http://localhost:2379",
},
})
require.NoError(t, err)

defer t.Cleanup(func() {
assert.NoError(t, pool.Purge(resource))
})

etcdDockerAddress = fmt.Sprintf("http://localhost:%s", resource.GetPort("2379/tcp"))

_ = resource.Expire(900)
require.NoError(t, pool.Retry(func() (err error) {
defer func() {
if err != nil {
t.Logf("error: %v", err)
}
}()

etcdClient, err = clientv3.New(clientv3.Config{
Endpoints: []string{etcdDockerAddress},
DialTimeout: 5 * time.Second,
})

if err != nil {
return err
}

// Check the health of the etcd cluster
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err = etcdClient.Get(ctx, "health")
if err != nil {
return err
}

return nil
}))

defer t.Cleanup(func() {
etcdClient.Close()
})

t.Run("watches_single_key", testWatchSingleKey)
t.Run("watches_all_keys", testWatchKeyPrefix)

}

func testWatchSingleKey(t *testing.T) {
template := fmt.Sprintf(`
etcd:
key: "foobar"
endpoints:
- %s`, etcdDockerAddress)

streamOutBuilder := service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
require.NoError(t, streamOutBuilder.AddInputYAML(template))
require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`))

var outBatches []service.MessageBatch
var outBatchMut sync.Mutex

require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error {
outBatchMut.Lock()
outBatches = append(outBatches, mb.DeepCopy())
outBatchMut.Unlock()
return nil
}))

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

go func() {
_ = streamOut.Run(context.Background())
}()

time.Sleep(time.Second)
for i := 0; i < 100; i++ {
key := "foobar"
value := fmt.Sprintf("bar-%d", i)

if _, err := etcdClient.Put(context.Background(), key, value); err != nil {
t.Error(err)
}
}

if _, err = etcdClient.Delete(context.Background(), "foobar"); err != nil {
t.Error(err)
}

var tmpSize int
assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
tmpSize = len(outBatches)
// 100 PUTs and 1 DELETE
return tmpSize == 101
}, time.Second*10, time.Millisecond*100)

require.NoError(t, streamOut.StopWithin(time.Second*10))
}

func testWatchKeyPrefix(t *testing.T) {
template := fmt.Sprintf(`
etcd:
key: ""
endpoints:
- %s
options:
with_prefix: true`, etcdDockerAddress)

streamOutBuilder := service.NewStreamBuilder()
require.NoError(t, streamOutBuilder.SetLoggerYAML(`level: OFF`))
require.NoError(t, streamOutBuilder.AddInputYAML(template))
require.NoError(t, streamOutBuilder.AddProcessorYAML(`mapping: 'root = content()'`))

var outBatches []service.MessageBatch
var outBatchMut sync.Mutex

require.NoError(t, streamOutBuilder.AddBatchConsumerFunc(func(ctx context.Context, mb service.MessageBatch) error {
outBatchMut.Lock()
outBatches = append(outBatches, mb.DeepCopy())
outBatchMut.Unlock()
return nil
}))

streamOut, err := streamOutBuilder.Build()
require.NoError(t, err)

go func() {
_ = streamOut.Run(context.Background())
}()

time.Sleep(time.Second)
prefixes := []string{"/foo", "/foo/bar"}
for i := 0; i < 100; i++ {
value := fmt.Sprintf("bar-%d", i)

for _, prefix := range prefixes {
if _, err := etcdClient.Put(context.Background(), fmt.Sprintf("%s/%d", prefix, i), value); err != nil {
t.Error(err)
}
}

}

if _, err := etcdClient.Delete(context.Background(), "/foo", clientv3.WithPrefix()); err != nil {
t.Error(err)
}

var tmpSize int
assert.Eventually(t, func() bool {
outBatchMut.Lock()
defer outBatchMut.Unlock()
tmpSize = len(outBatches)
t.Logf("length = %d", tmpSize)
// 200 PUTs and 1 DELETE
return tmpSize == 201
}, time.Second*10, time.Millisecond*100)

require.NoError(t, streamOut.StopWithin(time.Second*10))

}

0 comments on commit 7b14e67

Please sign in to comment.