Skip to content

Commit

Permalink
client/integration: test v2 client one shot operations
Browse files Browse the repository at this point in the history
  • Loading branch information
Anthony Romano committed Jul 7, 2016
1 parent 946b3cc commit c30f89f
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 23 deletions.
134 changes: 134 additions & 0 deletions client/integration/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"fmt"
"net/http"
"net/http/httptest"
"os"
"strings"
"sync/atomic"
"testing"

"golang.org/x/net/context"

"github.com/coreos/etcd/client"
"github.com/coreos/etcd/integration"
"github.com/coreos/etcd/pkg/testutil"
)

// TestV2NoRetryEOF tests destructive api calls won't retry on a disconnection.
func TestV2NoRetryEOF(t *testing.T) {
defer testutil.AfterTest(t)
// generate an EOF response; specify address so appears first in sorted ep list
lEOF := integration.NewListenerWithAddr(t, fmt.Sprintf("eof:123.%d.sock", os.Getpid()))
defer lEOF.Close()
tries := uint32(0)
go func() {
for {
conn, err := lEOF.Accept()
if err != nil {
return
}
atomic.AddUint32(&tries, 1)
conn.Close()
}
}()
eofURL := integration.UrlScheme + "://" + lEOF.Addr().String()
cli := integration.MustNewHTTPClient(t, []string{eofURL, eofURL}, nil)
kapi := client.NewKeysAPI(cli)
for i, f := range noRetryList(kapi) {
startTries := atomic.LoadUint32(&tries)
if err := f(); err == nil {
t.Errorf("#%d: expected EOF error, got nil", i)
}
endTries := atomic.LoadUint32(&tries)
if startTries+1 != endTries {
t.Errorf("#%d: expected 1 try, got %d", i, endTries-startTries)
}
}
}

// TestV2NoRetryNoLeader tests destructive api calls won't retry if given an error code.
func TestV2NoRetryNoLeader(t *testing.T) {
defer testutil.AfterTest(t)

lHttp := integration.NewListenerWithAddr(t, fmt.Sprintf("errHttp:123.%d.sock", os.Getpid()))
eh := &errHandler{errCode: http.StatusServiceUnavailable}
srv := httptest.NewUnstartedServer(eh)
defer lHttp.Close()
defer srv.Close()
srv.Listener = lHttp
go srv.Start()
lHttpURL := integration.UrlScheme + "://" + lHttp.Addr().String()

cli := integration.MustNewHTTPClient(t, []string{lHttpURL, lHttpURL}, nil)
kapi := client.NewKeysAPI(cli)
// test error code
for i, f := range noRetryList(kapi) {
reqs := eh.reqs
if err := f(); err == nil || !strings.Contains(err.Error(), "no leader") {
t.Errorf("#%d: expected \"no leader\", got %v", i, err)
}
if eh.reqs != reqs+1 {
t.Errorf("#%d: expected 1 request, got %d", i, eh.reqs-reqs)
}
}
}

// TestV2RetryRefuse tests destructive api calls will retry if a connection is refused.
func TestV2RetryRefuse(t *testing.T) {
defer testutil.AfterTest(t)
cl := integration.NewCluster(t, 1)
cl.Launch(t)
defer cl.Terminate(t)
// test connection refused; expect no error failover
cli := integration.MustNewHTTPClient(t, []string{integration.UrlScheme + "://refuseconn:123", cl.URL(0)}, nil)
kapi := client.NewKeysAPI(cli)
if _, err := kapi.Set(context.Background(), "/delkey", "def", nil); err != nil {
t.Fatal(err)
}
for i, f := range noRetryList(kapi) {
if err := f(); err != nil {
t.Errorf("#%d: unexpected retry failure (%v)", i, err)
}
}
}

type errHandler struct {
errCode int
reqs int
}

func (eh *errHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
req.Body.Close()
eh.reqs++
w.WriteHeader(eh.errCode)
}

func noRetryList(kapi client.KeysAPI) []func() error {
return []func() error{
func() error {
opts := &client.SetOptions{PrevExist: client.PrevNoExist}
_, err := kapi.Set(context.Background(), "/setkey", "bar", opts)
return err
},
func() error {
_, err := kapi.Delete(context.Background(), "/delkey", nil)
return err
},
}
}
20 changes: 20 additions & 0 deletions client/integration/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package integration

