From 5e059fd8dca7907c034d1d4b2458e755548a4a5e Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Mon, 8 May 2017 08:10:58 -0700 Subject: [PATCH] *: use metadata Incoming/OutgoingContext Fix https://github.com/coreos/etcd/issues/7888. Signed-off-by: Gyu-Ho Lee --- Documentation/learning/auth_design.md | 2 +- auth/store.go | 2 +- auth/store_test.go | 11 ++++++----- clientv3/client.go | 2 +- clientv3/lease.go | 2 +- etcdserver/api/v3rpc/interceptor.go | 4 ++-- integration/v3_grpc_test.go | 4 ++-- integration/v3_lease_test.go | 4 ++-- proxy/grpcproxy/lease.go | 2 +- proxy/grpcproxy/watch.go | 2 +- 10 files changed, 18 insertions(+), 17 deletions(-) diff --git a/Documentation/learning/auth_design.md b/Documentation/learning/auth_design.md index 192f4b21779..52c979731bf 100644 --- a/Documentation/learning/auth_design.md +++ b/Documentation/learning/auth_design.md @@ -60,7 +60,7 @@ For avoiding such a situation, the API layer performs *version number validation After authenticating with `Authenticate()`, a client can create a gRPC connection as it would without auth. In addition to the existing initialization process, the client must associate the token with the newly created connection. `grpc.WithPerRPCCredentials()` provides the functionality for this purpose. -Every authenticated request from the client has a token. The token can be obtained with `grpc.metadata.FromContext()` in the server side. The server can obtain who is issuing the request and when the user was authorized. The information will be filled by the API layer in the header (`etcdserverpb.RequestHeader.Username` and `etcdserverpb.RequestHeader.AuthRevision`) of a raft log entry (`etcdserverpb.InternalRaftRequest`). +Every authenticated request from the client has a token. The token can be obtained with `grpc.metadata.FromIncomingContext()` in the server side. The server can obtain who is issuing the request and when the user was authorized. The information will be filled by the API layer in the header (`etcdserverpb.RequestHeader.Username` and `etcdserverpb.RequestHeader.AuthRevision`) of a raft log entry (`etcdserverpb.InternalRaftRequest`). ### Checking permission in the state machine diff --git a/auth/store.go b/auth/store.go index f3cbd6bba17..3b391b464fe 100644 --- a/auth/store.go +++ b/auth/store.go @@ -995,7 +995,7 @@ func (as *authStore) AuthInfoFromTLS(ctx context.Context) *AuthInfo { } func (as *authStore) AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error) { - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromIncomingContext(ctx) if !ok { return nil, nil } diff --git a/auth/store_test.go b/auth/store_test.go index bf0a4fc9319..f2a25aac6bf 100644 --- a/auth/store_test.go +++ b/auth/store_test.go @@ -453,7 +453,8 @@ func TestAuthInfoFromCtx(t *testing.T) { t.Errorf("expected (nil, nil), got (%v, %v)", ai, err) } - ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"tokens": "dummy"})) + // as if it came from RPC + ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"tokens": "dummy"})) ai, err = as.AuthInfoFromCtx(ctx) if err != nil && ai != nil { t.Errorf("expected (nil, nil), got (%v, %v)", ai, err) @@ -465,19 +466,19 @@ func TestAuthInfoFromCtx(t *testing.T) { t.Error(err) } - ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "Invalid Token"})) + ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "Invalid Token"})) _, err = as.AuthInfoFromCtx(ctx) if err != ErrInvalidAuthToken { t.Errorf("expected %v, got %v", ErrInvalidAuthToken, err) } - ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "Invalid.Token"})) + ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "Invalid.Token"})) _, err = as.AuthInfoFromCtx(ctx) if err != ErrInvalidAuthToken { t.Errorf("expected %v, got %v", ErrInvalidAuthToken, err) } - ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": resp.Token})) + ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": resp.Token})) ai, err = as.AuthInfoFromCtx(ctx) if err != nil { t.Error(err) @@ -521,7 +522,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) { donec := make(chan struct{}) go func() { defer close(donec) - ctx := metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "test"})) + ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "test"})) as.AuthInfoFromCtx(ctx) }() as.UserAdd(&pb.AuthUserAddRequest{Name: "test"}) diff --git a/clientv3/client.go b/clientv3/client.go index f220a895ef2..a0e059d5e35 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -333,7 +333,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo // when the cluster has a leader. func WithRequireLeader(ctx context.Context) context.Context { md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - return metadata.NewContext(ctx, md) + return metadata.NewOutgoingContext(ctx, md) } func newClient(cfg *Config) (*Client, error) { diff --git a/clientv3/lease.go b/clientv3/lease.go index e3e888ccf59..f624793c4bc 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -323,7 +323,7 @@ func (l *lessor) closeRequireLeader() { reqIdxs := 0 // find all required leader channels, close, mark as nil for i, ctx := range ka.ctxs { - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromOutgoingContext(ctx) if !ok { continue } diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 29aef2914a5..de9470a8905 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -45,7 +45,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { return nil, rpctypes.ErrGRPCNotCapable } - md, ok := metadata.FromContext(ctx) + md, ok := metadata.FromIncomingContext(ctx) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { @@ -66,7 +66,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNotCapable } - md, ok := metadata.FromContext(ss.Context()) + md, ok := metadata.FromIncomingContext(ss.Context()) if ok { if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index da04d76e63c..7e1fa6c2ec7 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1608,7 +1608,7 @@ func TestGRPCRequireLeader(t *testing.T) { time.Sleep(time.Duration(3*electionTicks) * tickDuration) md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - ctx := metadata.NewContext(context.Background(), md) + ctx := metadata.NewOutgoingContext(context.Background(), md) reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")} if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() { t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader) @@ -1630,7 +1630,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) { wAPI := toGRPC(client).Watch md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - ctx := metadata.NewContext(context.Background(), md) + ctx := metadata.NewOutgoingContext(context.Background(), md) wStream, err := wAPI.Watch(ctx) if err != nil { t.Fatalf("wAPI.Watch error: %v", err) diff --git a/integration/v3_lease_test.go b/integration/v3_lease_test.go index 3846817ea7f..b270e1c1b6b 100644 --- a/integration/v3_lease_test.go +++ b/integration/v3_lease_test.go @@ -454,7 +454,7 @@ func TestV3LeaseFailover(t *testing.T) { lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID} md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - mctx := metadata.NewContext(context.Background(), md) + mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) defer cancel() lac, err := lc.LeaseKeepAlive(ctx) @@ -502,7 +502,7 @@ func TestV3LeaseRequireLeader(t *testing.T) { clus.Members[2].Stop(t) md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - mctx := metadata.NewContext(context.Background(), md) + mctx := metadata.NewOutgoingContext(context.Background(), md) ctx, cancel := context.WithCancel(mctx) defer cancel() lac, err := lc.LeaseKeepAlive(ctx) diff --git a/proxy/grpcproxy/lease.go b/proxy/grpcproxy/lease.go index dd23425a281..19c2249a7e2 100644 --- a/proxy/grpcproxy/lease.go +++ b/proxy/grpcproxy/lease.go @@ -137,7 +137,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error errc := make(chan error, 2) var lostLeaderC <-chan struct{} - if md, ok := metadata.FromContext(stream.Context()); ok { + if md, ok := metadata.FromOutgoingContext(stream.Context()); ok { v := md[rpctypes.MetadataRequireLeaderKey] if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader { lostLeaderC = lp.leader.lostNotify() diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 42748fd4a4f..b960c94769a 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -95,7 +95,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { } var lostLeaderC <-chan struct{} - if md, ok := metadata.FromContext(stream.Context()); ok { + if md, ok := metadata.FromOutgoingContext(stream.Context()); ok { v := md[rpctypes.MetadataRequireLeaderKey] if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader { lostLeaderC = wp.leader.lostNotify()