Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
.*: fix etcd txn limit for lots of sources (#1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored Jul 14, 2021
1 parent 147f5db commit 8089f1a
Show file tree
Hide file tree
Showing 14 changed files with 74 additions and 17 deletions.
7 changes: 7 additions & 0 deletions dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const (
defaultInitialClusterState = embed.ClusterStateFlagNew
defaultAutoCompactionMode = "periodic"
defaultAutoCompactionRetention = "1h"
defaultMaxTxnOps = 2048
defaultQuotaBackendBytes = 2 * 1024 * 1024 * 1024 // 2GB
quotaBackendBytesLowerBound = 500 * 1024 * 1024 // 500MB
)
Expand Down Expand Up @@ -78,6 +79,8 @@ func NewConfig() *Config {
fs.StringVar(&cfg.PeerUrls, "peer-urls", defaultPeerUrls, "URLs for peer traffic")
fs.StringVar(&cfg.AdvertisePeerUrls, "advertise-peer-urls", "", `advertise URLs for peer traffic (default "${peer-urls}")`)
fs.StringVar(&cfg.Join, "join", "", `join to an existing cluster (usage: cluster's "${master-addr}" list, e.g. "127.0.0.1:8261,127.0.0.1:18261"`)
fs.UintVar(&cfg.MaxTxnOps, "max-txn-ops", defaultMaxTxnOps, `etcd's max-txn-ops, default value is 2048`)
fs.UintVar(&cfg.MaxRequestBytes, "max-request-bytes", embed.DefaultMaxRequestBytes, `etcd's max-request-bytes`)
fs.StringVar(&cfg.AutoCompactionMode, "auto-compaction-mode", defaultAutoCompactionMode, `etcd's auto-compaction-mode, either 'periodic' or 'revision'`)
fs.StringVar(&cfg.AutoCompactionRetention, "auto-compaction-retention", defaultAutoCompactionRetention, `etcd's auto-compaction-retention, accept values like '5h' or '5' (5 hours in 'periodic' mode or 5 revisions in 'revision')`)
fs.Int64Var(&cfg.QuotaBackendBytes, "quota-backend-bytes", defaultQuotaBackendBytes, `etcd's storage quota in bytes`)
Expand Down Expand Up @@ -121,6 +124,8 @@ type Config struct {
InitialCluster string `toml:"initial-cluster" json:"initial-cluster"`
InitialClusterState string `toml:"initial-cluster-state" json:"initial-cluster-state"`
Join string `toml:"join" json:"join"` // cluster's client address (endpoints), not peer address
MaxTxnOps uint `toml:"max-txn-ops" json:"max-txn-ops"`
MaxRequestBytes uint `toml:"max-request-bytes" json:"max-request-bytes"`
AutoCompactionMode string `toml:"auto-compaction-mode" json:"auto-compaction-mode"`
AutoCompactionRetention string `toml:"auto-compaction-retention" json:"auto-compaction-retention"`
QuotaBackendBytes int64 `toml:"quota-backend-bytes" json:"quota-backend-bytes"`
Expand Down Expand Up @@ -363,6 +368,8 @@ func (c *Config) genEmbedEtcdConfig(cfg *embed.Config) (*embed.Config, error) {
cfg.AutoCompactionMode = c.AutoCompactionMode
cfg.AutoCompactionRetention = c.AutoCompactionRetention
cfg.QuotaBackendBytes = c.QuotaBackendBytes
cfg.MaxTxnOps = c.MaxTxnOps
cfg.MaxRequestBytes = c.MaxRequestBytes

err = cfg.Validate() // verify & trigger the builder
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion pkg/election/election.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,9 @@ func (e *Election) watchLeader(ctx context.Context, session *concurrency.Session
e.campaignMu.Unlock()
}()

wch := e.cli.Watch(ctx, key)
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
wch := e.cli.Watch(wCtx, key)

for {
if e.evictLeader.Load() {
Expand Down
7 changes: 4 additions & 3 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,9 +259,10 @@ func GetSourceBoundConfig(cli *clientv3.Client, worker string) (SourceBound, *co
// WatchSourceBound watches PUT & DELETE operations for the bound relationship of the specified DM-worker.
// For the DELETE operations, it returns an empty bound relationship.
// nolint:dupl
func WatchSourceBound(ctx context.Context, cli *clientv3.Client,
worker string, revision int64, outCh chan<- SourceBound, errCh chan<- error) {
ch := cli.Watch(ctx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker), clientv3.WithRev(revision))
func WatchSourceBound(ctx context.Context, cli *clientv3.Client, worker string, revision int64, outCh chan<- SourceBound, errCh chan<- error) {
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker), clientv3.WithRev(revision))

for {
select {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ha/keepalive.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,9 @@ func revokeLease(cli *clientv3.Client, id clientv3.LeaseID) (*clientv3.LeaseRevo
// this function will output the worker event to evCh, output the error to errCh.
func WatchWorkerEvent(ctx context.Context, cli *clientv3.Client, rev int64, outCh chan<- WorkerEvent, errCh chan<- error) {
watcher := clientv3.NewWatcher(cli)
ch := watcher.Watch(ctx, common.WorkerKeepAliveKeyAdapter.Path(), clientv3.WithPrefix(), clientv3.WithRev(rev))
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := watcher.Watch(wCtx, common.WorkerKeepAliveKeyAdapter.Path(), clientv3.WithPrefix(), clientv3.WithRev(rev))

for {
select {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ha/load_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func GetAllLoadTask(cli *clientv3.Client) (map[string]map[string]string, int64,
// This function should often be called by DM-master.
func WatchLoadTask(ctx context.Context, cli *clientv3.Client, revision int64,
outCh chan<- LoadTask, errCh chan<- error) {
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
// NOTE: WithPrevKV used to get a valid `ev.PrevKv` for deletion.
ch := cli.Watch(ctx, common.LoadTaskKeyAdapter.Path(),
ch := cli.Watch(wCtx, common.LoadTaskKeyAdapter.Path(),
clientv3.WithPrefix(), clientv3.WithRev(revision), clientv3.WithPrevKV())

for {
Expand Down
4 changes: 3 additions & 1 deletion pkg/ha/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,9 @@ func deleteRelayConfigOp(worker string) clientv3.Op {
// For the DELETE operations, it returns an nil source config.
func WatchRelayConfig(ctx context.Context, cli *clientv3.Client,
worker string, revision int64, outCh chan<- RelaySource, errCh chan<- error) {
ch := cli.Watch(ctx, common.UpstreamRelayWorkerKeyAdapter.Encode(worker), clientv3.WithRev(revision))
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.UpstreamRelayWorkerKeyAdapter.Encode(worker), clientv3.WithRev(revision))

for {
select {
Expand Down
8 changes: 6 additions & 2 deletions pkg/ha/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,15 +252,19 @@ func GetSubTaskStageConfig(cli *clientv3.Client, source string) (map[string]Stag
// for the DELETE stage, it returns an empty stage.
func WatchRelayStage(ctx context.Context, cli *clientv3.Client,
source string, revision int64, outCh chan<- Stage, errCh chan<- error) {
ch := cli.Watch(ctx, common.StageRelayKeyAdapter.Encode(source), clientv3.WithRev(revision))
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.StageRelayKeyAdapter.Encode(source), clientv3.WithRev(revision))
watchStage(ctx, ch, relayStageFromKey, outCh, errCh)
}

// WatchSubTaskStage watches PUT & DELETE operations for the subtask stage.
// for the DELETE stage, it returns an empty stage.
func WatchSubTaskStage(ctx context.Context, cli *clientv3.Client,
source string, revision int64, outCh chan<- Stage, errCh chan<- error) {
ch := cli.Watch(ctx, common.StageSubTaskKeyAdapter.Encode(source), clientv3.WithPrefix(), clientv3.WithRev(revision))
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.StageSubTaskKeyAdapter.Encode(source), clientv3.WithPrefix(), clientv3.WithRev(revision))
watchStage(ctx, ch, subTaskStageFromKey, outCh, errCh)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/shardddl/optimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,10 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]map[string]map[stri
// This function should often be called by DM-master.
func WatchInfo(ctx context.Context, cli *clientv3.Client, revision int64,
outCh chan<- Info, errCh chan<- error) {
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
// NOTE: WithPrevKV used to get a valid `ev.PrevKv` for deletion.
ch := cli.Watch(ctx, common.ShardDDLOptimismInfoKeyAdapter.Path(),
ch := cli.Watch(wCtx, common.ShardDDLOptimismInfoKeyAdapter.Path(),
clientv3.WithPrefix(), clientv3.WithRev(revision), clientv3.WithPrevKV())

for {
Expand Down
6 changes: 4 additions & 2 deletions pkg/shardddl/optimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,14 @@ func WatchOperationPut(ctx context.Context, cli *clientv3.Client,
task, source, upSchema, upTable string, revision int64,
outCh chan<- Operation, errCh chan<- error) {
var ch clientv3.WatchChan
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
// caller may use empty keys to expect a prefix watch
if upTable == "" {
ch = cli.Watch(ctx, common.ShardDDLOptimismOperationKeyAdapter.Path(), clientv3.WithPrefix(),
ch = cli.Watch(wCtx, common.ShardDDLOptimismOperationKeyAdapter.Path(), clientv3.WithPrefix(),
clientv3.WithRev(revision))
} else {
ch = cli.Watch(ctx, common.ShardDDLOptimismOperationKeyAdapter.Encode(task, source, upSchema, upTable),
ch = cli.Watch(wCtx, common.ShardDDLOptimismOperationKeyAdapter.Encode(task, source, upSchema, upTable),
clientv3.WithRev(revision))
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/shardddl/optimism/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ func GetAllSourceTables(cli *clientv3.Client) (map[string]map[string]SourceTable
// This function should often be called by DM-master.
func WatchSourceTables(ctx context.Context, cli *clientv3.Client, revision int64,
outCh chan<- SourceTables, errCh chan<- error) {
ch := cli.Watch(ctx, common.ShardDDLOptimismSourceTablesKeyAdapter.Path(),
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.ShardDDLOptimismSourceTablesKeyAdapter.Path(),
clientv3.WithPrefix(), clientv3.WithRev(revision))

for {
Expand Down
4 changes: 3 additions & 1 deletion pkg/shardddl/pessimism/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,9 @@ func GetAllInfo(cli *clientv3.Client) (map[string]map[string]Info, int64, error)
// WatchInfoPut watches PUT operations for info.
// This function should often be called by DM-master.
func WatchInfoPut(ctx context.Context, cli *clientv3.Client, revision int64, outCh chan<- Info, errCh chan<- error) {
ch := cli.Watch(ctx, common.ShardDDLPessimismInfoKeyAdapter.Path(),
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
ch := cli.Watch(wCtx, common.ShardDDLPessimismInfoKeyAdapter.Path(),
clientv3.WithPrefix(), clientv3.WithRev(revision))

for {
Expand Down
6 changes: 4 additions & 2 deletions pkg/shardddl/pessimism/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,15 @@ func WatchOperationDelete(ctx context.Context, cli *clientv3.Client, task, sourc
func watchOperation(ctx context.Context, cli *clientv3.Client, watchType mvccpb.Event_EventType,
task, source string, revision int64,
outCh chan<- Operation, errCh chan<- error) {
wCtx, cancel := context.WithCancel(ctx)
defer cancel()
var ch clientv3.WatchChan
// caller may use empty keys to expect a prefix watch
if source == "" {
ch = cli.Watch(ctx, common.ShardDDLPessimismOperationKeyAdapter.Path(), clientv3.WithPrefix(),
ch = cli.Watch(wCtx, common.ShardDDLPessimismOperationKeyAdapter.Path(), clientv3.WithPrefix(),
clientv3.WithRev(revision), clientv3.WithPrevKV())
} else {
ch = cli.Watch(ctx, common.ShardDDLPessimismOperationKeyAdapter.Encode(task, source),
ch = cli.Watch(wCtx, common.ShardDDLPessimismOperationKeyAdapter.Encode(task, source),
clientv3.WithRev(revision), clientv3.WithPrevKV())
}

Expand Down
27 changes: 26 additions & 1 deletion pkg/upgrade/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package upgrade

import (
"context"
"fmt"
"testing"

. "github.com/pingcap/check"
Expand All @@ -24,14 +25,22 @@ import (
"github.com/pingcap/dm/dm/common"
)

var etcdTestCli *clientv3.Client
var (
etcdTestCli *clientv3.Client
bigTxnTestCli *clientv3.Client
)

func TestUpgrade(t *testing.T) {
mockCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer mockCluster.Terminate(t)

etcdTestCli = mockCluster.RandClient()

bigCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1, MaxTxnOps: 2048})
defer bigCluster.Terminate(t)

bigTxnTestCli = bigCluster.RandClient()

TestingT(t)
}

Expand Down Expand Up @@ -136,4 +145,20 @@ func (t *testForEtcd) TestUpgradeToVer3(c *C) {
c.Assert(err, IsNil)
c.Assert(resp.Kvs, HasLen, 1)
c.Assert(string(resp.Kvs[0].Value), Equals, newVal)

for i := 0; i < 500; i++ {
key := common.UpstreamConfigKeyAdapterV1.Encode(fmt.Sprintf("%s-%d", source, i))
val := fmt.Sprintf("%s-%d", oldVal, i)
_, err := etcdTestCli.Put(ctx, key, val)
c.Assert(err, IsNil)
}
c.Assert(upgradeToVer3(ctx, etcdTestCli), ErrorMatches, ".*too many operations in txn request.*")

for i := 0; i < 1000; i++ {
key := common.UpstreamConfigKeyAdapterV1.Encode(fmt.Sprintf("%s-%d", source, i))
val := fmt.Sprintf("%s-%d", oldVal, i)
_, err := bigTxnTestCli.Put(ctx, key, val)
c.Assert(err, IsNil)
}
c.Assert(upgradeToVer3(ctx, bigTxnTestCli), IsNil)
}
2 changes: 2 additions & 0 deletions tests/dmctl_basic/conf/get_master1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ advertise-peer-urls = "http://127.0.0.1:8291"
initial-cluster = "master1=http://127.0.0.1:8291"
initial-cluster-state = "new"
join = ""
max-txn-ops = 2048
max-request-bytes = 1572864
auto-compaction-mode = "periodic"
auto-compaction-retention = "1h"
quota-backend-bytes = 2147483648
Expand Down

0 comments on commit 8089f1a

Please sign in to comment.