import (
"os"
"testing"

"github.com/coreos/etcd/pkg/testutil"
)

func TestMain(m *testing.M) {
v := m.Run()
if v == 0 && testutil.CheckLeakedGoroutine() {
os.Exit(1)
}
os.Exit(v)
}
26 changes: 13 additions & 13 deletions integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
requestTimeout = 20 * time.Second

basePort = 21000
urlScheme = "unix"
urlSchemeTLS = "unixs"
UrlScheme = "unix"
UrlSchemeTLS = "unixs"
)

var (
Expand Down Expand Up @@ -96,9 +96,9 @@ func init() {

func schemeFromTLSInfo(tls *transport.TLSInfo) string {
if tls == nil {
return urlScheme
return UrlScheme
}
return urlSchemeTLS
return UrlSchemeTLS
}

func (c *cluster) fillClusterForMembers() error {
Expand Down Expand Up @@ -257,7 +257,7 @@ func (c *cluster) addMember(t *testing.T) {
}

func (c *cluster) addMemberByURL(t *testing.T, clientURL, peerURL string) error {
cc := mustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
cc := MustNewHTTPClient(t, []string{clientURL}, c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := ma.Add(ctx, peerURL); err != nil {
Expand All @@ -277,7 +277,7 @@ func (c *cluster) AddMember(t *testing.T) {

func (c *cluster) RemoveMember(t *testing.T, id uint64) {
// send remove request to the cluster
cc := mustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
cc := MustNewHTTPClient(t, c.URLs(), c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if err := ma.Remove(ctx, types.ID(id).String()); err != nil {
Expand Down Expand Up @@ -312,7 +312,7 @@ func (c *cluster) Terminate(t *testing.T) {

func (c *cluster) waitMembersMatch(t *testing.T, membs []client.Member) {
for _, u := range c.URLs() {
cc := mustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
cc := MustNewHTTPClient(t, []string{u}, c.cfg.ClientTLS)
ma := client.NewMembersAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
Expand Down Expand Up @@ -391,10 +391,10 @@ func isMembersEqual(membs []client.Member, wmembs []client.Member) bool {
func newLocalListener(t *testing.T) net.Listener {
c := atomic.AddInt64(&localListenCount, 1)
addr := fmt.Sprintf("127.0.0.1:%d.%d.sock", c+basePort, os.Getpid())
return newListenerWithAddr(t, addr)
return NewListenerWithAddr(t, addr)
}

func newListenerWithAddr(t *testing.T, addr string) net.Listener {
func NewListenerWithAddr(t *testing.T, addr string) net.Listener {
l, err := transport.NewUnixListener(addr)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -614,7 +614,7 @@ func (m *member) Launch() error {
}

func (m *member) WaitOK(t *testing.T) {
cc := mustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
cc := MustNewHTTPClient(t, []string{m.URL()}, m.ClientTLSInfo)
kapi := client.NewKeysAPI(cc)
for {
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
Expand Down Expand Up @@ -678,12 +678,12 @@ func (m *member) Restart(t *testing.T) error {
plog.Printf("restarting %s (%s)", m.Name, m.grpcAddr)
newPeerListeners := make([]net.Listener, 0)
for _, ln := range m.PeerListeners {
newPeerListeners = append(newPeerListeners, newListenerWithAddr(t, ln.Addr().String()))
newPeerListeners = append(newPeerListeners, NewListenerWithAddr(t, ln.Addr().String()))
}
m.PeerListeners = newPeerListeners
newClientListeners := make([]net.Listener, 0)
for _, ln := range m.ClientListeners {
newClientListeners = append(newClientListeners, newListenerWithAddr(t, ln.Addr().String()))
newClientListeners = append(newClientListeners, NewListenerWithAddr(t, ln.Addr().String()))
}
m.ClientListeners = newClientListeners

Expand All @@ -708,7 +708,7 @@ func (m *member) Terminate(t *testing.T) {
plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr)
}

func mustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client {
cfgtls := transport.TLSInfo{}
if tls != nil {
cfgtls = *tls
Expand Down
16 changes: 8 additions & 8 deletions integration/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func testClusterUsingDiscovery(t *testing.T, size int) {
dc.Launch(t)
defer dc.Terminate(t)
// init discovery token space
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
dkapi := client.NewKeysAPI(dcc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", size)); err != nil {
Expand All @@ -90,7 +90,7 @@ func TestTLSClusterOf3UsingDiscovery(t *testing.T) {
dc.Launch(t)
defer dc.Terminate(t)
// init discovery token space
dcc := mustNewHTTPClient(t, dc.URLs(), nil)
dcc := MustNewHTTPClient(t, dc.URLs(), nil)
dkapi := client.NewKeysAPI(dcc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := dkapi.Create(ctx, "/_config/size", fmt.Sprintf("%d", 3)); err != nil {
Expand Down Expand Up @@ -157,7 +157,7 @@ func testDecreaseClusterSize(t *testing.T, size int) {
func TestForceNewCluster(t *testing.T) {
c := NewCluster(t, 3)
c.Launch(t)
cc := mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
cc := MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
resp, err := kapi.Create(ctx, "/foo", "bar")
Expand All @@ -184,7 +184,7 @@ func TestForceNewCluster(t *testing.T) {
c.waitLeader(t, c.Members[:1])

// use new http client to init new connection
cc = mustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
cc = MustNewHTTPClient(t, []string{c.Members[0].URL()}, nil)
kapi = client.NewKeysAPI(cc)
// ensure force restart keep the old data, and new cluster can make progress
ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestIssue2904(t *testing.T) {
c.Members[1].Stop(t)

// send remove member-1 request to the cluster.
cc := mustNewHTTPClient(t, c.URLs(), nil)
cc := MustNewHTTPClient(t, c.URLs(), nil)
ma := client.NewMembersAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
// the proposal is not committed because member 1 is stopped, but the
Expand Down Expand Up @@ -337,7 +337,7 @@ func TestIssue3699(t *testing.T) {
c.waitLeader(t, c.Members)

// try to participate in cluster
cc := mustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
cc := MustNewHTTPClient(t, []string{c.URL(0)}, c.cfg.ClientTLS)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := kapi.Set(ctx, "/foo", "bar", nil); err != nil {
Expand All @@ -350,7 +350,7 @@ func TestIssue3699(t *testing.T) {
// a random key first, and check the new key could be got from all client urls
// of the cluster.
func clusterMustProgress(t *testing.T, membs []*member) {
cc := mustNewHTTPClient(t, []string{membs[0].URL()}, nil)
cc := MustNewHTTPClient(t, []string{membs[0].URL()}, nil)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", rand.Int())
Expand All @@ -362,7 +362,7 @@ func clusterMustProgress(t *testing.T, membs []*member) {

for i, m := range membs {
u := m.URL()
mcc := mustNewHTTPClient(t, []string{u}, nil)
mcc := MustNewHTTPClient(t, []string{u}, nil)
mkapi := client.NewKeysAPI(mcc)
mctx, mcancel := context.WithTimeout(context.Background(), requestTimeout)
if _, err := mkapi.Watcher(key, &client.WatcherOptions{AfterIndex: resp.Node.ModifiedIndex - 1}).Next(mctx); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions integration/member_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {
resps := make([]*client.Response, 120)
var err error
for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i)
Expand All @@ -108,7 +108,7 @@ func TestSnapshotAndRestartMember(t *testing.T) {

m.WaitOK(t)
for i := 0; i < 120; i++ {
cc := mustNewHTTPClient(t, []string{m.URL()}, nil)
cc := MustNewHTTPClient(t, []string{m.URL()}, nil)
kapi := client.NewKeysAPI(cc)
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
key := fmt.Sprintf("foo%d", i)
Expand Down
1 change: 1 addition & 0 deletions test
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ function integration_tests {
intpid="$!"
wait $e2epid
wait $intpid
go test -timeout 1m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/client/integration
go test -timeout 10m -v ${RACE} -cpu 1,2,4 $@ ${REPO_PATH}/clientv3/integration
go test -timeout 1m -v -cpu 1,2,4 $@ ${REPO_PATH}/contrib/raftexample
go test -timeout 1m -v ${RACE} -cpu 1,2,4 -run=Example $@ ${TEST}
Expand Down

0 comments on commit c30f89f

Please sign in to comment.