Skip to content

Commit

Permalink
*: use metadata Incoming/OutgoingContext
Browse files Browse the repository at this point in the history
Fix etcd-io#7888.

Signed-off-by: Gyu-Ho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed May 8, 2017
1 parent ebda92c commit 5d04de2
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 17 deletions.
2 changes: 1 addition & 1 deletion Documentation/learning/auth_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ For avoiding such a situation, the API layer performs *version number validation

After authenticating with `Authenticate()`, a client can create a gRPC connection as it would without auth. In addition to the existing initialization process, the client must associate the token with the newly created connection. `grpc.WithPerRPCCredentials()` provides the functionality for this purpose.

Every authenticated request from the client has a token. The token can be obtained with `grpc.metadata.FromContext()` in the server side. The server can obtain who is issuing the request and when the user was authorized. The information will be filled by the API layer in the header (`etcdserverpb.RequestHeader.Username` and `etcdserverpb.RequestHeader.AuthRevision`) of a raft log entry (`etcdserverpb.InternalRaftRequest`).
Every authenticated request from the client has a token. The token can be obtained with `grpc.metadata.FromIncomingContext()` in the server side. The server can obtain who is issuing the request and when the user was authorized. The information will be filled by the API layer in the header (`etcdserverpb.RequestHeader.Username` and `etcdserverpb.RequestHeader.AuthRevision`) of a raft log entry (`etcdserverpb.InternalRaftRequest`).

### Checking permission in the state machine

Expand Down
2 changes: 1 addition & 1 deletion auth/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,7 @@ func (as *authStore) AuthInfoFromTLS(ctx context.Context) *AuthInfo {
}

func (as *authStore) AuthInfoFromCtx(ctx context.Context) (*AuthInfo, error) {
md, ok := metadata.FromContext(ctx)
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, nil
}
Expand Down
11 changes: 6 additions & 5 deletions auth/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ func TestAuthInfoFromCtx(t *testing.T) {
t.Errorf("expected (nil, nil), got (%v, %v)", ai, err)
}

ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"tokens": "dummy"}))
// as if it came from RPC
ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"tokens": "dummy"}))
ai, err = as.AuthInfoFromCtx(ctx)
if err != nil && ai != nil {
t.Errorf("expected (nil, nil), got (%v, %v)", ai, err)
Expand All @@ -465,19 +466,19 @@ func TestAuthInfoFromCtx(t *testing.T) {
t.Error(err)
}

ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "Invalid Token"}))
ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "Invalid Token"}))
_, err = as.AuthInfoFromCtx(ctx)
if err != ErrInvalidAuthToken {
t.Errorf("expected %v, got %v", ErrInvalidAuthToken, err)
}

ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "Invalid.Token"}))
ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "Invalid.Token"}))
_, err = as.AuthInfoFromCtx(ctx)
if err != ErrInvalidAuthToken {
t.Errorf("expected %v, got %v", ErrInvalidAuthToken, err)
}

ctx = metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": resp.Token}))
ctx = metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": resp.Token}))
ai, err = as.AuthInfoFromCtx(ctx)
if err != nil {
t.Error(err)
Expand Down Expand Up @@ -521,7 +522,7 @@ func TestAuthInfoFromCtxRace(t *testing.T) {
donec := make(chan struct{})
go func() {
defer close(donec)
ctx := metadata.NewContext(context.Background(), metadata.New(map[string]string{"token": "test"}))
ctx := metadata.NewIncomingContext(context.Background(), metadata.New(map[string]string{"token": "test"}))
as.AuthInfoFromCtx(ctx)
}()
as.UserAdd(&pb.AuthUserAddRequest{Name: "test"})
Expand Down
2 changes: 1 addition & 1 deletion clientv3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo
// when the cluster has a leader.
func WithRequireLeader(ctx context.Context) context.Context {
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
return metadata.NewContext(ctx, md)
return metadata.NewOutgoingContext(ctx, md)
}

func newClient(cfg *Config) (*Client, error) {
Expand Down
2 changes: 1 addition & 1 deletion clientv3/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (l *lessor) closeRequireLeader() {
reqIdxs := 0
// find all required leader channels, close, mark as nil
for i, ctx := range ka.ctxs {
md, ok := metadata.FromContext(ctx)
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions etcdserver/api/v3rpc/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return nil, rpctypes.ErrGRPCNotCapable
}

md, ok := metadata.FromContext(ctx)
md, ok := metadata.FromIncomingContext(ctx)
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
Expand All @@ -66,7 +66,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor
return rpctypes.ErrGRPCNotCapable
}

md, ok := metadata.FromContext(ss.Context())
md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {
Expand Down
4 changes: 2 additions & 2 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,7 +1556,7 @@ func TestGRPCRequireLeader(t *testing.T) {
time.Sleep(time.Duration(3*electionTicks) * tickDuration)

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
ctx := metadata.NewContext(context.Background(), md)
ctx := metadata.NewOutgoingContext(context.Background(), md)
reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
if _, err := toGRPC(client).KV.Put(ctx, reqput); grpc.ErrorDesc(err) != rpctypes.ErrNoLeader.Error() {
t.Errorf("err = %v, want %v", err, rpctypes.ErrNoLeader)
Expand All @@ -1578,7 +1578,7 @@ func TestGRPCStreamRequireLeader(t *testing.T) {

wAPI := toGRPC(client).Watch
md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
ctx := metadata.NewContext(context.Background(), md)
ctx := metadata.NewOutgoingContext(context.Background(), md)
wStream, err := wAPI.Watch(ctx)
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
Expand Down
4 changes: 2 additions & 2 deletions integration/v3_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func TestV3LeaseFailover(t *testing.T) {
lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
mctx := metadata.NewContext(context.Background(), md)
mctx := metadata.NewOutgoingContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestV3LeaseRequireLeader(t *testing.T) {
clus.Members[2].Stop(t)

md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
mctx := metadata.NewContext(context.Background(), md)
mctx := metadata.NewOutgoingContext(context.Background(), md)
ctx, cancel := context.WithCancel(mctx)
defer cancel()
lac, err := lc.LeaseKeepAlive(ctx)
Expand Down
2 changes: 1 addition & 1 deletion proxy/grpcproxy/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error
errc := make(chan error, 2)

var lostLeaderC <-chan struct{}
if md, ok := metadata.FromContext(stream.Context()); ok {
if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
v := md[rpctypes.MetadataRequireLeaderKey]
if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
lostLeaderC = lp.leader.lostNotify()
Expand Down
2 changes: 1 addition & 1 deletion proxy/grpcproxy/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
}

var lostLeaderC <-chan struct{}
if md, ok := metadata.FromContext(stream.Context()); ok {
if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
v := md[rpctypes.MetadataRequireLeaderKey]
if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
lostLeaderC = wp.leader.lostNotify()
Expand Down

0 comments on commit 5d04de2

Please sign in to comment.