From ae7ddfb4838f30da1ef04ee48cb89170d7c3268f Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Tue, 23 May 2017 17:50:20 -0700 Subject: [PATCH 1/2] etcdserver: add --max-txn-ops flag --max-txn-ops allows users to define the maximum transaction operations for each txn request. it defaults at 128. Fixes #7826 --- clientv3/integration/txn_test.go | 2 +- embed/config.go | 3 +++ embed/etcd.go | 1 + etcdmain/config.go | 1 + etcdmain/help.go | 2 ++ etcdserver/api/v3rpc/key.go | 18 ++++++++++-------- etcdserver/config.go | 1 + integration/v3_grpc_test.go | 2 +- 8 files changed, 20 insertions(+), 10 deletions(-) diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index b5cdea7e64f..1634a65da1b 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -41,7 +41,7 @@ func TestTxnError(t *testing.T) { t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err) } - ops := make([]clientv3.Op, v3rpc.MaxOpsPerTxn+10) + ops := make([]clientv3.Op, v3rpc.MaxTxnOps+10) for i := range ops { ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "") } diff --git a/embed/config.go b/embed/config.go index e3926f66cb4..0d88a9688d4 100644 --- a/embed/config.go +++ b/embed/config.go @@ -40,6 +40,7 @@ const ( DefaultName = "default" DefaultMaxSnapshots = 5 DefaultMaxWALs = 5 + DefaultMaxTxnOps = uint(128) DefaultListenPeerURLs = "http://localhost:2380" DefaultListenClientURLs = "http://localhost:2379" @@ -85,6 +86,7 @@ type Config struct { TickMs uint `json:"heartbeat-interval"` ElectionMs uint `json:"election-timeout"` QuotaBackendBytes int64 `json:"quota-backend-bytes"` + MaxTxnOps uint `json:"max-txn-ops"` // clustering @@ -172,6 +174,7 @@ func NewConfig() *Config { MaxWalFiles: DefaultMaxWALs, Name: DefaultName, SnapCount: etcdserver.DefaultSnapCount, + MaxTxnOps: DefaultMaxTxnOps, TickMs: 100, ElectionMs: 1000, LPUrls: []url.URL{*lpurl}, diff --git a/embed/etcd.go b/embed/etcd.go index f77dee0e065..18b2c905d7e 100644 --- a/embed/etcd.go +++ b/embed/etcd.go @@ -139,6 +139,7 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) { ElectionTicks: cfg.ElectionTicks(), AutoCompactionRetention: cfg.AutoCompactionRetention, QuotaBackendBytes: cfg.QuotaBackendBytes, + MaxTxnOps: cfg.MaxTxnOps, StrictReconfigCheck: cfg.StrictReconfigCheck, ClientCertAuthEnabled: cfg.ClientTLSInfo.ClientCertAuth, AuthToken: cfg.AuthToken, diff --git a/etcdmain/config.go b/etcdmain/config.go index b8732200ae6..88bae686c92 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -138,6 +138,7 @@ func newConfig() *config { fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.") fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.") fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") + fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum operations per txn that etcd server allows; defaults to 128.") // clustering fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.") diff --git a/etcdmain/help.go b/etcdmain/help.go index cd9282a3192..02e6707ddad 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -66,6 +66,8 @@ member flags: comma-separated whitelist of origins for CORS (cross-origin resource sharing). --quota-backend-bytes '0' raise alarms when backend size exceeds the given quota (0 defaults to low space quota). + --max-txn-ops '128' + maximum operations per txn that etcd server allows; defaults to 128. clustering flags: diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index 8acae5983c0..de03556714b 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -27,19 +27,20 @@ import ( var ( plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver/api/v3rpc") - - // Max operations per txn list. For example, Txn.Success can have at most 128 operations, - // and Txn.Failure can have at most 128 operations. - MaxOpsPerTxn = 128 ) type kvServer struct { hdr header kv etcdserver.RaftKV + // maxTxnOps is the max operations per txn. + // e.g suppose maxTxnOps = 128. + // Txn.Success can have at most 128 operations, + // and Txn.Failure can have at most 128 operations. + maxTxnOps uint } func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer { - return &kvServer{hdr: newHeader(s), kv: s} + return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps} } func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) { @@ -94,7 +95,7 @@ func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (* } func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) { - if err := checkTxnRequest(r); err != nil { + if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil { return nil, err } @@ -150,8 +151,9 @@ func checkDeleteRequest(r *pb.DeleteRangeRequest) error { return nil } -func checkTxnRequest(r *pb.TxnRequest) error { - if len(r.Compare) > MaxOpsPerTxn || len(r.Success) > MaxOpsPerTxn || len(r.Failure) > MaxOpsPerTxn { +func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error { + plog.Infof("maxTxnOps %v", maxTxnOps) + if len(r.Compare) > maxTxnOps || len(r.Success) > maxTxnOps || len(r.Failure) > maxTxnOps { return rpctypes.ErrGRPCTooManyOps } diff --git a/etcdserver/config.go b/etcdserver/config.go index 9c258934a1c..d0c63c00694 100644 --- a/etcdserver/config.go +++ b/etcdserver/config.go @@ -54,6 +54,7 @@ type ServerConfig struct { AutoCompactionRetention int QuotaBackendBytes int64 + MaxTxnOps uint StrictReconfigCheck bool diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index ac6d66467e4..b5c88997d9e 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -201,7 +201,7 @@ func TestV3TxnTooManyOps(t *testing.T) { for i, tt := range tests { txn := &pb.TxnRequest{} - for j := 0; j < v3rpc.MaxOpsPerTxn+1; j++ { + for j := 0; j < v3rpc.MaxTxnOps+1; j++ { tt(txn) } From e9f464debc8dc34b1237edd88b8c56c8b2601f31 Mon Sep 17 00:00:00 2001 From: fanmin shi Date: Wed, 24 May 2017 11:04:17 -0700 Subject: [PATCH 2/2] integration: creation of cluster now takes maxTxnOps --- clientv3/integration/txn_test.go | 4 ++-- etcdmain/config.go | 2 +- etcdmain/help.go | 2 +- etcdserver/api/v3rpc/key.go | 1 - integration/cluster.go | 8 ++++++++ integration/v3_grpc_test.go | 6 +++--- 6 files changed, 15 insertions(+), 8 deletions(-) diff --git a/clientv3/integration/txn_test.go b/clientv3/integration/txn_test.go index 1634a65da1b..064be976dd9 100644 --- a/clientv3/integration/txn_test.go +++ b/clientv3/integration/txn_test.go @@ -20,7 +20,7 @@ import ( "time" "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver/api/v3rpc" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/coreos/etcd/integration" "github.com/coreos/etcd/pkg/testutil" @@ -41,7 +41,7 @@ func TestTxnError(t *testing.T) { t.Fatalf("expected %v, got %v", rpctypes.ErrDuplicateKey, err) } - ops := make([]clientv3.Op, v3rpc.MaxTxnOps+10) + ops := make([]clientv3.Op, int(embed.DefaultMaxTxnOps+10)) for i := range ops { ops[i] = clientv3.OpPut(fmt.Sprintf("foo%d", i), "") } diff --git a/etcdmain/config.go b/etcdmain/config.go index 88bae686c92..2c46e1db3be 100644 --- a/etcdmain/config.go +++ b/etcdmain/config.go @@ -138,7 +138,7 @@ func newConfig() *config { fs.UintVar(&cfg.TickMs, "heartbeat-interval", cfg.TickMs, "Time (in milliseconds) of a heartbeat interval.") fs.UintVar(&cfg.ElectionMs, "election-timeout", cfg.ElectionMs, "Time (in milliseconds) for an election to timeout.") fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", cfg.QuotaBackendBytes, "Raise alarms when backend size exceeds the given quota. 0 means use the default quota.") - fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum operations per txn that etcd server allows; defaults to 128.") + fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", cfg.MaxTxnOps, "Maximum number of operations permitted in a transaction.") // clustering fs.Var(flags.NewURLsValue(embed.DefaultInitialAdvertisePeerURLs), "initial-advertise-peer-urls", "List of this member's peer URLs to advertise to the rest of the cluster.") diff --git a/etcdmain/help.go b/etcdmain/help.go index 02e6707ddad..b3653887aa4 100644 --- a/etcdmain/help.go +++ b/etcdmain/help.go @@ -67,7 +67,7 @@ member flags: --quota-backend-bytes '0' raise alarms when backend size exceeds the given quota (0 defaults to low space quota). --max-txn-ops '128' - maximum operations per txn that etcd server allows; defaults to 128. + maximum number of operations permitted in a transaction. clustering flags: diff --git a/etcdserver/api/v3rpc/key.go b/etcdserver/api/v3rpc/key.go index de03556714b..b2ae1adeeed 100644 --- a/etcdserver/api/v3rpc/key.go +++ b/etcdserver/api/v3rpc/key.go @@ -152,7 +152,6 @@ func checkDeleteRequest(r *pb.DeleteRangeRequest) error { } func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error { - plog.Infof("maxTxnOps %v", maxTxnOps) if len(r.Compare) > maxTxnOps || len(r.Success) > maxTxnOps || len(r.Failure) > maxTxnOps { return rpctypes.ErrGRPCTooManyOps } diff --git a/integration/cluster.go b/integration/cluster.go index 3278bc232dd..38868694ccf 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -36,6 +36,7 @@ import ( "github.com/coreos/etcd/client" "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/embed" "github.com/coreos/etcd/etcdserver" "github.com/coreos/etcd/etcdserver/api/v2http" "github.com/coreos/etcd/etcdserver/api/v3client" @@ -93,6 +94,7 @@ type ClusterConfig struct { DiscoveryURL string UseGRPC bool QuotaBackendBytes int64 + MaxTxnOps uint } type cluster struct { @@ -224,6 +226,7 @@ func (c *cluster) mustNewMember(t *testing.T) *member { peerTLS: c.cfg.PeerTLS, clientTLS: c.cfg.ClientTLS, quotaBackendBytes: c.cfg.QuotaBackendBytes, + maxTxnOps: c.cfg.MaxTxnOps, }) m.DiscoveryURL = c.cfg.DiscoveryURL if c.cfg.UseGRPC { @@ -490,6 +493,7 @@ type memberConfig struct { peerTLS *transport.TLSInfo clientTLS *transport.TLSInfo quotaBackendBytes int64 + maxTxnOps uint } // mustNewMember return an inited member with the given name. If peerTLS is @@ -537,6 +541,10 @@ func mustNewMember(t *testing.T, mcfg memberConfig) *member { m.ElectionTicks = electionTicks m.TickMs = uint(tickDuration / time.Millisecond) m.QuotaBackendBytes = mcfg.quotaBackendBytes + m.MaxTxnOps = mcfg.maxTxnOps + if m.MaxTxnOps == 0 { + m.MaxTxnOps = embed.DefaultMaxTxnOps + } m.AuthToken = "simple" // for the purpose of integration testing, simple token is enough return m } diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index b5c88997d9e..87acdb1c3b6 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -25,7 +25,6 @@ import ( "time" "github.com/coreos/etcd/clientv3" - "github.com/coreos/etcd/etcdserver/api/v3rpc" "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" pb "github.com/coreos/etcd/etcdserver/etcdserverpb" "github.com/coreos/etcd/pkg/testutil" @@ -150,7 +149,8 @@ func TestV3CompactCurrentRev(t *testing.T) { func TestV3TxnTooManyOps(t *testing.T) { defer testutil.AfterTest(t) - clus := NewClusterV3(t, &ClusterConfig{Size: 3}) + maxTxnOps := uint(128) + clus := NewClusterV3(t, &ClusterConfig{Size: 3, MaxTxnOps: maxTxnOps}) defer clus.Terminate(t) kvc := toGRPC(clus.RandClient()).KV @@ -201,7 +201,7 @@ func TestV3TxnTooManyOps(t *testing.T) { for i, tt := range tests { txn := &pb.TxnRequest{} - for j := 0; j < v3rpc.MaxTxnOps+1; j++ { + for j := 0; j < int(maxTxnOps+1); j++ { tt(txn) }