integration: backport clientv3 test fixes
Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
gyuho committed Nov 14, 2017
1 parent 284c43f commit 24ba237
Showing 5 changed files with 132 additions and 48 deletions.
67 changes: 57 additions & 10 deletions integration/bridge.go
@@ -17,6 +17,7 @@ package integration
import (
"fmt"
"io"
"io/ioutil"
"net"
"sync"

@@ -31,21 +32,23 @@ type bridge struct {
l net.Listener
conns map[*bridgeConn]struct{}

- stopc  chan struct{}
- pausec chan struct{}
- wg     sync.WaitGroup
+ stopc      chan struct{}
+ pausec     chan struct{}
+ blackholec chan struct{}
+ wg         sync.WaitGroup

mu sync.Mutex
}

func newBridge(addr string) (*bridge, error) {
b := &bridge{
// bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number
- inaddr:  addr + "0",
- outaddr: addr,
- conns:   make(map[*bridgeConn]struct{}),
- stopc:   make(chan struct{}),
- pausec:  make(chan struct{}),
+ inaddr:     addr + "0",
+ outaddr:    addr,
+ conns:      make(map[*bridgeConn]struct{}),
+ stopc:      make(chan struct{}),
+ pausec:     make(chan struct{}),
+ blackholec: make(chan struct{}),
}
close(b.pausec)

@@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
- io.Copy(bc.out, bc.in)
+ b.ioCopy(bc, bc.out, bc.in)
bc.close()
wg.Done()
}()
go func() {
- io.Copy(bc.in, bc.out)
+ b.ioCopy(bc, bc.in, bc.out)
bc.close()
wg.Done()
}()
@@ -179,3 +182,47 @@ func (bc *bridgeConn) close() {
bc.in.Close()
bc.out.Close()
}

+ func (b *bridge) Blackhole() {
+     b.mu.Lock()
+     close(b.blackholec)
+     b.mu.Unlock()
+ }
+
+ func (b *bridge) Unblackhole() {
+     b.mu.Lock()
+     for bc := range b.conns {
+         bc.Close()
+     }
+     b.conns = make(map[*bridgeConn]struct{})
+     b.blackholec = make(chan struct{})
+     b.mu.Unlock()
+ }
+
+ // ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer
+ func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) {
+     buf := make([]byte, 32*1024)
+     for {
+         select {
+         case <-b.blackholec:
+             io.Copy(ioutil.Discard, src)
+             return nil
+         default:
+         }
+         nr, er := src.Read(buf)
+         if nr > 0 {
+             nw, ew := dst.Write(buf[0:nr])
+             if ew != nil {
+                 return ew
+             }
+             if nr != nw {
+                 return io.ErrShortWrite
+             }
+         }
+         if er != nil {
+             err = er
+             break
+         }
+     }
+     return
+ }
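Note on the mechanism: a closed channel never blocks, so once Blackhole() closes b.blackholec, every pass through ioCopy's select takes the <-b.blackholec arm and drains the source into ioutil.Discard instead of forwarding it. A minimal standalone sketch of the same idiom (illustrative names, not etcd API):

    package main

    import (
        "bytes"
        "fmt"
        "io"
        "io/ioutil"
        "strings"
    )

    // copyUnlessDropped forwards src to dst until dropc is closed; after
    // that, remaining input is drained to ioutil.Discard, as in ioCopy above.
    func copyUnlessDropped(dst io.Writer, src io.Reader, dropc <-chan struct{}) {
        buf := make([]byte, 32*1024)
        for {
            select {
            case <-dropc: // receiving from a closed channel always succeeds
                io.Copy(ioutil.Discard, src)
                return
            default:
            }
            n, err := src.Read(buf)
            if n > 0 {
                if _, werr := dst.Write(buf[:n]); werr != nil {
                    return
                }
            }
            if err != nil {
                return
            }
        }
    }

    func main() {
        dropc := make(chan struct{})
        var out bytes.Buffer

        copyUnlessDropped(&out, strings.NewReader("before "), dropc)
        close(dropc) // the "Blackhole": traffic is silently dropped from here on
        copyUnlessDropped(&out, strings.NewReader("after"), dropc)

        fmt.Printf("forwarded: %q\n", out.String()) // forwarded: "before "
    }

Unblackhole() also closes and forgets every tracked bridgeConn: loops already parked in the discard path never recover on their own, so forcing clients onto fresh connections is what actually restores traffic.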
93 changes: 65 additions & 28 deletions integration/cluster.go
@@ -31,9 +31,6 @@ import (
"testing"
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/coreos/etcd/client"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver"
@@ -50,7 +47,11 @@ import (
"github.com/coreos/etcd/pkg/transport"
"github.com/coreos/etcd/pkg/types"
"github.com/coreos/etcd/rafthttp"

"github.com/coreos/pkg/capnslog"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

const (
@@ -88,12 +89,19 @@ var (
)

type ClusterConfig struct {
- Size              int
- PeerTLS           *transport.TLSInfo
- ClientTLS         *transport.TLSInfo
- DiscoveryURL      string
- UseGRPC           bool
- QuotaBackendBytes int64
+ Size                  int
+ PeerTLS               *transport.TLSInfo
+ ClientTLS             *transport.TLSInfo
+ DiscoveryURL          string
+ UseGRPC               bool
+ QuotaBackendBytes     int64
+ MaxTxnOps             uint
+ MaxRequestBytes       uint
+ GRPCKeepAliveMinTime  time.Duration
+ GRPCKeepAliveInterval time.Duration
+ GRPCKeepAliveTimeout  time.Duration
+ // SkipCreatingClient to skip creating clients for each member.
+ SkipCreatingClient bool
}
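A hypothetical test (not part of this commit) illustrating how the new fields are meant to be used together — server-side keepalive knobs plus SkipCreatingClient for tests that dial their own, specially configured clients. It assumes the usual pkg/testutil helper used elsewhere in this package:

    func TestSketchKeepAliveCluster(t *testing.T) {
        defer testutil.AfterTest(t)
        clus := NewClusterV3(t, &ClusterConfig{
            Size:                  3,
            GRPCKeepAliveMinTime:  time.Second, // reject client pings more frequent than 1s
            GRPCKeepAliveInterval: 5 * time.Second,
            GRPCKeepAliveTimeout:  time.Second,
            SkipCreatingClient:    true, // the test dials with its own keepalive options
        })
        defer clus.Terminate(t)
        // ... dial members with custom grpc.DialOptions and assert behavior ...
    }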

type cluster struct {
@@ -221,10 +229,14 @@ func (c *cluster) HTTPMembers() []client.Member {
func (c *cluster) mustNewMember(t *testing.T) *member {
m := mustNewMember(t,
memberConfig{
- name:              c.name(rand.Int()),
- peerTLS:           c.cfg.PeerTLS,
- clientTLS:         c.cfg.ClientTLS,
- quotaBackendBytes: c.cfg.QuotaBackendBytes,
+ name:                  c.name(rand.Int()),
+ peerTLS:               c.cfg.PeerTLS,
+ clientTLS:             c.cfg.ClientTLS,
+ quotaBackendBytes:     c.cfg.QuotaBackendBytes,
+ maxRequestBytes:       c.cfg.MaxRequestBytes,
+ grpcKeepAliveMinTime:  c.cfg.GRPCKeepAliveMinTime,
+ grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval,
+ grpcKeepAliveTimeout:  c.cfg.GRPCKeepAliveTimeout,
})
m.DiscoveryURL = c.cfg.DiscoveryURL
if c.cfg.UseGRPC {
@@ -474,9 +486,10 @@ type member struct {
s *etcdserver.EtcdServer
hss []*httptest.Server

- grpcServer *grpc.Server
- grpcAddr   string
- grpcBridge *bridge
+ grpcServerOpts []grpc.ServerOption
+ grpcServer     *grpc.Server
+ grpcAddr       string
+ grpcBridge     *bridge

// serverClient is a clientv3 that directly calls the etcdserver.
serverClient *clientv3.Client
@@ -487,10 +500,14 @@ type member struct {
func (m *member) GRPCAddr() string { return m.grpcAddr }

type memberConfig struct {
- name              string
- peerTLS           *transport.TLSInfo
- clientTLS         *transport.TLSInfo
- quotaBackendBytes int64
+ name                  string
+ peerTLS               *transport.TLSInfo
+ clientTLS             *transport.TLSInfo
+ quotaBackendBytes     int64
+ maxRequestBytes       uint
+ grpcKeepAliveMinTime  time.Duration
+ grpcKeepAliveInterval time.Duration
+ grpcKeepAliveTimeout  time.Duration
}

// mustNewMember return an inited member with the given name. If peerTLS is
@@ -539,6 +556,21 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member {
m.TickMs = uint(tickDuration / time.Millisecond)
m.QuotaBackendBytes = mcfg.quotaBackendBytes
m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough

+ m.grpcServerOpts = []grpc.ServerOption{}
+ if mcfg.grpcKeepAliveMinTime > time.Duration(0) {
+     m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
+         MinTime:             mcfg.grpcKeepAliveMinTime,
+         PermitWithoutStream: false,
+     }))
+ }
+ if mcfg.grpcKeepAliveInterval > time.Duration(0) &&
+     mcfg.grpcKeepAliveTimeout > time.Duration(0) {
+     m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{
+         Time:    mcfg.grpcKeepAliveInterval,
+         Timeout: mcfg.grpcKeepAliveTimeout,
+     }))
+ }
return m
}
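KeepaliveEnforcementPolicy bounds how often clients may ping the server (pings faster than MinTime draw a GOAWAY with ENHANCE_YOUR_CALM), while KeepaliveParams makes the server itself probe idle connections every Time and drop them after Timeout without an ack. The client-side counterpart a test might dial with looks roughly like this — a sketch against stock grpc-go, not code from this commit:

    package main

    import (
        "log"
        "time"

        "google.golang.org/grpc"
        "google.golang.org/grpc/keepalive"
    )

    // dialWithKeepalive pings the server every interval; if the server's
    // enforcement MinTime is larger than interval, the server is expected
    // to close the connection with ENHANCE_YOUR_CALM.
    func dialWithKeepalive(addr string, interval time.Duration) (*grpc.ClientConn, error) {
        return grpc.Dial(addr,
            grpc.WithInsecure(),
            grpc.WithKeepaliveParams(keepalive.ClientParameters{
                Time:    interval,    // client ping interval
                Timeout: time.Second, // how long to wait for a ping ack
            }),
        )
    }

    func main() {
        conn, err := dialWithKeepalive("localhost:2379", 500*time.Millisecond)
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()
    }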

@@ -567,6 +599,8 @@ func (m *member) electionTimeout() time.Duration {
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }
+ func (m *member) Blackhole()   { m.grpcBridge.Blackhole() }
+ func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() }

// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
@@ -676,7 +710,7 @@ func (m *member) Launch() error {
return err
}
}
- m.grpcServer = v3rpc.Server(m.s, tlscfg)
+ m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...)
m.serverClient = v3client.New(m.s)
lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient))
epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient))
@@ -824,15 +858,15 @@ func (m *member) Metric(metricName string) (string, error) {
}

// InjectPartition drops connections from m to others, vice versa.
- func (m *member) InjectPartition(t *testing.T, others []*member) {
+ func (m *member) InjectPartition(t *testing.T, others ...*member) {
for _, other := range others {
m.s.CutPeer(other.s.ID())
other.s.CutPeer(m.s.ID())
}
}

// RecoverPartition recovers connections from m to others, vice versa.
- func (m *member) RecoverPartition(t *testing.T, others []*member) {
+ func (m *member) RecoverPartition(t *testing.T, others ...*member) {
for _, other := range others {
m.s.MendPeer(other.s.ID())
other.s.MendPeer(m.s.ID())
@@ -884,12 +918,15 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 {
cluster: NewClusterByConfig(t, cfg),
}
clus.Launch(t)
- for _, m := range clus.Members {
-     client, err := NewClientV3(m)
-     if err != nil {
-         t.Fatalf("cannot create client: %v", err)
-     }
-     clus.clients = append(clus.clients, client)
- }
+
+ if !cfg.SkipCreatingClient {
+     for _, m := range clus.Members {
+         client, err := NewClientV3(m)
+         if err != nil {
+             t.Fatalf("cannot create client: %v", err)
+         }
+         clus.clients = append(clus.clients, client)
+     }
+ }

return clus
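Taken together, the bridge and member changes let a test stall a member's client traffic without stopping the process. A hypothetical usage sketch (assuming, as elsewhere in this package, that NewClusterV3 members are gRPC-enabled so each carries a bridge):

    func TestSketchBlackhole(t *testing.T) {
        defer testutil.AfterTest(t)
        clus := NewClusterV3(t, &ClusterConfig{Size: 3})
        defer clus.Terminate(t)

        // Drop everything flowing through member 0's bridge; clients stall
        // instead of observing connection errors.
        clus.Members[0].Blackhole()

        // ... assert client keepalive / retry behavior while traffic is dropped ...

        // Restore traffic; tracked bridge connections are closed so clients
        // reconnect on clean ones.
        clus.Members[0].Unblackhole()
    }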
4 changes: 2 additions & 2 deletions integration/network_partition_test.go
@@ -149,12 +149,12 @@ func getMembersByIndexSlice(clus *cluster, idxs []int) []*member {

func injectPartition(t *testing.T, src, others []*member) {
for _, m := range src {
- m.InjectPartition(t, others)
+ m.InjectPartition(t, others...)
}
}

func recoverPartition(t *testing.T, src, others []*member) {
for _, m := range src {
- m.RecoverPartition(t, others)
+ m.RecoverPartition(t, others...)
}
}
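The variadic signature keeps single-member call sites terse while slices still spread, e.g. (illustrative helper, not part of this commit; WaitLeader is assumed from this package):

    // partitionLeader cuts the current leader off from all other members.
    func partitionLeader(t *testing.T, clus *cluster) {
        lead := clus.Members[clus.WaitLeader(t)]
        var others []*member
        for _, m := range clus.Members {
            if m != lead {
                others = append(others, m)
            }
        }
        lead.InjectPartition(t, others...) // spread a slice
        // or, for a single member: lead.InjectPartition(t, clus.Members[1])
    }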
12 changes: 6 additions & 6 deletions integration/v3_grpc_test.go
@@ -1372,7 +1372,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) {
// nil out TLS field so client will use an insecure connection
clus.Members[0].ClientTLSInfo = nil
client, err := NewClientV3(clus.Members[0])
- if err != nil && err != grpc.ErrClientConnTimeout {
+ if err != nil && err != context.DeadlineExceeded {
t.Fatalf("unexpected error (%v)", err)
} else if client == nil {
// Ideally, no client would be returned. However, grpc will
@@ -1408,7 +1408,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) {
client, err := NewClientV3(clus.Members[0])
if client != nil || err == nil {
t.Fatalf("expected no client")
- } else if err != grpc.ErrClientConnTimeout {
+ } else if err != context.DeadlineExceeded {
t.Fatalf("unexpected error (%v)", err)
}
}
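The expected error changes because, in the grpc-go these tests now target, a blocking dial that runs out of time surfaces the dial context's own error rather than the deprecated grpc.ErrClientConnTimeout. A standalone illustration (assumes nothing is listening at the address):

    package main

    import (
        "fmt"
        "time"

        "golang.org/x/net/context"
        "google.golang.org/grpc"
    )

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()

        // No listener on this port, so the blocking dial cannot complete
        // and the context's deadline error is what comes back.
        _, err := grpc.DialContext(ctx, "localhost:1",
            grpc.WithInsecure(), grpc.WithBlock())
        fmt.Println(err == context.DeadlineExceeded) // true
    }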
@@ -1565,8 +1565,8 @@ func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc
// 5. expect dial time-out when loading expired certs
select {
case gerr := <-errc:
- if gerr != grpc.ErrClientConnTimeout {
-     t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr)
+ if gerr != context.DeadlineExceeded {
+     t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr)
}
case <-time.After(5 * time.Second):
t.Fatal("failed to receive dial timeout error")
@@ -1611,7 +1611,7 @@ func TestGRPCRequireLeader(t *testing.T) {
time.Sleep(time.Duration(3*electionTicks) * tickDuration)

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- ctx := metadata.NewContext(context.Background(), md)
+ ctx := metadata.NewOutgoingContext(context.Background(), md)
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
@@ -1633,7 +1633,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {

wAPI := toGRPC(client).Watch
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- ctx := metadata.NewContext(context.Background(), md)
+ ctx := metadata.NewOutgoingContext(context.Background(), md)
wStream, err := wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
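metadata.NewContext was split upstream into NewIncomingContext (server side) and NewOutgoingContext (client side); these tests attach the require-leader header on the outgoing side. The pattern, factored into a helper for clarity (the helper itself is illustrative):

    import (
        "golang.org/x/net/context"
        "google.golang.org/grpc/metadata"

        "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
    )

    // requireLeaderCtx returns a context whose RPCs fail with ErrNoLeader
    // whenever the serving member has no leader.
    func requireLeaderCtx(ctx context.Context) context.Context {
        md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
        return metadata.NewOutgoingContext(ctx, md)
    }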
4 changes: 2 additions & 2 deletions integration/v3_lease_test.go
@@ -460,7 +460,7 @@ func TestV3LeaseFailover(t *testing.T) {
lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- mctx := metadata.NewContext(context.Background(), md)
+ mctx := metadata.NewOutgoingContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
@@ -508,7 +508,7 @@ func TestV3LeaseRequireLeader(t *testing.T) {
clus.Members[2].Stop(t)

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
- mctx := metadata.NewContext(context.Background(), md)
+ mctx := metadata.NewOutgoingContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
