From 9442f9001624d344595ccc64d12df66f8d04d4ef Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 19:53:44 -0800 Subject: [PATCH 01/11] integration: remove typo in "TestV3LargeRequests" Signed-off-by: Gyuho Lee --- integration/v3_grpc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index f3b2935ce3c..c6e4a3d16ae 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -1874,7 +1874,7 @@ func TestV3LargeRequests(t *testing.T) { // limit receive call size with original value + gRPC overhead bytes _, err = kvcli.Range(context.TODO(), reqget, grpc.MaxCallRecvMsgSize(test.valueSize+512*1024)) if err != nil { - t.Errorf("#%d: range expected no error , got %v", i, err) + t.Errorf("#%d: range expected no error, got %v", i, err) } } From 63d66b1011c39212b2e66aed928ce13447e009d8 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 14:56:38 -0800 Subject: [PATCH 02/11] clientv3: configure gRPC message limits in Config Signed-off-by: Gyuho Lee --- clientv3/client.go | 20 +++++++++++++++++ clientv3/config.go | 13 ++++++++++++ clientv3/grpc_options.go | 46 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 clientv3/grpc_options.go diff --git a/clientv3/client.go b/clientv3/client.go index 3be8976473f..3cb463d8fa6 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -67,6 +67,8 @@ type Client struct { Password string // tokenCred is an instance of WithPerRPCCredentials()'s argument tokenCred *authTokenCredential + + callOpts []grpc.CallOption } // New creates a new etcdv3 client from a given configuration. @@ -386,11 +388,29 @@ func newClient(cfg *Config) (*Client, error) { ctx: ctx, cancel: cancel, mu: new(sync.Mutex), + callOpts: defaultCallOpts, } if cfg.Username != "" && cfg.Password != "" { client.Username = cfg.Username client.Password = cfg.Password } + if cfg.MaxCallSendMsgSize > 0 || cfg.MaxCallRecvMsgSize > 0 { + if cfg.MaxCallRecvMsgSize > 0 && cfg.MaxCallSendMsgSize > cfg.MaxCallRecvMsgSize { + return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", cfg.MaxCallRecvMsgSize, cfg.MaxCallSendMsgSize) + } + callOpts := []grpc.CallOption{ + defaultFailFast, + defaultMaxCallSendMsgSize, + defaultMaxCallRecvMsgSize, + } + if cfg.MaxCallSendMsgSize > 0 { + callOpts[1] = grpc.MaxCallSendMsgSize(cfg.MaxCallSendMsgSize) + } + if cfg.MaxCallRecvMsgSize > 0 { + callOpts[2] = grpc.MaxCallRecvMsgSize(cfg.MaxCallRecvMsgSize) + } + client.callOpts = callOpts + } client.balancer = newHealthBalancer(cfg.Endpoints, cfg.DialTimeout, func(ep string) (bool, error) { return grpcHealthCheck(client, ep) diff --git a/clientv3/config.go b/clientv3/config.go index 15bc8e5e046..79d6e2a984f 100644 --- a/clientv3/config.go +++ b/clientv3/config.go @@ -41,6 +41,19 @@ type Config struct { // keep-alive probe. If the response is not received in this time, the connection is closed. DialKeepAliveTimeout time.Duration `json:"dial-keep-alive-timeout"` + // MaxCallSendMsgSize is the client-side request send limit in bytes. + // If 0, it defaults to 2.0 MiB (2 * 1024 * 1024). + // Make sure that "MaxCallSendMsgSize" < server-side default send/recv limit. + // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes"). + MaxCallSendMsgSize int + + // MaxCallRecvMsgSize is the client-side response receive limit. + // If 0, it defaults to "math.MaxInt32", because range response can + // easily exceed request send limits. + // Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit. + // ("--max-request-bytes" flag to etcd or "embed.Config.MaxRequestBytes"). + MaxCallRecvMsgSize int + // TLS holds the client secure credentials, if any. TLS *tls.Config diff --git a/clientv3/grpc_options.go b/clientv3/grpc_options.go new file mode 100644 index 00000000000..592dd6993cf --- /dev/null +++ b/clientv3/grpc_options.go @@ -0,0 +1,46 @@ +// Copyright 2017 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "math" + + "google.golang.org/grpc" +) + +var ( + // Disable gRPC internal retrial logic + // TODO: enable when gRPC retry is stable (FailFast=false) + // Reference: + // - https://github.com/grpc/grpc-go/issues/1532 + // - https://github.com/grpc/proposal/blob/master/A6-client-retries.md + defaultFailFast = grpc.FailFast(true) + + // client-side request send limit, gRPC default is math.MaxInt32 + // Make sure that "client-side send limit < server-side default send/recv limit" + // Same value as "embed.DefaultMaxRequestBytes" plus gRPC overhead bytes + defaultMaxCallSendMsgSize = grpc.MaxCallSendMsgSize(2 * 1024 * 1024) + + // client-side response receive limit, gRPC default is 4MB + // Make sure that "client-side receive limit >= server-side default send/recv limit" + // because range response can easily exceed request send limits + // Default to math.MaxInt32; writes exceeding server-side send limit fails anyway + defaultMaxCallRecvMsgSize = grpc.MaxCallRecvMsgSize(math.MaxInt32) +) + +// defaultCallOpts defines a list of default "gRPC.CallOption". +// Some options are exposed to "clientv3.Config". +// Defaults will be overridden by the settings in "clientv3.Config". +var defaultCallOpts = []grpc.CallOption{defaultFailFast, defaultMaxCallSendMsgSize, defaultMaxCallRecvMsgSize} From f87760998b2d898ae8af999a58169a1a39cb72d7 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 15:06:57 -0800 Subject: [PATCH 03/11] clientv3: call KV/Txn APIs with default gRPC call options Signed-off-by: Gyuho Lee --- clientv3/integration/dial_test.go | 2 +- clientv3/kv.go | 34 +++++++++++++++++++---------- clientv3/ordering/kv_test.go | 4 ++-- clientv3/txn.go | 6 ++++- etcdserver/api/v3client/v3client.go | 2 +- integration/cluster_proxy.go | 2 +- 6 files changed, 33 insertions(+), 17 deletions(-) diff --git a/clientv3/integration/dial_test.go b/clientv3/integration/dial_test.go index 43145c2cbb0..970dde03bb1 100644 --- a/clientv3/integration/dial_test.go +++ b/clientv3/integration/dial_test.go @@ -182,7 +182,7 @@ func TestDialForeignEndpoint(t *testing.T) { // grpc can return a lazy connection that's not connected yet; confirm // that it can communicate with the cluster. - kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn)) + kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), clus.Client(0)) ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() if _, gerr := kvc.Get(ctx, "abc"); gerr != nil { diff --git a/clientv3/kv.go b/clientv3/kv.go index b578d9ebe46..5a7469bd4c9 100644 --- a/clientv3/kv.go +++ b/clientv3/kv.go @@ -18,6 +18,8 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "google.golang.org/grpc" ) type ( @@ -88,15 +90,24 @@ func (resp *TxnResponse) OpResponse() OpResponse { } type kv struct { - remote pb.KVClient + remote pb.KVClient + callOpts []grpc.CallOption } func NewKV(c *Client) KV { - return &kv{remote: RetryKVClient(c)} + api := &kv{remote: RetryKVClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewKVFromKVClient(remote pb.KVClient) KV { - return &kv{remote: remote} +func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { + api := &kv{remote: remote} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) { @@ -115,7 +126,7 @@ func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*Delete } func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) { - resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest()) + resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -124,8 +135,9 @@ func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*C func (kv *kv) Txn(ctx context.Context) Txn { return &txn{ - kv: kv, - ctx: ctx, + kv: kv, + ctx: ctx, + callOpts: kv.callOpts, } } @@ -134,27 +146,27 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) { switch op.t { case tRange: var resp *pb.RangeResponse - resp, err = kv.remote.Range(ctx, op.toRangeRequest()) + resp, err = kv.remote.Range(ctx, op.toRangeRequest(), kv.callOpts...) if err == nil { return OpResponse{get: (*GetResponse)(resp)}, nil } case tPut: var resp *pb.PutResponse r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease} - resp, err = kv.remote.Put(ctx, r) + resp, err = kv.remote.Put(ctx, r, kv.callOpts...) if err == nil { return OpResponse{put: (*PutResponse)(resp)}, nil } case tDeleteRange: var resp *pb.DeleteRangeResponse r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV} - resp, err = kv.remote.DeleteRange(ctx, r) + resp, err = kv.remote.DeleteRange(ctx, r, kv.callOpts...) if err == nil { return OpResponse{del: (*DeleteResponse)(resp)}, nil } case tTxn: var resp *pb.TxnResponse - resp, err = kv.remote.Txn(ctx, op.toTxnRequest()) + resp, err = kv.remote.Txn(ctx, op.toTxnRequest(), kv.callOpts...) if err == nil { return OpResponse{txn: (*TxnResponse)(resp)}, nil } diff --git a/clientv3/ordering/kv_test.go b/clientv3/ordering/kv_test.go index 79ce9491e9b..9e884f3b291 100644 --- a/clientv3/ordering/kv_test.go +++ b/clientv3/ordering/kv_test.go @@ -204,7 +204,7 @@ var rangeTests = []struct { func TestKvOrdering(t *testing.T) { for i, tt := range rangeTests { - mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()} + mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()} kv := &kvOrdering{ mKV, func(r *clientv3.GetResponse) OrderViolationFunc { @@ -258,7 +258,7 @@ var txnTests = []struct { func TestTxnOrdering(t *testing.T) { for i, tt := range txnTests { - mKV := &mockKV{clientv3.NewKVFromKVClient(nil), tt.response.OpResponse()} + mKV := &mockKV{clientv3.NewKVFromKVClient(nil, nil), tt.response.OpResponse()} kv := &kvOrdering{ mKV, func(r *clientv3.TxnResponse) OrderViolationFunc { diff --git a/clientv3/txn.go b/clientv3/txn.go index 8169b621509..c3c2d248569 100644 --- a/clientv3/txn.go +++ b/clientv3/txn.go @@ -19,6 +19,8 @@ import ( "sync" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "google.golang.org/grpc" ) // Txn is the interface that wraps mini-transactions. @@ -65,6 +67,8 @@ type txn struct { sus []*pb.RequestOp fas []*pb.RequestOp + + callOpts []grpc.CallOption } func (txn *txn) If(cs ...Cmp) Txn { @@ -139,7 +143,7 @@ func (txn *txn) Commit() (*TxnResponse, error) { var resp *pb.TxnResponse var err error - resp, err = txn.kv.remote.Txn(txn.ctx, r) + resp, err = txn.kv.remote.Txn(txn.ctx, r, txn.callOpts...) if err != nil { return nil, toErr(txn.ctx, err) } diff --git a/etcdserver/api/v3client/v3client.go b/etcdserver/api/v3client/v3client.go index 6f943184f0b..06874816d5e 100644 --- a/etcdserver/api/v3client/v3client.go +++ b/etcdserver/api/v3client/v3client.go @@ -31,7 +31,7 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client { c := clientv3.NewCtxClient(context.Background()) kvc := adapter.KvServerToKvClient(v3rpc.NewQuotaKVServer(s)) - c.KV = clientv3.NewKVFromKVClient(kvc) + c.KV = clientv3.NewKVFromKVClient(kvc, c) lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s)) c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second) diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 613b61b9a4b..7fb9e912117 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -99,7 +99,7 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { return nil, err } rpc := toGRPC(c) - c.KV = clientv3.NewKVFromKVClient(rpc.KV) + c.KV = clientv3.NewKVFromKVClient(rpc.KV, c) pmu.Lock() lc := c.Lease c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) From 497412c5886293f2ed02789f89942c363cef77bd Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 15:07:32 -0800 Subject: [PATCH 04/11] clientv3: call other APIs with default gRPC call options Signed-off-by: Gyuho Lee --- clientv3/auth.go | 56 +++++++++++++++++------------ clientv3/client.go | 2 +- clientv3/cluster.go | 27 +++++++++----- clientv3/lease.go | 22 +++++++----- clientv3/maintenance.go | 35 +++++++++++------- clientv3/watch.go | 21 +++++++---- etcdserver/api/v3client/v3client.go | 8 ++--- integration/cluster_proxy.go | 4 +-- 8 files changed, 110 insertions(+), 65 deletions(-) diff --git a/clientv3/auth.go b/clientv3/auth.go index 8df670f163a..7545bb6ca1c 100644 --- a/clientv3/auth.go +++ b/clientv3/auth.go @@ -101,60 +101,65 @@ type Auth interface { } type auth struct { - remote pb.AuthClient + remote pb.AuthClient + callOpts []grpc.CallOption } func NewAuth(c *Client) Auth { - return &auth{remote: RetryAuthClient(c)} + api := &auth{remote: RetryAuthClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (auth *auth) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) { - resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}) + resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...) return (*AuthEnableResponse)(resp), toErr(ctx, err) } func (auth *auth) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) { - resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}) + resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...) return (*AuthDisableResponse)(resp), toErr(ctx, err) } func (auth *auth) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) { - resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}) + resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthUserAddResponse)(resp), toErr(ctx, err) } func (auth *auth) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) { - resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}) + resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...) return (*AuthUserDeleteResponse)(resp), toErr(ctx, err) } func (auth *auth) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) { - resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}) + resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) { - resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}) + resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...) return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) { - resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}) + resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...) return (*AuthUserGetResponse)(resp), toErr(ctx, err) } func (auth *auth) UserList(ctx context.Context) (*AuthUserListResponse, error) { - resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}) + resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...) return (*AuthUserListResponse)(resp), toErr(ctx, err) } func (auth *auth) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) { - resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}) + resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...) return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) { - resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}) + resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...) return (*AuthRoleAddResponse)(resp), toErr(ctx, err) } @@ -164,27 +169,27 @@ func (auth *auth) RoleGrantPermission(ctx context.Context, name string, key, ran RangeEnd: []byte(rangeEnd), PermType: authpb.Permission_Type(permType), } - resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}) + resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...) return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) { - resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}) + resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...) return (*AuthRoleGetResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleList(ctx context.Context) (*AuthRoleListResponse, error) { - resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}) + resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...) return (*AuthRoleListResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) { - resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}) + resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: key, RangeEnd: rangeEnd}, auth.callOpts...) return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err) } func (auth *auth) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) { - resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}) + resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...) return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err) } @@ -197,12 +202,13 @@ func StrToPermissionType(s string) (PermissionType, error) { } type authenticator struct { - conn *grpc.ClientConn // conn in-use - remote pb.AuthClient + conn *grpc.ClientConn // conn in-use + remote pb.AuthClient + callOpts []grpc.CallOption } func (auth *authenticator) authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) { - resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}) + resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...) return (*AuthenticateResponse)(resp), toErr(ctx, err) } @@ -210,14 +216,18 @@ func (auth *authenticator) close() { auth.conn.Close() } -func newAuthenticator(endpoint string, opts []grpc.DialOption) (*authenticator, error) { +func newAuthenticator(endpoint string, opts []grpc.DialOption, c *Client) (*authenticator, error) { conn, err := grpc.Dial(endpoint, opts...) if err != nil { return nil, err } - return &authenticator{ + api := &authenticator{ conn: conn, remote: pb.NewAuthClient(conn), - }, nil + } + if c != nil { + api.callOpts = c.callOpts + } + return api, nil } diff --git a/clientv3/client.go b/clientv3/client.go index 3cb463d8fa6..a2c313d35b5 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -297,7 +297,7 @@ func (c *Client) getToken(ctx context.Context) error { endpoint := c.cfg.Endpoints[i] host := getHost(endpoint) // use dial options without dopts to avoid reusing the client balancer - auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint)) + auth, err = newAuthenticator(host, c.dialSetupOpts(endpoint), c) if err != nil { continue } diff --git a/clientv3/cluster.go b/clientv3/cluster.go index 8beba58a67b..e472e62c7a0 100644 --- a/clientv3/cluster.go +++ b/clientv3/cluster.go @@ -18,6 +18,8 @@ import ( "context" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "google.golang.org/grpc" ) type ( @@ -43,20 +45,29 @@ type Cluster interface { } type cluster struct { - remote pb.ClusterClient + remote pb.ClusterClient + callOpts []grpc.CallOption } func NewCluster(c *Client) Cluster { - return &cluster{remote: RetryClusterClient(c)} + api := &cluster{remote: RetryClusterClient(c)} + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster { - return &cluster{remote: remote} +func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster { + api := &cluster{remote: remote} + if c != nil { + api.callOpts = c.callOpts + } + return api } func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAddResponse, error) { r := &pb.MemberAddRequest{PeerURLs: peerAddrs} - resp, err := c.remote.MemberAdd(ctx, r) + resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -65,7 +76,7 @@ func (c *cluster) MemberAdd(ctx context.Context, peerAddrs []string) (*MemberAdd func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveResponse, error) { r := &pb.MemberRemoveRequest{ID: id} - resp, err := c.remote.MemberRemove(ctx, r) + resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -75,7 +86,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []string) (*MemberUpdateResponse, error) { // it is safe to retry on update. r := &pb.MemberUpdateRequest{ID: id, PeerURLs: peerAddrs} - resp, err := c.remote.MemberUpdate(ctx, r) + resp, err := c.remote.MemberUpdate(ctx, r, c.callOpts...) if err == nil { return (*MemberUpdateResponse)(resp), nil } @@ -84,7 +95,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) { // it is safe to retry on list. - resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}) + resp, err := c.remote.MemberList(ctx, &pb.MemberListRequest{}, c.callOpts...) if err == nil { return (*MemberListResponse)(resp), nil } diff --git a/clientv3/lease.go b/clientv3/lease.go index 229e8cd66ba..4097b3afa2a 100644 --- a/clientv3/lease.go +++ b/clientv3/lease.go @@ -22,6 +22,7 @@ import ( "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) @@ -166,6 +167,8 @@ type lessor struct { // firstKeepAliveOnce ensures stream starts after first KeepAlive call. firstKeepAliveOnce sync.Once + + callOpts []grpc.CallOption } // keepAlive multiplexes a keepalive for a lease over multiple channels @@ -181,10 +184,10 @@ type keepAlive struct { } func NewLease(c *Client) Lease { - return NewLeaseFromLeaseClient(RetryLeaseClient(c), c.cfg.DialTimeout+time.Second) + return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second) } -func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease { +func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease { l := &lessor{ donec: make(chan struct{}), keepAlives: make(map[LeaseID]*keepAlive), @@ -194,6 +197,9 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati if l.firstKeepAliveTimeout == time.Second { l.firstKeepAliveTimeout = defaultTTL } + if c != nil { + l.callOpts = c.callOpts + } reqLeaderCtx := WithRequireLeader(context.Background()) l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx) return l @@ -201,7 +207,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Durati func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) { r := &pb.LeaseGrantRequest{TTL: ttl} - resp, err := l.remote.LeaseGrant(ctx, r) + resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...) if err == nil { gresp := &LeaseGrantResponse{ ResponseHeader: resp.GetHeader(), @@ -216,7 +222,7 @@ func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, err func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) { r := &pb.LeaseRevokeRequest{ID: int64(id)} - resp, err := l.remote.LeaseRevoke(ctx, r) + resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...) if err == nil { return (*LeaseRevokeResponse)(resp), nil } @@ -225,7 +231,7 @@ func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) { r := toLeaseTimeToLiveRequest(id, opts...) - resp, err := l.remote.LeaseTimeToLive(ctx, r) + resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...) if err == nil { gresp := &LeaseTimeToLiveResponse{ ResponseHeader: resp.GetHeader(), @@ -240,7 +246,7 @@ func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption } func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) { - resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}) + resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...) if err == nil { leases := make([]LeaseStatus, len(resp.Leases)) for i := range resp.Leases { @@ -389,7 +395,7 @@ func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAlive cctx, cancel := context.WithCancel(ctx) defer cancel() - stream, err := l.remote.LeaseKeepAlive(cctx) + stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -460,7 +466,7 @@ func (l *lessor) recvKeepAliveLoop() (gerr error) { // resetRecv opens a new lease stream and starts sending keep alive requests. func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) { sctx, cancel := context.WithCancel(l.stopCtx) - stream, err := l.remote.LeaseKeepAlive(sctx) + stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...) if err != nil { cancel() return nil, err diff --git a/clientv3/maintenance.go b/clientv3/maintenance.go index 3156770137b..f60cfbe4719 100644 --- a/clientv3/maintenance.go +++ b/clientv3/maintenance.go @@ -19,6 +19,8 @@ import ( "io" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "google.golang.org/grpc" ) type ( @@ -63,12 +65,13 @@ type Maintenance interface { } type maintenance struct { - dial func(endpoint string) (pb.MaintenanceClient, func(), error) - remote pb.MaintenanceClient + dial func(endpoint string) (pb.MaintenanceClient, func(), error) + remote pb.MaintenanceClient + callOpts []grpc.CallOption } func NewMaintenance(c *Client) Maintenance { - return &maintenance{ + api := &maintenance{ dial: func(endpoint string) (pb.MaintenanceClient, func(), error) { conn, err := c.dial(endpoint) if err != nil { @@ -79,15 +82,23 @@ func NewMaintenance(c *Client) Maintenance { }, remote: RetryMaintenanceClient(c, c.conn), } + if c != nil { + api.callOpts = c.callOpts + } + return api } -func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance { - return &maintenance{ +func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { + api := &maintenance{ dial: func(string) (pb.MaintenanceClient, func(), error) { return remote, func() {}, nil }, remote: remote, } + if c != nil { + api.callOpts = c.callOpts + } + return api } func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { @@ -96,7 +107,7 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) { MemberID: 0, // all Alarm: pb.AlarmType_NONE, // all } - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, m.callOpts...) if err == nil { return (*AlarmResponse)(resp), nil } @@ -126,7 +137,7 @@ func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmR return &ret, nil } - resp, err := m.remote.Alarm(ctx, req) + resp, err := m.remote.Alarm(ctx, req, m.callOpts...) if err == nil { return (*AlarmResponse)(resp), nil } @@ -139,7 +150,7 @@ func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*Defragm return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}) + resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -152,7 +163,7 @@ func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusRespo return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.Status(ctx, &pb.StatusRequest{}) + resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -165,7 +176,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* return nil, toErr(ctx, err) } defer cancel() - resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}) + resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -173,7 +184,7 @@ func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (* } func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) { - ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}) + ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, m.callOpts...) if err != nil { return nil, toErr(ctx, err) } @@ -210,6 +221,6 @@ func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) { } func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) { - resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}) + resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...) return (*MoveLeaderResponse)(resp), toErr(ctx, err) } diff --git a/clientv3/watch.go b/clientv3/watch.go index 7eb8b4d6ccf..f606a1af975 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -106,7 +106,8 @@ func (wr *WatchResponse) IsProgressNotify() bool { // watcher implements the Watcher interface type watcher struct { - remote pb.WatchClient + remote pb.WatchClient + callOpts []grpc.CallOption // mu protects the grpc streams map mu sync.RWMutex @@ -117,8 +118,9 @@ type watcher struct { // watchGrpcStream tracks all watch resources attached to a single grpc stream. type watchGrpcStream struct { - owner *watcher - remote pb.WatchClient + owner *watcher + remote pb.WatchClient + callOpts []grpc.CallOption // ctx controls internal remote.Watch requests ctx context.Context @@ -189,14 +191,18 @@ type watcherStream struct { } func NewWatcher(c *Client) Watcher { - return NewWatchFromWatchClient(pb.NewWatchClient(c.conn)) + return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c) } -func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { - return &watcher{ +func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { + w := &watcher{ remote: wc, streams: make(map[string]*watchGrpcStream), } + if c != nil { + w.callOpts = c.callOpts + } + return w } // never closes @@ -215,6 +221,7 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { wgs := &watchGrpcStream{ owner: w, remote: w.remote, + callOpts: w.callOpts, ctx: ctx, ctxKey: streamKeyFromCtx(inctx), cancel: cancel, @@ -775,7 +782,7 @@ func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) return nil, err default: } - if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { + if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil { break } if isHaltErr(w.ctx, err) { diff --git a/etcdserver/api/v3client/v3client.go b/etcdserver/api/v3client/v3client.go index 06874816d5e..ab48ea75b91 100644 --- a/etcdserver/api/v3client/v3client.go +++ b/etcdserver/api/v3client/v3client.go @@ -34,16 +34,16 @@ func New(s *etcdserver.EtcdServer) *clientv3.Client { c.KV = clientv3.NewKVFromKVClient(kvc, c) lc := adapter.LeaseServerToLeaseClient(v3rpc.NewQuotaLeaseServer(s)) - c.Lease = clientv3.NewLeaseFromLeaseClient(lc, time.Second) + c.Lease = clientv3.NewLeaseFromLeaseClient(lc, c, time.Second) wc := adapter.WatchServerToWatchClient(v3rpc.NewWatchServer(s)) - c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc)} + c.Watcher = &watchWrapper{clientv3.NewWatchFromWatchClient(wc, c)} mc := adapter.MaintenanceServerToMaintenanceClient(v3rpc.NewMaintenanceServer(s)) - c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc) + c.Maintenance = clientv3.NewMaintenanceFromMaintenanceClient(mc, c) clc := adapter.ClusterServerToClusterClient(v3rpc.NewClusterServer(s)) - c.Cluster = clientv3.NewClusterFromClusterClient(clc) + c.Cluster = clientv3.NewClusterFromClusterClient(clc, c) // TODO: implement clientv3.Auth interface? diff --git a/integration/cluster_proxy.go b/integration/cluster_proxy.go index 7fb9e912117..1e8d8b572fb 100644 --- a/integration/cluster_proxy.go +++ b/integration/cluster_proxy.go @@ -102,9 +102,9 @@ func newClientV3(cfg clientv3.Config) (*clientv3.Client, error) { c.KV = clientv3.NewKVFromKVClient(rpc.KV, c) pmu.Lock() lc := c.Lease - c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, cfg.DialTimeout) + c.Lease = clientv3.NewLeaseFromLeaseClient(rpc.Lease, c, cfg.DialTimeout) c.Watcher = &proxyCloser{ - Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch), + Watcher: clientv3.NewWatchFromWatchClient(rpc.Watch, c), wdonec: proxies[c].wdonec, kvdonec: proxies[c].kvdonec, lclose: func() { lc.Close() }, From f38593bbadc24ba7c7c40fb4330038d97ca6be84 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 15:21:01 -0800 Subject: [PATCH 05/11] clientv3/integration: test large KV requests Signed-off-by: Gyuho Lee --- clientv3/integration/kv_test.go | 93 +++++++++++++++++++++++++++++++++ integration/cluster.go | 55 +++++++++++-------- 2 files changed, 127 insertions(+), 21 deletions(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index f41b952976b..805f3d42038 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -30,6 +30,7 @@ import ( "github.com/coreos/etcd/pkg/testutil" "google.golang.org/grpc" + "google.golang.org/grpc/codes" ) func TestKVPutError(t *testing.T) { @@ -861,3 +862,95 @@ func TestKVPutAtMostOnce(t *testing.T) { t.Fatalf("expected version <= 10, got %+v", resp.Kvs[0]) } } + +// TestKVLargeRequests tests various client/server side request limits. +func TestKVLargeRequests(t *testing.T) { + defer testutil.AfterTest(t) + tests := []struct { + // make sure that "MaxCallSendMsgSize" < server-side default send/recv limit + maxRequestBytesServer uint + maxCallSendBytesClient int + maxCallRecvBytesClient int + + valueSize int + expectError error + }{ + { + maxRequestBytesServer: 1, + maxCallSendBytesClient: 0, + maxCallRecvBytesClient: 0, + valueSize: 1024, + expectError: rpctypes.ErrRequestTooLarge, + }, + + // without proper client-side receive size limit + // "code = ResourceExhausted desc = grpc: received message larger than max (5242929 vs. 4194304)" + { + + maxRequestBytesServer: 7*1024*1024 + 512*1024, + maxCallSendBytesClient: 7 * 1024 * 1024, + maxCallRecvBytesClient: 0, + valueSize: 5 * 1024 * 1024, + expectError: nil, + }, + + { + maxRequestBytesServer: 10 * 1024 * 1024, + maxCallSendBytesClient: 100 * 1024 * 1024, + maxCallRecvBytesClient: 0, + valueSize: 10 * 1024 * 1024, + expectError: rpctypes.ErrRequestTooLarge, + }, + { + maxRequestBytesServer: 10 * 1024 * 1024, + maxCallSendBytesClient: 10 * 1024 * 1024, + maxCallRecvBytesClient: 0, + valueSize: 10 * 1024 * 1024, + expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485770, 10485760), + }, + { + maxRequestBytesServer: 10 * 1024 * 1024, + maxCallSendBytesClient: 100 * 1024 * 1024, + maxCallRecvBytesClient: 0, + valueSize: 10*1024*1024 + 5, + expectError: rpctypes.ErrRequestTooLarge, + }, + { + maxRequestBytesServer: 10 * 1024 * 1024, + maxCallSendBytesClient: 10 * 1024 * 1024, + maxCallRecvBytesClient: 0, + valueSize: 10*1024*1024 + 5, + expectError: grpc.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", 10485775, 10485760), + }, + } + for i, test := range tests { + clus := integration.NewClusterV3(t, + &integration.ClusterConfig{ + Size: 1, + MaxRequestBytes: test.maxRequestBytesServer, + ClientMaxCallSendMsgSize: test.maxCallSendBytesClient, + ClientMaxCallRecvMsgSize: test.maxCallRecvBytesClient, + }, + ) + cli := clus.Client(0) + _, err := cli.Put(context.TODO(), "foo", strings.Repeat("a", test.valueSize)) + + if _, ok := err.(rpctypes.EtcdError); ok { + if err != test.expectError { + t.Errorf("#%d: expected %v, got %v", i, test.expectError, err) + } + } else if err != nil && err.Error() != test.expectError.Error() { + t.Errorf("#%d: expected %v, got %v", i, test.expectError, err) + } + + // put request went through, now expects large response back + if err == nil { + _, err = cli.Get(context.TODO(), "foo") + if err != nil { + t.Errorf("#%d: get expected no error, got %v", i, err) + } + } + + clus.Terminate(t) + } +} diff --git a/integration/cluster.go b/integration/cluster.go index 036c847da47..0b4114853ec 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -105,6 +105,9 @@ type ClusterConfig struct { GRPCKeepAliveTimeout time.Duration // SkipCreatingClient to skip creating clients for each member. SkipCreatingClient bool + + ClientMaxCallSendMsgSize int + ClientMaxCallRecvMsgSize int } type cluster struct { @@ -232,15 +235,17 @@ func (c *cluster) HTTPMembers() []client.Member { func (c *cluster) mustNewMember(t *testing.T) *member { m := mustNewMember(t, memberConfig{ - name: c.name(rand.Int()), - peerTLS: c.cfg.PeerTLS, - clientTLS: c.cfg.ClientTLS, - quotaBackendBytes: c.cfg.QuotaBackendBytes, - maxTxnOps: c.cfg.MaxTxnOps, - maxRequestBytes: c.cfg.MaxRequestBytes, - grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, - grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, - grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, + name: c.name(rand.Int()), + peerTLS: c.cfg.PeerTLS, + clientTLS: c.cfg.ClientTLS, + quotaBackendBytes: c.cfg.QuotaBackendBytes, + maxTxnOps: c.cfg.MaxTxnOps, + maxRequestBytes: c.cfg.MaxRequestBytes, + grpcKeepAliveMinTime: c.cfg.GRPCKeepAliveMinTime, + grpcKeepAliveInterval: c.cfg.GRPCKeepAliveInterval, + grpcKeepAliveTimeout: c.cfg.GRPCKeepAliveTimeout, + clientMaxCallSendMsgSize: c.cfg.ClientMaxCallSendMsgSize, + clientMaxCallRecvMsgSize: c.cfg.ClientMaxCallRecvMsgSize, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -501,21 +506,25 @@ type member struct { // serverClient is a clientv3 that directly calls the etcdserver. serverClient *clientv3.Client - keepDataDirTerminate bool + keepDataDirTerminate bool + clientMaxCallSendMsgSize int + clientMaxCallRecvMsgSize int } func (m *member) GRPCAddr() string { return m.grpcAddr } type memberConfig struct { - name string - peerTLS *transport.TLSInfo - clientTLS *transport.TLSInfo - quotaBackendBytes int64 - maxTxnOps uint - maxRequestBytes uint - grpcKeepAliveMinTime time.Duration - grpcKeepAliveInterval time.Duration - grpcKeepAliveTimeout time.Duration + name string + peerTLS *transport.TLSInfo + clientTLS *transport.TLSInfo + quotaBackendBytes int64 + maxTxnOps uint + maxRequestBytes uint + grpcKeepAliveMinTime time.Duration + grpcKeepAliveInterval time.Duration + grpcKeepAliveTimeout time.Duration + clientMaxCallSendMsgSize int + clientMaxCallRecvMsgSize int } // mustNewMember return an inited member with the given name. If peerTLS is @@ -587,6 +596,8 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { Timeout: mcfg.grpcKeepAliveTimeout, })) } + m.clientMaxCallSendMsgSize = mcfg.clientMaxCallSendMsgSize + m.clientMaxCallRecvMsgSize = mcfg.clientMaxCallRecvMsgSize m.InitialCorruptCheck = true @@ -630,8 +641,10 @@ func NewClientV3(m *member) (*clientv3.Client, error) { } cfg := clientv3.Config{ - Endpoints: []string{m.grpcAddr}, - DialTimeout: 5 * time.Second, + Endpoints: []string{m.grpcAddr}, + DialTimeout: 5 * time.Second, + MaxCallSendMsgSize: m.clientMaxCallSendMsgSize, + MaxCallRecvMsgSize: m.clientMaxCallRecvMsgSize, } if m.ClientTLSInfo != nil { From 1b3ed912a28ca855256710ff77d4a600ec6239d9 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 15:36:42 -0800 Subject: [PATCH 06/11] words: whitelist more Signed-off-by: Gyuho Lee --- .words | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.words b/.words index 7ca7100bdcb..71156c6e94d 100644 --- a/.words +++ b/.words @@ -1,8 +1,11 @@ +DefaultMaxRequestBytes ErrCodeEnhanceYourCalm ErrTimeout GoAway KeepAlive Keepalive +MiB +ResourceExhausted RPC RPCs TODO From 3d924aedc8e63454810493f8baf66ba94d1c024d Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 21:25:52 -0800 Subject: [PATCH 07/11] Documentation/upgrades: clean up 3.2, 3.3 guides Make headers consistent. Signed-off-by: Gyuho Lee --- Documentation/upgrades/upgrade_3_2.md | 91 +++++++++++++++++---------- Documentation/upgrades/upgrade_3_3.md | 40 ++++++------ 2 files changed, 79 insertions(+), 52 deletions(-) diff --git a/Documentation/upgrades/upgrade_3_2.md b/Documentation/upgrades/upgrade_3_2.md index f65081478e4..441a0c482e7 100644 --- a/Documentation/upgrades/upgrade_3_2.md +++ b/Documentation/upgrades/upgrade_3_2.md @@ -6,57 +6,38 @@ In the general case, upgrading from etcd 3.1 to 3.2 can be a zero-downtime, roll Before [starting an upgrade](#upgrade-procedure), read through the rest of this guide to prepare. -### Server upgrade checklists (breaking change) +### Upgrade checklists -3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`. +Highlighted breaking changes in 3.2. -See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more contexts. +#### Change in gRPC dependency (>=3.2.10) -### Client upgrade checklists (>=3.2.0) +3.2.10 or later now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.5` (<=3.2.9 requires `v1.2.1`). -3.2 introduces two breaking changes. +##### Deprecate `grpclog.Logger` -Previously, `clientv3.Lease.TimeToLive` API returned `lease.ErrLeaseNotFound` on non-existent lease ID. 3.2 instead returns TTL=-1 in its response and no error (see [#7305](https://github.com/coreos/etcd/pull/7305)). +`grpclog.Logger` has been deprecated in favor of [`grpclog.LoggerV2`](https://github.com/grpc/grpc-go/blob/master/grpclog/loggerv2.go). `clientv3.Logger` is now `grpclog.LoggerV2`. Before ```go -// when leaseID does not exist -resp, err := TimeToLive(ctx, leaseID) -resp == nil -err == lease.ErrLeaseNotFound +import "github.com/coreos/etcd/clientv3" +clientv3.SetLogger(log.New(os.Stderr, "grpc: ", 0)) ``` After -```go -// when leaseID does not exist -resp, err := TimeToLive(ctx, leaseID) -resp.TTL == -1 -err == nil -``` - -`clientv3.NewFromConfigFile` is moved to `yaml.NewConfig`. - -Before - ```go import "github.com/coreos/etcd/clientv3" -clientv3.NewFromConfigFile -``` - -After +import "google.golang.org/grpc/grpclog" +clientv3.SetLogger(grpclog.NewLoggerV2(os.Stderr, os.Stderr, os.Stderr)) -```go -import clientv3yaml "github.com/coreos/etcd/clientv3/yaml" -clientv3yaml.NewConfig +// log.New above cannot be used (not implement grpclog.LoggerV2 interface) ``` -### Client upgrade checklists (>=3.2.10) - -Note that >=3.2.10 requires `grpc/grpc-go` v1.7.4 (<=3.2.9 with v1.2.1), which introduces some breaking changes. +##### Deprecate `grpc.ErrClientConnTimeout` -Previously, `grpc.ErrClientConnTimeout` error is returned on client dial time-outs. >=3.2.10 instead returns `context.DeadlineExceeded` (see [#8504](https://github.com/coreos/etcd/issues/8504)). +Previously, `grpc.ErrClientConnTimeout` error is returned on client dial time-outs. 3.2 instead returns `context.DeadlineExceeded` (see [#8504](https://github.com/coreos/etcd/issues/8504)). Before @@ -83,6 +64,52 @@ if err == context.DeadlineExceeded { } ``` +#### Change in `--listen-peer-urls` and `--listen-client-urls` + +3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`. + +See [issue #6336](https://github.com/coreos/etcd/issues/6336) for more contexts. + +#### Change in `clientv3.Lease.TimeToLive` API + +Previously, `clientv3.Lease.TimeToLive` API returned `lease.ErrLeaseNotFound` on non-existent lease ID. 3.2 instead returns TTL=-1 in its response and no error (see [#7305](https://github.com/coreos/etcd/pull/7305)). + +Before + +```go +// when leaseID does not exist +resp, err := TimeToLive(ctx, leaseID) +resp == nil +err == lease.ErrLeaseNotFound +``` + +After + +```go +// when leaseID does not exist +resp, err := TimeToLive(ctx, leaseID) +resp.TTL == -1 +err == nil +``` + +#### Change in `clientv3.NewFromConfigFile` + +`clientv3.NewFromConfigFile` is moved to `yaml.NewConfig`. + +Before + +```go +import "github.com/coreos/etcd/clientv3" +clientv3.NewFromConfigFile +``` + +After + +```go +import clientv3yaml "github.com/coreos/etcd/clientv3/yaml" +clientv3yaml.NewConfig +``` + ### Server upgrade checklists #### Upgrade requirements diff --git a/Documentation/upgrades/upgrade_3_3.md b/Documentation/upgrades/upgrade_3_3.md index 77b12b24b35..2e724a46438 100644 --- a/Documentation/upgrades/upgrade_3_3.md +++ b/Documentation/upgrades/upgrade_3_3.md @@ -111,23 +111,7 @@ curl -L http://localhost:2379/v3beta/kv/put \ Requests to `/v3alpha` endpoints will redirect to `/v3beta`, and `/v3alpha` will be removed in 3.4 release. -#### `gcr.io/etcd-development/etcd` as primary container registry - -etcd uses [`gcr.io/etcd-development/etcd`](https://gcr.io/etcd-development/etcd) as a primary container registry, and [`quay.io/coreos/etcd`](https://quay.io/coreos/etcd) as secondary. - -Before - -```bash -docker pull quay.io/coreos/etcd:v3.2.5 -``` - -After - -```bash -docker pull gcr.io/etcd-development/etcd:v3.3.0 -``` - -#### Change in `Snapshot` API error type +#### Change in clientv3 `Snapshot` API error type Previously, clientv3 `Snapshot` API returned raw [`grpc/*status.statusError`] type error. v3.3 now translates those errors to corresponding public error types, to be consistent with other APIs. @@ -173,7 +157,7 @@ _, err = io.Copy(f, rc) err == context.DeadlineExceeded ``` -#### Deprecate `golang.org/x/net/context` imports +#### Change in `golang.org/x/net/context` imports `clientv3` has deprecated `golang.org/x/net/context`. If a project vendors `golang.org/x/net/context` in other code (e.g. etcd generated protocol buffer code) and imports `github.com/coreos/etcd/clientv3`, it requires Go 1.9+ to compile. @@ -191,9 +175,9 @@ import "context" cli.Put(context.Background(), "f", "v") ``` -#### Upgrade grpc/grpc-go to `v1.7.4` +#### Change in gRPC dependency -3.3 now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.4`. +3.3 now requires [grpc/grpc-go](https://github.com/grpc/grpc-go/releases) `v1.7.5`. ##### Deprecate `grpclog.Logger` @@ -245,6 +229,22 @@ if err == context.DeadlineExceeded { } ``` +#### Change in official container registry + +etcd now uses [`gcr.io/etcd-development/etcd`](https://gcr.io/etcd-development/etcd) as a primary container registry, and [`quay.io/coreos/etcd`](https://quay.io/coreos/etcd) as secondary. + +Before + +```bash +docker pull quay.io/coreos/etcd:v3.2.5 +``` + +After + +```bash +docker pull gcr.io/etcd-development/etcd:v3.3.0 +``` + ### Server upgrade checklists #### Upgrade requirements From 6bfde98be7773c16db8251fd3ee534b0f72a3b05 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Tue, 19 Dec 2017 21:49:38 -0800 Subject: [PATCH 08/11] Documentation/upgrades: highlight request limit changes in v3.2, v3.3 Signed-off-by: Gyuho Lee --- Documentation/upgrades/upgrade_3_2.md | 73 +++++++++++++++++++++++++++ Documentation/upgrades/upgrade_3_3.md | 73 +++++++++++++++++++++++++++ 2 files changed, 146 insertions(+) diff --git a/Documentation/upgrades/upgrade_3_2.md b/Documentation/upgrades/upgrade_3_2.md index 441a0c482e7..67cab1ed9ab 100644 --- a/Documentation/upgrades/upgrade_3_2.md +++ b/Documentation/upgrades/upgrade_3_2.md @@ -64,6 +64,79 @@ if err == context.DeadlineExceeded { } ``` +#### Change in maximum request size limits (>=3.2.10) + +3.2.10 and 3.2.11 allow custom request size limits in server side. >=3.2.12 allows custom request size limits for both server and **client side**. + +Server-side request limits can be configured with `--max-request-bytes` flag: + +```bash +# limits request size to 1.5 KiB +etcd --max-request-bytes 1536 + +# client writes exceeding 1.5 KiB will be rejected +etcdctl put foo [LARGE VALUE...] +# etcdserver: request is too large +``` + +Or configure `embed.Config.MaxRequestBytes` field: + +```go +import "github.com/coreos/etcd/embed" +import "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + +// limit requests to 5 MiB +cfg := embed.NewConfig() +cfg.MaxRequestBytes = 5 * 1024 * 1024 + +// client writes exceeding 5 MiB will be rejected +_, err := cli.Put(ctx, "foo", [LARGE VALUE...]) +err == rpctypes.ErrRequestTooLarge +``` + +**If not specified, server-side limit defaults to 1.5 MiB**. + +Client-side request limits must be configured based on server-side limits. + +```bash +# limits request size to 1 MiB +etcd --max-request-bytes 1048576 +``` + +```go +import "github.com/coreos/etcd/clientv3" + +cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:2379"}, + MaxCallSendMsgSize: 2 * 1024 * 1024, + MaxCallRecvMsgSize: 3 * 1024 * 1024, +}) + + +// client writes exceeding "--max-request-bytes" will be rejected from etcd server +_, err := cli.Put(ctx, "foo", strings.Repeat("a", 1*1024*1024+5)) +err == rpctypes.ErrRequestTooLarge + + +// client writes exceeding "MaxCallSendMsgSize" will be rejected from client-side +_, err = cli.Put(ctx, "foo", strings.Repeat("a", 5*1024*1024)) +err.Error() == "rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (5242890 vs. 2097152)" + + +// some writes under limits +for i := range []int{0,1,2,3,4} { + _, err = cli.Put(ctx, fmt.Sprintf("foo%d", i), strings.Repeat("a", 1*1024*1024-500)) + if err != nil { + panic(err) + } +} +// client reads exceeding "MaxCallRecvMsgSize" will be rejected from client-side +_, err = cli.Get(ctx, "foo", clientv3.WithPrefix()) +err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5240509 vs. 3145728)" +``` + +**If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail. + #### Change in `--listen-peer-urls` and `--listen-client-urls` 3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`. diff --git a/Documentation/upgrades/upgrade_3_3.md b/Documentation/upgrades/upgrade_3_3.md index 2e724a46438..95ee6c64204 100644 --- a/Documentation/upgrades/upgrade_3_3.md +++ b/Documentation/upgrades/upgrade_3_3.md @@ -111,6 +111,79 @@ curl -L http://localhost:2379/v3beta/kv/put \ Requests to `/v3alpha` endpoints will redirect to `/v3beta`, and `/v3alpha` will be removed in 3.4 release. +#### Change in maximum request size limits + +3.3 now allows custom request size limits for both server and **client side**. + +Server-side request limits can be configured with `--max-request-bytes` flag: + +```bash +# limits request size to 1.5 KiB +etcd --max-request-bytes 1536 + +# client writes exceeding 1.5 KiB will be rejected +etcdctl put foo [LARGE VALUE...] +# etcdserver: request is too large +``` + +Or configure `embed.Config.MaxRequestBytes` field: + +```go +import "github.com/coreos/etcd/embed" +import "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + +// limit requests to 5 MiB +cfg := embed.NewConfig() +cfg.MaxRequestBytes = 5 * 1024 * 1024 + +// client writes exceeding 5 MiB will be rejected +_, err := cli.Put(ctx, "foo", [LARGE VALUE...]) +err == rpctypes.ErrRequestTooLarge +``` + +**If not specified, server-side limit defaults to 1.5 MiB**. + +Client-side request limits must be configured based on server-side limits. + +```bash +# limits request size to 1 MiB +etcd --max-request-bytes 1048576 +``` + +```go +import "github.com/coreos/etcd/clientv3" + +cli, _ := clientv3.New(clientv3.Config{ + Endpoints: []string{"127.0.0.1:2379"}, + MaxCallSendMsgSize: 2 * 1024 * 1024, + MaxCallRecvMsgSize: 3 * 1024 * 1024, +}) + + +// client writes exceeding "--max-request-bytes" will be rejected from etcd server +_, err := cli.Put(ctx, "foo", strings.Repeat("a", 1*1024*1024+5)) +err == rpctypes.ErrRequestTooLarge + + +// client writes exceeding "MaxCallSendMsgSize" will be rejected from client-side +_, err = cli.Put(ctx, "foo", strings.Repeat("a", 5*1024*1024)) +err.Error() == "rpc error: code = ResourceExhausted desc = grpc: trying to send message larger than max (5242890 vs. 2097152)" + + +// some writes under limits +for i := range []int{0,1,2,3,4} { + _, err = cli.Put(ctx, fmt.Sprintf("foo%d", i), strings.Repeat("a", 1*1024*1024-500)) + if err != nil { + panic(err) + } +} +// client reads exceeding "MaxCallRecvMsgSize" will be rejected from client-side +_, err = cli.Get(ctx, "foo", clientv3.WithPrefix()) +err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received message larger than max (5240509 vs. 3145728)" +``` + +**If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail. + #### Change in clientv3 `Snapshot` API error type Previously, clientv3 `Snapshot` API returned raw [`grpc/*status.statusError`] type error. v3.3 now translates those errors to corresponding public error types, to be consistent with other APIs. From 88fe8de99b5f5b5a4cd14a2221016d60b04d25da Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 20 Dec 2017 01:01:34 -0800 Subject: [PATCH 09/11] clientv3/integration: fix TestKVPutError Signed-off-by: Gyuho Lee --- clientv3/integration/kv_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 805f3d42038..43bfa3cbf7e 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -40,7 +40,7 @@ func TestKVPutError(t *testing.T) { maxReqBytes = 1.5 * 1024 * 1024 // hard coded max in v3_server.go quota = int64(int(maxReqBytes) + 8*os.Getpagesize()) ) - clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota}) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, QuotaBackendBytes: quota, ClientMaxCallSendMsgSize: 100 * 1024 * 1024}) defer clus.Terminate(t) kv := clus.RandClient() From 3d56045da0109fa43d9b4700e80db7a73e822868 Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 20 Dec 2017 04:23:03 -0800 Subject: [PATCH 10/11] integration: bump up wait leader timeout for slow CIs Signed-off-by: Gyuho Lee --- integration/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/cluster.go b/integration/cluster.go index 0b4114853ec..0b3e6e8ab59 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -387,7 +387,7 @@ func (c *cluster) waitLeader(t *testing.T, membs []*member) int { // ensure leader is up via linearizable get for { - ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration) + ctx, cancel := context.WithTimeout(context.Background(), 10*tickDuration+time.Second) _, err := kapi.Get(ctx, "0", &client.GetOptions{Quorum: true}) cancel() if err == nil || strings.Contains(err.Error(), "Key not found") { From 3c5eb4f4fee3c6cc15f4ca9ab57102528aed259e Mon Sep 17 00:00:00 2001 From: Gyuho Lee Date: Wed, 20 Dec 2017 11:05:17 -0800 Subject: [PATCH 11/11] Documentation/upgrades: highlight raw gRPC client wrapper changes Signed-off-by: Gyuho Lee --- Documentation/upgrades/upgrade_3_2.md | 23 +++++++++++++++++++++++ Documentation/upgrades/upgrade_3_3.md | 23 +++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/Documentation/upgrades/upgrade_3_2.md b/Documentation/upgrades/upgrade_3_2.md index 67cab1ed9ab..60e3c717048 100644 --- a/Documentation/upgrades/upgrade_3_2.md +++ b/Documentation/upgrades/upgrade_3_2.md @@ -137,6 +137,29 @@ err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received messag **If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail. +#### Change in raw gRPC client wrappers + +3.2.12 or later changes the function signatures of `clientv3` gRPC client wrapper. This change was needed to support [custom `grpc.CallOption` on message size limits](https://github.com/coreos/etcd/pull/9047). + +Before and after + +```diff +-func NewKVFromKVClient(remote pb.KVClient) KV { ++func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { + +-func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster { ++func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster { + +-func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease { ++func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease { + +-func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance { ++func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { + +-func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { ++func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { +``` + #### Change in `--listen-peer-urls` and `--listen-client-urls` 3.2 now rejects domains names for `--listen-peer-urls` and `--listen-client-urls` (3.1 only prints out warnings), since domain name is invalid for network interface binding. Make sure that those URLs are properly formated as `scheme://IP:port`. diff --git a/Documentation/upgrades/upgrade_3_3.md b/Documentation/upgrades/upgrade_3_3.md index 95ee6c64204..bb663955af8 100644 --- a/Documentation/upgrades/upgrade_3_3.md +++ b/Documentation/upgrades/upgrade_3_3.md @@ -184,6 +184,29 @@ err.Error() == "rpc error: code = ResourceExhausted desc = grpc: received messag **If not specified, client-side send limit defaults to 2 MiB (1.5 MiB + gRPC overhead bytes) and receive limit to `math.MaxInt32`**. Please see [clientv3 godoc](https://godoc.org/github.com/coreos/etcd/clientv3#Config) for more detail. +#### Change in raw gRPC client wrappers + +3.3 changes the function signatures of `clientv3` gRPC client wrapper. This change was needed to support [custom `grpc.CallOption` on message size limits](https://github.com/coreos/etcd/pull/9047). + +Before and after + +```diff +-func NewKVFromKVClient(remote pb.KVClient) KV { ++func NewKVFromKVClient(remote pb.KVClient, c *Client) KV { + +-func NewClusterFromClusterClient(remote pb.ClusterClient) Cluster { ++func NewClusterFromClusterClient(remote pb.ClusterClient, c *Client) Cluster { + +-func NewLeaseFromLeaseClient(remote pb.LeaseClient, keepAliveTimeout time.Duration) Lease { ++func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease { + +-func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient) Maintenance { ++func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance { + +-func NewWatchFromWatchClient(wc pb.WatchClient) Watcher { ++func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { +``` + #### Change in clientv3 `Snapshot` API error type Previously, clientv3 `Snapshot` API returned raw [`grpc/*status.statusError`] type error. v3.3 now translates those errors to corresponding public error types, to be consistent with other APIs.