diff --git a/integration/bridge.go b/integration/bridge.go index b9e67318e521..59cebe1f0e03 100644 --- a/integration/bridge.go +++ b/integration/bridge.go @@ -17,6 +17,7 @@ package integration import ( "fmt" "io" + "io/ioutil" "net" "sync" @@ -31,9 +32,10 @@ type bridge struct { l net.Listener conns map[*bridgeConn]struct{} - stopc chan struct{} - pausec chan struct{} - wg sync.WaitGroup + stopc chan struct{} + pausec chan struct{} + blackholec chan struct{} + wg sync.WaitGroup mu sync.Mutex } @@ -41,11 +43,12 @@ type bridge struct { func newBridge(addr string) (*bridge, error) { b := &bridge{ // bridge "port" is ("%05d%05d0", port, pid) since go1.8 expects the port to be a number - inaddr: addr + "0", - outaddr: addr, - conns: make(map[*bridgeConn]struct{}), - stopc: make(chan struct{}), - pausec: make(chan struct{}), + inaddr: addr + "0", + outaddr: addr, + conns: make(map[*bridgeConn]struct{}), + stopc: make(chan struct{}), + pausec: make(chan struct{}), + blackholec: make(chan struct{}), } close(b.pausec) @@ -152,12 +155,12 @@ func (b *bridge) serveConn(bc *bridgeConn) { var wg sync.WaitGroup wg.Add(2) go func() { - io.Copy(bc.out, bc.in) + b.ioCopy(bc, bc.out, bc.in) bc.close() wg.Done() }() go func() { - io.Copy(bc.in, bc.out) + b.ioCopy(bc, bc.in, bc.out) bc.close() wg.Done() }() @@ -179,3 +182,47 @@ func (bc *bridgeConn) close() { bc.in.Close() bc.out.Close() } + +func (b *bridge) Blackhole() { + b.mu.Lock() + close(b.blackholec) + b.mu.Unlock() +} + +func (b *bridge) Unblackhole() { + b.mu.Lock() + for bc := range b.conns { + bc.Close() + } + b.conns = make(map[*bridgeConn]struct{}) + b.blackholec = make(chan struct{}) + b.mu.Unlock() +} + +// ref. https://github.com/golang/go/blob/master/src/io/io.go copyBuffer +func (b *bridge) ioCopy(bc *bridgeConn, dst io.Writer, src io.Reader) (err error) { + buf := make([]byte, 32*1024) + for { + select { + case <-b.blackholec: + io.Copy(ioutil.Discard, src) + return nil + default: + } + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if ew != nil { + return ew + } + if nr != nw { + return io.ErrShortWrite + } + } + if er != nil { + err = er + break + } + } + return +} diff --git a/integration/cluster.go b/integration/cluster.go index 18c526bbf146..a3077d0ba2e5 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -31,9 +31,6 @@ import ( "testing" "time" - "golang.org/x/net/context" - "google.golang.org/grpc" - "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/etcdserver" @@ -50,7 +47,11 @@ import ( "github.com/coreos/etcd/pkg/transport" "github.com/coreos/etcd/pkg/types" "github.com/coreos/etcd/rafthttp" + "github.com/coreos/pkg/capnslog" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" ) const ( @@ -88,12 +89,19 @@ var ( ) type ClusterConfig struct { - Size int - PeerTLS *transport.TLSInfo - ClientTLS *transport.TLSInfo - DiscoveryURL string - UseGRPC bool - QuotaBackendBytes int64 + Size int + PeerTLS *transport.TLSInfo + ClientTLS *transport.TLSInfo + DiscoveryURL string + UseGRPC bool + QuotaBackendBytes int64 + MaxTxnOps uint + MaxRequestBytes uint + GRPCKeepAliveMinTime time.Duration + GRPCKeepAliveInterval time.Duration + GRPCKeepAliveTimeout time.Duration + // SkipCreatingClient to skip creating clients for each member. + SkipCreatingClient bool } type cluster struct { @@ -221,10 +229,14 @@ func (c *cluster) HTTPMembers() []client.Member { func (c *cluster) mustNewMember(t *testing.T) *member { m := mustNewMember(t, memberConfig{ - name: c.name(rand.Int()), - peerTLS: c.cfg.PeerTLS, - clientTLS: c.cfg.ClientTLS, - quotaBackendBytes: c.cfg.QuotaBackendBytes, + name: c.name(rand.Int()), + peerTLS: c.cfg.PeerTLS, + clientTLS: c.cfg.ClientTLS, + quotaBackendBytes: c.cfg.QuotaBackendBytes, + maxRequestBytes: c.cfg.MaxRequestBytes, + grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, + grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, + grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -474,9 +486,10 @@ type member struct { s *etcdserver.EtcdServer hss []*httptest.Server - grpcServer *grpc.Server - grpcAddr string - grpcBridge *bridge + grpcServerOpts []grpc.ServerOption + grpcServer *grpc.Server + grpcAddr string + grpcBridge *bridge // serverClient is a clientv3 that directly calls the etcdserver. serverClient *clientv3.Client @@ -487,10 +500,14 @@ type member struct { func (m *member) GRPCAddr() string { return m.grpcAddr } type memberConfig struct { - name string - peerTLS *transport.TLSInfo - clientTLS *transport.TLSInfo - quotaBackendBytes int64 + name string + peerTLS *transport.TLSInfo + clientTLS *transport.TLSInfo + quotaBackendBytes int64 + maxRequestBytes uint + grpcKeepAliveMinTime time.Duration + grpcKeepAliveInterval time.Duration + grpcKeepAliveTimeout time.Duration } // mustNewMember return an inited member with the given name. If peerTLS is @@ -539,6 +556,21 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { m.TickMs = uint(tickDuration / time.Millisecond) m.QuotaBackendBytes = mcfg.quotaBackendBytes m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough + + m.grpcServerOpts = []grpc.ServerOption{} + if mcfg.grpcKeepAliveMinTime > time.Duration(0) { + m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: mcfg.grpcKeepAliveMinTime, + PermitWithoutStream: false, + })) + } + if mcfg.grpcKeepAliveInterval > time.Duration(0) && + mcfg.grpcKeepAliveTimeout > time.Duration(0) { + m.grpcServerOpts = append(m.grpcServerOpts, grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: mcfg.grpcKeepAliveInterval, + Timeout: mcfg.grpcKeepAliveTimeout, + })) + } return m } @@ -567,6 +599,8 @@ func (m *member) electionTimeout() time.Duration { func (m *member) DropConnections() { m.grpcBridge.Reset() } func (m *member) PauseConnections() { m.grpcBridge.Pause() } func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() } +func (m *member) Blackhole() { m.grpcBridge.Blackhole() } +func (m *member) Unblackhole() { m.grpcBridge.Unblackhole() } // NewClientV3 creates a new grpc client connection to the member func NewClientV3(m *member) (*clientv3.Client, error) { @@ -676,7 +710,7 @@ func (m *member) Launch() error { return err } } - m.grpcServer = v3rpc.Server(m.s, tlscfg) + m.grpcServer = v3rpc.Server(m.s, tlscfg, m.grpcServerOpts...) m.serverClient = v3client.New(m.s) lockpb.RegisterLockServer(m.grpcServer, v3lock.NewLockServer(m.serverClient)) epb.RegisterElectionServer(m.grpcServer, v3election.NewElectionServer(m.serverClient)) @@ -824,7 +858,7 @@ func (m *member) Metric(metricName string) (string, error) { } // InjectPartition drops connections from m to others, vice versa. -func (m *member) InjectPartition(t *testing.T, others []*member) { +func (m *member) InjectPartition(t *testing.T, others ...*member) { for _, other := range others { m.s.CutPeer(other.s.ID()) other.s.CutPeer(m.s.ID()) @@ -832,7 +866,7 @@ func (m *member) InjectPartition(t *testing.T, others []*member) { } // RecoverPartition recovers connections from m to others, vice versa. -func (m *member) RecoverPartition(t *testing.T, others []*member) { +func (m *member) RecoverPartition(t *testing.T, others ...*member) { for _, other := range others { m.s.MendPeer(other.s.ID()) other.s.MendPeer(m.s.ID()) @@ -884,12 +918,15 @@ func NewClusterV3(t *testing.T, cfg *ClusterConfig) *ClusterV3 { cluster: NewClusterByConfig(t, cfg), } clus.Launch(t) - for _, m := range clus.Members { - client, err := NewClientV3(m) - if err != nil { - t.Fatalf("cannot create client: %v", err) + + if !cfg.SkipCreatingClient { + for _, m := range clus.Members { + client, err := NewClientV3(m) + if err != nil { + t.Fatalf("cannot create client: %v", err) + } + clus.clients = append(clus.clients, client) } - clus.clients = append(clus.clients, client) } return clus diff --git a/integration/network_partition_test.go b/integration/network_partition_test.go index 21130eb02652..b94d6bc4c9b9 100644 --- a/integration/network_partition_test.go +++ b/integration/network_partition_test.go @@ -149,12 +149,12 @@ func getMembersByIndexSlice(clus *cluster, idxs []int) []*member { func injectPartition(t *testing.T, src, others []*member) { for _, m := range src { - m.InjectPartition(t, others) + m.InjectPartition(t, others...) } } func recoverPartition(t *testing.T, src, others []*member) { for _, m := range src { - m.RecoverPartition(t, others) + m.RecoverPartition(t, others...) } } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 30de93cd214c..81e1c33b27fe 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1372,7 +1372,7 @@ func TestTLSGRPCRejectInsecureClient(t *testing.T) { // nil out TLS field so client will use an insecure connection clus.Members[0].ClientTLSInfo = nil client, err := NewClientV3(clus.Members[0]) - if err != nil && err != grpc.ErrClientConnTimeout { + if err != nil && err != context.DeadlineExceeded { t.Fatalf("unexpected error (%v)", err) } else if client == nil { // Ideally, no client would be returned. However, grpc will @@ -1408,7 +1408,7 @@ func TestTLSGRPCRejectSecureClient(t *testing.T) { client, err := NewClientV3(clus.Members[0]) if client != nil || err == nil { t.Fatalf("expected no client") - } else if err != grpc.ErrClientConnTimeout { + } else if err != context.DeadlineExceeded { t.Fatalf("unexpected error (%v)", err) } } @@ -1565,8 +1565,8 @@ func testTLSReload(t *testing.T, cloneFunc func() transport.TLSInfo, replaceFunc // 5. expect dial time-out when loading expired certs select { case gerr := <-errc: - if gerr != grpc.ErrClientConnTimeout { - t.Fatalf("expected %v, got %v", grpc.ErrClientConnTimeout, gerr) + if gerr != context.DeadlineExceeded { + t.Fatalf("expected %v, got %v", context.DeadlineExceeded, gerr) } case <-time.After(5 * time.Second): t.Fatal("failed to receive dial timeout error") @@ -1611,7 +1611,7 @@ func TestGRPCRequireLeader(t *testing.T) { time.Sleep(time.Duration(3*electionTicks) * tickDuration) md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - ctx := metadata.NewContext(context.Background(), md) + ctx := metadata.NewOutgoingContext(context.Background(), md) reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) @@ -1633,7 +1633,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) { wAPI := toGRPC(client).Watch md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - ctx := metadata.NewContext(context.Background(), md) + ctx := metadata.NewOutgoingContext(context.Background(), md) wStream, err := wAPI.Watch(ctx) if err != nil { t.Fatalf("wAPI.Watch error: %v", err) diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index dbb8e6b727ab..7bb72ba131f8 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -460,7 +460,7 @@ func TestV3LeaseFailover(t *testing.T) { lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID} md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - mctx := metadata.NewContext(context.Background(), md) + mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) defer cancel() lac, err := lc.LeaseKeepAlive(ctx) @@ -508,7 +508,7 @@ func TestV3LeaseRequireLeader(t *testing.T) { clus.Members[2].Stop(t) md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - mctx := metadata.NewContext(context.Background(), md) + mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) defer cancel() lac, err := lc.LeaseKeepAlive(ctx)