Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

ha: refactor the schedule model #473

Merged
merged 125 commits into from
Feb 18, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
125 commits
Select commit Hold shift + click to select a range
48408ae
ha: add some design for the new HA model
csuzhangxc Feb 10, 2020
aa5b69f
Update pkg/ha/doc.go
csuzhangxc Feb 10, 2020
ca3191d
ha: add some design for the new HA model
csuzhangxc Feb 10, 2020
e6643f0
ha: add etcd operation sample for source
csuzhangxc Feb 11, 2020
c9000f2
ha: add etcd operation sample for source
csuzhangxc Feb 11, 2020
80964fa
add subtask
lichunzhu Feb 11, 2020
98a225e
address comments
lichunzhu Feb 11, 2020
4308f8f
ha: add etcd operation sample for dm-worker info
csuzhangxc Feb 11, 2020
4232b88
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 11, 2020
a1b76b3
ha: update copyright year
csuzhangxc Feb 11, 2020
5953a80
ha: update wording
csuzhangxc Feb 11, 2020
b69de45
return map for subtask key
lichunzhu Feb 11, 2020
1dcff3b
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 11, 2020
1ec15dc
fix bug
lichunzhu Feb 11, 2020
3119076
ha: add etcd operation for source bound
csuzhangxc Feb 11, 2020
c4c81d6
Merge remote-tracking branch 'remotes/origin/ha-dev' into ha-refactor
csuzhangxc Feb 11, 2020
7c97eed
add keepalive
lichunzhu Feb 11, 2020
580b3b6
refine code
lichunzhu Feb 11, 2020
5a0a934
fix hound
lichunzhu Feb 12, 2020
caa378a
fix make check
lichunzhu Feb 12, 2020
e0c07a2
ha: add etcd operation for stage
csuzhangxc Feb 12, 2020
91b1d01
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 12, 2020
6ec665d
ha: report error for watcher through chan
csuzhangxc Feb 12, 2020
af3687c
add errCh, support multi put
lichunzhu Feb 12, 2020
ef8a5d7
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 12, 2020
d012095
ha: add etcd operation for operations in one txn
csuzhangxc Feb 12, 2020
0a234be
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 12, 2020
55cd37c
ha: refine code
csuzhangxc Feb 12, 2020
7c2fc72
refine code
lichunzhu Feb 12, 2020
0a5893e
support getting alive workers
lichunzhu Feb 12, 2020
428a94e
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 12, 2020
eae1b1c
refine code
lichunzhu Feb 12, 2020
ed12d30
address comments
lichunzhu Feb 13, 2020
6ef1a7f
refine wait time
lichunzhu Feb 13, 2020
44b8d54
add revision support
lichunzhu Feb 13, 2020
d903258
add ut for rev
lichunzhu Feb 13, 2020
d27aaf7
support revision in stage
lichunzhu Feb 13, 2020
4727c07
refine stage revision test
lichunzhu Feb 13, 2020
d372f9f
support etcd operation for subtask and relay
lichunzhu Feb 13, 2020
cc47624
add revision for bound and source
lichunzhu Feb 14, 2020
6e91443
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
b77d34e
ha: get all bound relationship
csuzhangxc Feb 14, 2020
bb6be0c
refine revision for event
lichunzhu Feb 14, 2020
cb0858f
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
278046f
refine comments
lichunzhu Feb 14, 2020
4436cec
support etcd operations on dm-worker
lichunzhu Feb 14, 2020
9ac916c
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 14, 2020
2ecdc56
fix check
lichunzhu Feb 14, 2020
535d059
scheduler: add worker agent; add scheduler skeleton
csuzhangxc Feb 14, 2020
b874b79
scheduler: handle source config; refine code
csuzhangxc Feb 14, 2020
295d020
scheduler: record bounds and unbounds; add test steps
csuzhangxc Feb 14, 2020
277f130
ha: add delete API for source bound
csuzhangxc Feb 15, 2020
ce49d96
scheduler: bound/unbound when the worker become online/offline
csuzhangxc Feb 15, 2020
cb4f16f
scheduler: refine tests
csuzhangxc Feb 15, 2020
935eccf
scheduler: put relay stage when put the first bound; add some code fo…
csuzhangxc Feb 15, 2020
eb0925f
scheduler: support update relay stage.
csuzhangxc Feb 15, 2020
e324398
scheduler: support add subtasks
csuzhangxc Feb 15, 2020
d9bea9d
ha: add get all for source config and relay stage.
csuzhangxc Feb 15, 2020
4198b1f
ha: add get all for subtask config
csuzhangxc Feb 15, 2020
5b744a6
scheduler: support update subtask; recover config and stage for sourc…
csuzhangxc Feb 15, 2020
e6f6a3e
refine worker UT
lichunzhu Feb 16, 2020
30d33e4
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 16, 2020
73cee97
ha: address comments
csuzhangxc Feb 16, 2020
b65ece5
move decrypt config to init
lichunzhu Feb 16, 2020
f8d7c75
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 16, 2020
964aeb6
*: refine bound source to worker
csuzhangxc Feb 16, 2020
2f7bff7
*: support remove subtask; update remove source
csuzhangxc Feb 16, 2020
4207d9e
scheduler: fix unbounds when removing source
csuzhangxc Feb 16, 2020
7711e6f
scheduler: add SendRequest API for worker agent
csuzhangxc Feb 16, 2020
19e329b
scheduler: add more comments and test cases
csuzhangxc Feb 16, 2020
bdbe56f
scheduler: address comment to fix deadlock
csuzhangxc Feb 17, 2020
713640e
remove coordinator, switch to scheduler api
lichunzhu Feb 17, 2020
5d07e1c
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 17, 2020
c2a7651
refine clear env method
lichunzhu Feb 17, 2020
095f7a4
clear etcd info
lichunzhu Feb 17, 2020
0a9d45c
refine UT
lichunzhu Feb 17, 2020
7f841c1
refine master UT
lichunzhu Feb 17, 2020
8a2c88c
extract ClearTestInfoOperation
lichunzhu Feb 17, 2020
4a6dcb4
set mysqlConfig password to empty
lichunzhu Feb 17, 2020
595a11a
fix
lichunzhu Feb 17, 2020
0794bb0
*: remove the code about Coordinator
csuzhangxc Feb 17, 2020
ab12458
operate source before keepalive, refine logs and add purgeRelayDir
lichunzhu Feb 17, 2020
b7a863a
Merge branch 'ha-refactor' of https://github.com/lichunzhu/dm into ha…
lichunzhu Feb 17, 2020
88afe72
*: rename `MySQLConfig` to `SourceConfig`, `operate-worker` to `opera…
csuzhangxc Feb 17, 2020
7c7be03
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 17, 2020
1795377
worker: fix merge
csuzhangxc Feb 17, 2020
af535f3
address comment
lichunzhu Feb 17, 2020
06b259b
switch to mock
lichunzhu Feb 17, 2020
fe8d7a3
*: encode check's config to toml; log the error about start work fail
csuzhangxc Feb 17, 2020
6128026
make worker not relying on downstream tidb
lichunzhu Feb 17, 2020
06a0332
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 17, 2020
53890fd
*: rename `operate-worker` to `operate-source`, `dm-mysql.toml` to `s…
csuzhangxc Feb 17, 2020
ac2da20
Merge branch 'ha-refactor' of github.com:pingcap/dm into ha-refactor
csuzhangxc Feb 17, 2020
98da039
fix parse problem
lichunzhu Feb 17, 2020
b445e7c
fix parse problem again
lichunzhu Feb 18, 2020
0f5729f
scheduler: support add the same worker multiple times
csuzhangxc Feb 18, 2020
79f0991
add comments and UT for source configss
lichunzhu Feb 18, 2020
d21c2b7
Merge branch 'ha-refactor' of https://github.com/lichunzhu/dm into ha…
lichunzhu Feb 18, 2020
f41ef63
fix UT error
lichunzhu Feb 18, 2020
2783ba6
address comments
lichunzhu Feb 18, 2020
91bc029
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 18, 2020
6e0c686
fix all_mode integration test, remove disable heartbeat
lichunzhu Feb 18, 2020
e5302d4
comment all enable-heartbeat in integration tests
lichunzhu Feb 18, 2020
d543271
refine dmctl_basic UT
lichunzhu Feb 18, 2020
badb6af
refine dmctl_basic UT part2
lichunzhu Feb 18, 2020
d8e34f5
refine dmctl_basic integration tests part.3
lichunzhu Feb 18, 2020
ead0984
tests: turn off relay_interrupt until compatible with relay again.
csuzhangxc Feb 18, 2020
c7e2296
tests: fix print_status
csuzhangxc Feb 18, 2020
230322c
tests: try to fix ha
csuzhangxc Feb 18, 2020
7899b28
tests: fix initial_unit
csuzhangxc Feb 18, 2020
39e9880
tests: try to fix ha
csuzhangxc Feb 18, 2020
633e6b6
refine dmctl_basic
lichunzhu Feb 18, 2020
10ac6e9
small fi
lichunzhu Feb 18, 2020
2f06e0f
tests: fix incremental_mode; update test_prepare
csuzhangxc Feb 18, 2020
9a05569
fix http_apis test
lichunzhu Feb 18, 2020
8a21f33
Merge branch 'ha-refactor' of https://github.com/pingcap/dm into ha-r…
lichunzhu Feb 18, 2020
894c8f6
sleep after operation in http_apis
lichunzhu Feb 18, 2020
1033e72
disable relay in ha integration tests
lichunzhu Feb 18, 2020
d437200
tests: rename schema in dm_syncer; skip online DDL
csuzhangxc Feb 18, 2020
c7e3abe
refine start_task test
lichunzhu Feb 18, 2020
5b384b3
tests: revert online_ddl case; remove retry_cancel
csuzhangxc Feb 18, 2020
72ade95
Merge remote-tracking branch 'remotes/origin/ha-dev' into ha-refactor
csuzhangxc Feb 18, 2020
15aa987
*: fix merge
csuzhangxc Feb 18, 2020
2d4b386
*: fix merge
csuzhangxc Feb 18, 2020
355c674
tests: abort online_ddl
csuzhangxc Feb 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ import (
var (
useOfClosedErrMsg = "use of closed network connection"
// WorkerRegisterKeyAdapter used to encode and decode register key.
// k/v: Encode(addr) -> name
// k/v: Encode(name) -> the information of the DM-worker node.
WorkerRegisterKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/r/")
// WorkerKeepAliveKeyAdapter used to encode and decode keepalive key.
// k/v: Encode(addr,name) -> time
// k/v: Encode(name) -> time
WorkerKeepAliveKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/a/")
// UpstreamConfigKeyAdapter stores the config of every MySQL-task which has not stopped.
// k/v: Encode(source-id) -> config
UpstreamConfigKeyAdapter KeyAdapter = keyEncoderDecoder("/dm-master/upstream/config/")
// UpstreamBoundWorkerKeyAdapter is used to store the address of the worker on which MySQL-tasks are running.
// k/v: Encode(addr) -> source-id
// k/v: Encode(name) -> the bound relationship.
UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/")
// UpstreamSubTaskKeyAdapter used to store SubTask which are subscribing data from MySQL source.
// k/v: Encode(source-id, task-name) -> SubTaskConfig
Expand All @@ -50,9 +50,9 @@ var (

func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter:
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter, WorkerKeepAliveKeyAdapter:
return 1
case WorkerKeepAliveKeyAdapter, UpstreamSubTaskKeyAdapter:
case UpstreamSubTaskKeyAdapter:
return 2
}
return -1
Expand Down
4 changes: 2 additions & 2 deletions dm/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (t *testCommon) TestKeyAdapter(c *C) {
want: "/dm-worker/r/3132372e302e302e313a32333832",
},
{
keys: []string{"127.0.0.1:2382", "worker1"},
keys: []string{"worker1"},
adapter: WorkerKeepAliveKeyAdapter,
want: "/dm-worker/a/3132372e302e302e313a32333832/776f726b657231",
want: "/dm-worker/a/776f726b657231",
},
{
keys: []string{"mysql1"},
Expand Down
176 changes: 176 additions & 0 deletions pkg/ha/bound.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
"context"
"encoding/json"
"fmt"

"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"go.uber.org/zap"

"github.com/pingcap/dm/dm/common"
"github.com/pingcap/dm/pkg/etcdutil"
"github.com/pingcap/dm/pkg/log"
)

// SourceBound describes which DM-worker instance an upstream MySQL source is bound to.
type SourceBound struct {
	Source string `json:"source"` // ID of the upstream source.
	Worker string `json:"worker"` // name of the DM-worker the source is bound to.
}

// NewSourceBound builds a SourceBound from the given source ID and DM-worker name.
func NewSourceBound(source, worker string) SourceBound {
	var b SourceBound
	b.Source, b.Worker = source, worker
	return b
}

// NotBound reports whether b is the empty relationship,
// i.e. neither the source nor the worker is set.
func (b SourceBound) NotBound() bool {
	return b == (SourceBound{})
}

// String implements the Stringer interface by rendering b as JSON.
func (b SourceBound) String() string {
	// the marshal error is dropped on purpose: String has no way to report it.
	text, _ := b.toJSON()
	return text
}

// toJSON encodes b as a JSON string.
func (b SourceBound) toJSON() (string, error) {
	encoded, err := json.Marshal(b)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

// sourceBoundFromJSON decodes a SourceBound from its JSON form.
// The (possibly partially filled) value is returned together with any decode error.
func sourceBoundFromJSON(s string) (SourceBound, error) {
	var bound SourceBound
	err := json.Unmarshal([]byte(s), &bound)
	return bound, err
}

// PutSourceBound writes the bound relationship into etcd through a single txn
// and returns the revision carried by the response header.
// k/v: worker-name -> bound relationship.
func PutSourceBound(cli *clientv3.Client, bound SourceBound) (int64, error) {
	putOp, err := putSourceBoundOp(bound)
	if err != nil {
		return 0, err
	}

	// bound the request by the default etcd request timeout.
	reqCtx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
	defer cancel()

	txnResp, err := cli.Txn(reqCtx).Then(putOp).Commit()
	if err != nil {
		return 0, err
	}

	rev := txnResp.Header.Revision
	return rev, nil
}

// GetSourceBound fetches the source bound relationship of the specified DM-worker from etcd.
// When no relationship exists, it returns an empty SourceBound, revision 0 and a nil error.
// Otherwise it returns the decoded relationship and the revision carried by the response header.
func GetSourceBound(cli *clientv3.Client, worker string) (SourceBound, int64, error) {
	var bound SourceBound

	reqCtx, cancel := context.WithTimeout(cli.Ctx(), etcdutil.DefaultRequestTimeout)
	defer cancel()

	key := common.UpstreamBoundWorkerKeyAdapter.Encode(worker)
	resp, err := cli.Get(reqCtx, key)
	if err != nil {
		return bound, 0, err
	}

	switch {
	case resp.Count == 0:
		// nothing bound to this DM-worker.
		return bound, 0, nil
	case resp.Count > 1:
		// TODO(csuzhangxc): add terror.
		// this should not happen.
		return bound, 0, fmt.Errorf("too many bound relationship (%d) exist for the DM-worker %s", resp.Count, worker)
	}

	if bound, err = sourceBoundFromJSON(string(resp.Kvs[0].Value)); err != nil {
		return bound, 0, err
	}
	return bound, resp.Header.Revision, nil
}

// WatchSourceBound watches PUT & DELETE operations for the bound relationship of the specified DM-worker
// and forwards every resulting SourceBound to outCh.
// The watch is created with clientv3.WithRev(revision).
// For the DELETE operations, it sends an empty bound relationship (one for which NotBound reports true).
// The function blocks until ctx is done or a watch response is marked as canceled;
// no error is reported to the caller, failures are only logged.
func WatchSourceBound(ctx context.Context, cli *clientv3.Client,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we return error for retry?

worker string, revision int64, outCh chan<- SourceBound) {
ch := cli.Watch(ctx, common.UpstreamBoundWorkerKeyAdapter.Encode(worker), clientv3.WithRev(revision))

for {
select {
case <-ctx.Done():
return
case resp := <-ch:
// a canceled watch response ends the watch silently.
if resp.Canceled {
return
}

for _, ev := range resp.Events {
var (
bound SourceBound
err error
)
switch ev.Type {
case mvccpb.PUT:
bound, err = sourceBoundFromJSON(string(ev.Kv.Value))
if err != nil {
// this should not happen.
// NOTE(review): the decode error itself is not attached to the log entry.
log.L().Error("fail to construct source bound relationship", zap.ByteString("json", ev.Kv.Value))
continue
}
case mvccpb.DELETE:
// keep `bound` as its zero value, which stands for "not bound".
default:
// this should not happen.
log.L().Error("unsupported etcd event type", zap.Reflect("kv", ev.Kv), zap.Reflect("type", ev.Type))
continue
}

// also wait on ctx here, so a send that can not proceed does not keep the function from returning.
select {
case outCh <- bound:
case <-ctx.Done():
return
}
}
}
}
}

// deleteSourceBoundOp builds the etcd DELETE operation which removes
// the bound relationship of the specified DM-worker.
func deleteSourceBoundOp(worker string) clientv3.Op {
	key := common.UpstreamBoundWorkerKeyAdapter.Encode(worker)
	return clientv3.OpDelete(key)
}

// putSourceBoundOp builds the etcd PUT operation which stores the bound relationship.
// If the relationship can not be encoded, a zero-value Op is returned with the error.
// k/v: worker-name -> bound relationship.
func putSourceBoundOp(bound SourceBound) (clientv3.Op, error) {
	var op clientv3.Op

	encoded, err := bound.toJSON()
	if err == nil {
		// the key is derived from the worker name, the value is the JSON form of the bound.
		op = clientv3.OpPut(common.UpstreamBoundWorkerKeyAdapter.Encode(bound.Worker), encoded)
	}
	return op, err
}
95 changes: 95 additions & 0 deletions pkg/ha/bound_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package ha

import (
"context"
"time"

. "github.com/pingcap/check"
)

// TestSourceBoundJSON checks that a SourceBound encodes to the expected JSON,
// that String agrees with toJSON, and that decoding restores the original value.
func (t *testForEtcd) TestSourceBoundJSON(c *C) {
	const wantJSON = `{"source":"mysql-replica-1","worker":"dm-worker-1"}`
	origin := NewSourceBound("mysql-replica-1", "dm-worker-1")

	// encode.
	encoded, err := origin.toJSON()
	c.Assert(err, IsNil)
	c.Assert(encoded, Equals, wantJSON)
	c.Assert(encoded, Equals, origin.String())

	// decode back.
	decoded, err := sourceBoundFromJSON(encoded)
	c.Assert(err, IsNil)
	c.Assert(decoded, DeepEquals, origin)
}

// TestSourceBoundEtcd walks a bound relationship through its whole life cycle against etcd:
// get (absent) -> put -> watch the PUT -> get -> delete -> watch the DELETE -> get (absent again).
// The steps depend on the revisions returned by the previous ones, so their order matters.
func (t *testForEtcd) TestSourceBoundEtcd(c *C) {
defer clearTestInfoOperation(c)

var (
watchTimeout = 500 * time.Millisecond
worker = "dm-worker-1"
emptyBound = SourceBound{}
bound = NewSourceBound("mysql-replica-1", worker)
)
c.Assert(bound.NotBound(), IsFalse)

// no bound exists.
bo1, rev1, err := GetSourceBound(etcdTestCli, worker)
c.Assert(err, IsNil)
c.Assert(rev1, Equals, int64(0))
c.Assert(bo1, DeepEquals, emptyBound)

// put a bound.
rev2, err := PutSourceBound(etcdTestCli, bound)
c.Assert(err, IsNil)
c.Assert(rev2, Greater, rev1)

// watch the PUT operation for the bound.
// the channel is buffered so WatchSourceBound can deliver the event without a concurrent reader;
// WatchSourceBound is called synchronously and only returns once the watch timeout expires.
boundCh := make(chan SourceBound, 10)
ctx, cancel := context.WithTimeout(context.Background(), watchTimeout)
WatchSourceBound(ctx, etcdTestCli, worker, rev2, boundCh)
cancel()
close(boundCh)
c.Assert(len(boundCh), Equals, 1)
c.Assert(<-boundCh, DeepEquals, bound)

// get the bound back.
// NOTE(review): rev3 == rev2 presumably only holds while nothing else writes to etcd in between,
// because GetSourceBound returns the revision of the response header — confirm.
bo2, rev3, err := GetSourceBound(etcdTestCli, worker)
c.Assert(err, IsNil)
c.Assert(rev3, Equals, rev2)
c.Assert(bo2, DeepEquals, bound)

// delete the bound.
deleteOp := deleteSourceBoundOp(worker)
resp, err := etcdTestCli.Txn(context.Background()).Then(deleteOp).Commit()
c.Assert(err, IsNil)
rev4 := resp.Header.Revision
c.Assert(rev4, Greater, rev3)

// watch the DELETE operation for the bound.
// watching from rev4 skips the earlier PUT, so exactly one (empty) bound is expected.
boundCh = make(chan SourceBound, 10)
ctx, cancel = context.WithTimeout(context.Background(), watchTimeout)
WatchSourceBound(ctx, etcdTestCli, worker, rev4, boundCh)
cancel()
close(boundCh)
c.Assert(len(boundCh), Equals, 1)
bo3 := <-boundCh
c.Assert(bo3.NotBound(), IsTrue)

// get again, not exists now.
bo4, rev5, err := GetSourceBound(etcdTestCli, worker)
c.Assert(err, IsNil)
c.Assert(rev5, Equals, int64(0))
c.Assert(bo4, DeepEquals, emptyBound)
}
Loading