Skip to content

Commit

Permalink
test: add test case for watch concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
nic-chen committed Jun 4, 2022
1 parent c784df9 commit 6ce005b
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 5 deletions.
70 changes: 66 additions & 4 deletions tests/e2e/v3_curl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,28 @@ import (
"encoding/base64"
"encoding/json"
"fmt"

"math/rand"
"path"
"strconv"
"sync"
"testing"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/stretchr/testify/assert"

"go.etcd.io/etcd/api/v3/authpb"
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/pkg/v3/testutil"
epb "go.etcd.io/etcd/server/v3/etcdserver/api/v3election/v3electionpb"
"go.etcd.io/etcd/tests/v3/framework/e2e"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
)

var apiPrefix = []string{"/v3"}
var (
apiPrefix = []string{"/v3"}
maxConcurrent uint32 = 3
)

func TestV3CurlPutGetNoTLS(t *testing.T) {
for _, p := range apiPrefix {
Expand All @@ -60,6 +66,20 @@ func TestV3CurlPutGetClientTLS(t *testing.T) {
testCtl(t, testV3CurlPutGet, withApiPrefix(p), withCfg(*e2e.NewConfigClientTLS()))
}
}
func TestV3CurlWatchConcurrent(t *testing.T) {
cfg := *e2e.NewConfigNoTLS()
cfg.MaxConcurrentStreams = maxConcurrent
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg))
}
}
func TestV3CurlWatchConcurrentClientTLS(t *testing.T) {
cfg := *e2e.NewConfigClientTLS()
cfg.MaxConcurrentStreams = maxConcurrent
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatchConcurrent, withApiPrefix(p), withCfg(cfg))
}
}
func TestV3CurlWatch(t *testing.T) {
for _, p := range apiPrefix {
testCtl(t, testV3CurlWatch, withApiPrefix(p))
Expand Down Expand Up @@ -145,6 +165,48 @@ func testV3CurlWatch(cx ctlCtx) {
}
}

func testV3CurlWatchConcurrent(cx ctlCtx) {
// store "bar" into "foo"
putreq, err := json.Marshal(&pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
cx.t.Fatal(err)
}
// watch for first update to "foo"
wcr := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}
wreq, err := json.Marshal(wcr)
if err != nil {
cx.t.Fatal(err)
}
// marshaling the grpc to json gives:
// "{"RequestUnion":{"CreateRequest":{"key":"Zm9v","start_revision":1}}}"
// but the gprc-gateway expects a different format..
wstr := `{"create_request" : ` + string(wreq) + "}"
p := cx.apiPrefix

// concurrent watch
var wg sync.WaitGroup
excess := 2
wg.Add(int(maxConcurrent) + excess)
errCount := 0
for i := 0; i < int(maxConcurrent)+excess; i++ {
go func() {
defer wg.Done()
// expects "bar", timeout after 2 seconds since stream waits forever
if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/watch"), Value: wstr, Expected: `"YmFy"`, Timeout: 2}); err != nil {
errCount += 1
}
}()
}
// update
if err = e2e.CURLPost(cx.epc, e2e.CURLReq{Endpoint: path.Join(p, "/kv/put"), Value: string(putreq), Expected: "revision"}); err != nil {
cx.t.Fatalf("failed testV3CurlWatch put with curl using prefix (%s) (%v)", p, err)
}
// wait for watch
wg.Wait()
// assert
assert.Equal(cx.t, excess, errCount)
}

func testV3CurlTxn(cx ctlCtx) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
Expand Down Expand Up @@ -209,7 +271,7 @@ func testV3CurlAuth(cx ctlCtx) {
cx.t.Fatalf("failed testV3CurlAuth create role with curl using prefix (%s) (%v)", p, err)
}

//grant root role
// grant root role
for i := 0; i < len(usernames); i++ {
grantroleroot, err := json.Marshal(&pb.AuthUserGrantRoleRequest{User: usernames[i], Role: "root"})
testutil.AssertNil(cx.t, err)
Expand Down
9 changes: 8 additions & 1 deletion tests/framework/e2e/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ type EtcdProcessClusterConfig struct {

MetricsURLScheme string

SnapshotCount int // default is 10000
SnapshotCount int // default is 10000
MaxConcurrentStreams uint32 // default is math.MaxUint32

ClientTLS ClientConnType
ClientCertAuthEnabled bool
Expand Down Expand Up @@ -341,6 +342,12 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfigs(tb testing.TB) []*
args = append(args, "--log-level", cfg.LogLevel)
}

if cfg.MaxConcurrentStreams != 0 {
args = append(args, "--max-concurrent-streams",
fmt.Sprintf("%d", cfg.MaxConcurrentStreams),
)
}

etcdCfgs[i] = &EtcdServerProcessConfig{
lg: lg,
ExecPath: cfg.ExecPath,
Expand Down

0 comments on commit 6ce005b

Please sign in to comment.