Skip to content

Commit

Permalink
ha: refactor the schedule model (pingcap#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Feb 18, 2020
1 parent 1478b5e commit 24c7693
Show file tree
Hide file tree
Showing 155 changed files with 6,496 additions and 3,063 deletions.
24 changes: 20 additions & 4 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -328,10 +328,9 @@ ErrMasterGenEmbedEtcdConfigFail,[code=38037:class=dm-master:scope=internal:level
ErrMasterStartEmbedEtcdFail,[code=38038:class=dm-master:scope=internal:level=high],"fail to start embed etcd"
ErrMasterParseURLFail,[code=38039:class=dm-master:scope=internal:level=high],"fail to parse URL %s"
ErrMasterJoinEmbedEtcdFail,[code=38040:class=dm-master:scope=internal:level=high],"fail to join embed etcd: %s"
ErrMasterCoordinatorNotStart,[code=38041:class=dm-master:scope=internal:level=high],"coordinator does not start"
ErrMasterAcquireWorkerFailed,[code=38042:class=dm-master:scope=internal:level=medium],"acquire worker failed: %s"
ErrMasterAdvertiseAddrNotValid,[code=38043:class=dm-master:scope=internal:level=high],"advertise address %s not valid"
ErrMasterRequestIsNotForwardToLeader,[code=38044:class=dm-master:scope=internal:level=high],"master is not leader, and can't forward request to leader"
ErrMasterInvalidOperateTaskOp,[code=38041:class=dm-master:scope=internal:level=medium],"invalid op %s on task"
ErrMasterAdvertiseAddrNotValid,[code=38042:class=dm-master:scope=internal:level=high],"advertise address %s not valid"
ErrMasterRequestIsNotForwardToLeader,[code=38043:class=dm-master:scope=internal:level=high],"master is not leader, and can't forward request to leader"
ErrWorkerParseFlagSet,[code=40001:class=dm-worker:scope=internal:level=medium],"parse dm-worker config flag set"
ErrWorkerInvalidFlag,[code=40002:class=dm-worker:scope=internal:level=medium],"'%s' is an invalid flag"
ErrWorkerDecodeConfigFromFile,[code=40003:class=dm-worker:scope=internal:level=medium],"toml decode file"
Expand Down Expand Up @@ -422,3 +421,20 @@ ErrSchemaTrackerCannotGetTable,[code=44005:class=schema-tracker:scope=internal:l
ErrSchemaTrackerCannotExecDDL,[code=44006:class=schema-tracker:scope=internal:level=high],"cannot track DDL: %s"
ErrSchemaTrackerCannotFetchDownstreamTable,[code=44007:class=schema-tracker:scope=downstream:level=medium],"cannot fetch downstream table schema of `%s`.`%s` to initialize upstream schema `%s`.`%s` in schema tracker"
ErrSchemaTrackerCannotParseDownstreamTable,[code=44008:class=schema-tracker:scope=internal:level=high],"cannot parse downstream table schema of `%s`.`%s` to initialize upstream schema `%s`.`%s` in schema tracker"
ErrSchedulerNotStarted,[code=46001:class=scheduler:scope=internal:level=high],"the scheduler has not started"
ErrSchedulerStarted,[code=46002:class=scheduler:scope=internal:level=medium],"the scheduler has already started"
ErrSchedulerWorkerExist,[code=46003:class=scheduler:scope=internal:level=medium],"dm-worker with name %s already exists"
ErrSchedulerWorkerNotExist,[code=46004:class=scheduler:scope=internal:level=medium],"dm-worker with name %s not exists"
ErrSchedulerWorkerOnline,[code=46005:class=scheduler:scope=internal:level=medium],"dm-worker with name %s is still online, must shut it down first"
ErrSchedulerWorkerInvalidTrans,[code=46006:class=scheduler:scope=internal:level=medium],"invalid stage transformation for dm-worker %s, from %s to %s"
ErrSchedulerSourceCfgExist,[code=46007:class=scheduler:scope=internal:level=medium],"source config with ID %s already exists"
ErrSchedulerSourceCfgNotExist,[code=46008:class=scheduler:scope=internal:level=medium],"source config with ID %s not exists"
ErrSchedulerSourcesUnbound,[code=46009:class=dm-master:scope=internal:level=medium],"sources %v have not bound"
ErrSchedulerSourceOpTaskExist,[code=46010:class=dm-master:scope=internal:level=medium],"source with name % need to operate with tasks %v exist"
ErrSchedulerRelayStageInvalidUpdate,[code=46011:class=scheduler:scope=internal:level=medium],"invalid new expectant relay stage %s"
ErrSchedulerRelayStageSourceNotExist,[code=46012:class=scheduler:scope=internal:level=medium],"sources %v need to update expectant relay stage not exist"
ErrSchedulerMultiTask,[code=46013:class=scheduler:scope=internal:level=medium],"the scheduler cannot perform multiple different tasks %v in one operation"
ErrSchedulerSubTaskExist,[code=46014:class=scheduler:scope=internal:level=medium],"subtasks with name %s for sources %v already exist"
ErrSchedulerSubTaskStageInvalidUpdate,[code=46015:class=dm-master:scope=internal:level=medium],"invalid new expectant subtask stage %s"
ErrSchedulerSubTaskOpTaskNotExist,[code=46016:class=dm-master:scope=internal:level=medium],"subtasks with name %s need to be operate not exist"
ErrSchedulerSubTaskOpSourceNotExist,[code=46017:class=dm-master:scope=internal:level=medium],"sources %v need to be operate not exist"
9 changes: 4 additions & 5 deletions cmd/dm-syncer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,10 @@ func (oc *oldConfig) convertToNewFormat() (*config.SubTaskConfig, error) {
return nil, errors.Trace(err)
}
newTask := &config.SubTaskConfig{
Name: "dm-syncer",
SourceID: "dm-syncer-from-old-config",
DisableHeartbeat: true,
Mode: config.ModeIncrement,
Meta: meta,
Name: "dm-syncer",
SourceID: "dm-syncer-from-old-config",
Mode: config.ModeIncrement,
Meta: meta,

LogLevel: oc.LogLevel,
LogFile: oc.LogFile,
Expand Down
17 changes: 12 additions & 5 deletions dm/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,26 @@ import (
var (
useOfClosedErrMsg = "use of closed network connection"
// WorkerRegisterKeyAdapter used to encode and decode register key.
// k/v: Encode(addr) -> name
// k/v: Encode(name) -> the information of the DM-worker node.
WorkerRegisterKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/r/")
// WorkerKeepAliveKeyAdapter used to encode and decode keepalive key.
// k/v: Encode(addr,name) -> time
// k/v: Encode(name) -> time
WorkerKeepAliveKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-worker/a/")
// UpstreamConfigKeyAdapter store all config of which MySQL-task has not stopped.
// k/v: Encode(source-id) -> config
UpstreamConfigKeyAdapter KeyAdapter = keyEncoderDecoder("/dm-master/upstream/config/")
// UpstreamBoundWorkerKeyAdapter used to store address of worker in which MySQL-tasks which are running.
// k/v: Encode(addr) -> source-id
// k/v: Encode(name) -> the bound relationship.
UpstreamBoundWorkerKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/bound-worker/")
// UpstreamSubTaskKeyAdapter used to store SubTask which are subscribing data from MySQL source.
// k/v: Encode(source-id, task-name) -> SubTaskConfig
UpstreamSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/upstream/subtask/")
// StageRelayKeyAdapter used to store the running stage of the relay.
// k/v: Encode(source-id) -> the running stage of the relay.
StageRelayKeyAdapter KeyAdapter = keyEncoderDecoder("/dm-master/stage/relay/")
// StageSubTaskKeyAdapter used to store the running stage of the subtask.
// k/v: Encode(source-id, task-name) -> the running stage of the subtask.
StageSubTaskKeyAdapter KeyAdapter = keyHexEncoderDecoder("/dm-master/stage/subtask/")

// ShardDDLPessimismInfoKeyAdapter used to store shard DDL info in pessimistic model.
// k/v: Encode(task-name, source-id) -> shard DDL info
Expand All @@ -50,9 +56,10 @@ var (

func keyAdapterKeysLen(s KeyAdapter) int {
switch s {
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter:
case WorkerRegisterKeyAdapter, UpstreamConfigKeyAdapter, UpstreamBoundWorkerKeyAdapter,
WorkerKeepAliveKeyAdapter, StageRelayKeyAdapter:
return 1
case WorkerKeepAliveKeyAdapter, UpstreamSubTaskKeyAdapter:
case UpstreamSubTaskKeyAdapter, StageSubTaskKeyAdapter:
return 2
}
return -1
Expand Down
4 changes: 2 additions & 2 deletions dm/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (t *testCommon) TestKeyAdapter(c *C) {
want: "/dm-worker/r/3132372e302e302e313a32333832",
},
{
keys: []string{"127.0.0.1:2382", "worker1"},
keys: []string{"worker1"},
adapter: WorkerKeepAliveKeyAdapter,
want: "/dm-worker/a/3132372e302e302e313a32333832/776f726b657231",
want: "/dm-worker/a/776f726b657231",
},
{
keys: []string{"mysql1"},
Expand Down
12 changes: 7 additions & 5 deletions dm/config/checker_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ type Duration struct {
}

// MarshalText hacks to satisfy the encoding.TextMarshaler interface
// For MarshalText, we should use (d Duration) which can be used by both pointer and instance
func (d Duration) MarshalText() ([]byte, error) {
return []byte(d.Duration.String()), nil
}

// UnmarshalText hacks to satisfy the encoding.TextUnmarshaler interface
func (d Duration) UnmarshalText(text []byte) error {
// For UnmarshalText, we should use (d *Duration) to change the value of this instance instead of the copy
func (d *Duration) UnmarshalText(text []byte) error {
var err error
d.Duration, err = time.ParseDuration(string(text))
return err
Expand All @@ -47,10 +49,10 @@ type CheckerConfig struct {
BackoffRollback Duration `toml:"backoff-rollback" json:"backoff-rollback"`
BackoffMax Duration `toml:"backoff-max" json:"backoff-max"`
// unexpose config
CheckInterval Duration `json:"-"`
BackoffMin Duration `json:"-"`
BackoffJitter bool `json:"-"`
BackoffFactor float64 `json:"-"`
CheckInterval Duration `toml:"check-interval" json:"-"`
BackoffMin Duration `toml:"backoff-min" json:"-"`
BackoffJitter bool `toml:"backoff-jitter" json:"-"`
BackoffFactor float64 `toml:"backoff-factor" json:"-"`
}

// Adjust sets default value for field: CheckInterval/BackoffMin/BackoffJitter/BackoffFactor
Expand Down
45 changes: 23 additions & 22 deletions dm/config/mysql_config.go → dm/config/source_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"time"

"github.com/BurntSushi/toml"
"github.com/siddontang/go-mysql/mysql"

"github.com/pingcap/dm/pkg/binlog"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/tracing"
"github.com/pingcap/dm/pkg/utils"
"github.com/siddontang/go-mysql/mysql"
)

const (
Expand All @@ -39,8 +40,8 @@ type PurgeConfig struct {
RemainSpace int64 `toml:"remain-space" json:"remain-space"` // if remain space in @RelayBaseDir less than @RemainSpace (GB), then it can be purged
}

// MysqlConfig is the configuration for Worker
type MysqlConfig struct {
// SourceConfig is the configuration for Worker
type SourceConfig struct {
EnableGTID bool `toml:"enable-gtid" json:"enable-gtid"`
AutoFixGTID bool `toml:"auto-fix-gtid" json:"auto-fix-gtid"`
RelayDir string `toml:"relay-dir" json:"relay-dir"`
Expand Down Expand Up @@ -69,9 +70,9 @@ type MysqlConfig struct {
ServerID uint32 `toml:"server-id" json:"server-id"`
}

// NewMysqlConfig creates a new base config for worker.
func NewMysqlConfig() *MysqlConfig {
c := &MysqlConfig{
// NewSourceConfig creates a new base config for upstream MySQL/MariaDB source.
func NewSourceConfig() *SourceConfig {
c := &SourceConfig{
RelayDir: "relay-dir",
Purge: PurgeConfig{
Interval: 60 * 60,
Expand All @@ -95,14 +96,14 @@ func NewMysqlConfig() *MysqlConfig {
}

// Clone clones a config
func (c *MysqlConfig) Clone() *MysqlConfig {
clone := &MysqlConfig{}
func (c *SourceConfig) Clone() *SourceConfig {
clone := &SourceConfig{}
*clone = *c
return clone
}

// Toml returns TOML format representation of config
func (c *MysqlConfig) Toml() (string, error) {
func (c *SourceConfig) Toml() (string, error) {
var b bytes.Buffer

err := toml.NewEncoder(&b).Encode(c)
Expand All @@ -114,36 +115,36 @@ func (c *MysqlConfig) Toml() (string, error) {
}

// Parse parses flag definitions from the argument list.
func (c *MysqlConfig) Parse(content string) error {
func (c *SourceConfig) Parse(content string) error {
// Parse first to get config file.
metaData, err := toml.Decode(content, c)
return c.check(&metaData, err)
}

// EncodeToml encodes config.
func (c *MysqlConfig) EncodeToml() (string, error) {
func (c *SourceConfig) EncodeToml() (string, error) {
buf := new(bytes.Buffer)
if err := toml.NewEncoder(buf).Encode(c); err != nil {
return "", err
}
return buf.String(), nil
}

func (c *MysqlConfig) String() string {
func (c *SourceConfig) String() string {
cfg, err := json.Marshal(c)
if err != nil {
log.L().Error("fail to marshal config to json", log.ShortError(err))
}
return string(cfg)
}

func (c *MysqlConfig) adjust() {
func (c *SourceConfig) adjust() {
c.From.Adjust()
c.Checker.Adjust()
}

// Verify verifies the config
func (c *MysqlConfig) Verify() error {
func (c *SourceConfig) Verify() error {
if len(c.SourceID) == 0 {
return terror.ErrWorkerNeedSourceID.Generate()
}
Expand Down Expand Up @@ -175,7 +176,7 @@ func (c *MysqlConfig) Verify() error {
}

// DecryptPassword returns a decrypted config replica in config
func (c *MysqlConfig) DecryptPassword() (*MysqlConfig, error) {
func (c *SourceConfig) DecryptPassword() (*SourceConfig, error) {
clone := c.Clone()
var (
pswdFrom string
Expand All @@ -192,7 +193,7 @@ func (c *MysqlConfig) DecryptPassword() (*MysqlConfig, error) {
}

// GenerateDBConfig creates DBConfig for DB
func (c *MysqlConfig) GenerateDBConfig() (*DBConfig, error) {
func (c *SourceConfig) GenerateDBConfig() (*DBConfig, error) {
// decrypt password
clone, err := c.DecryptPassword()
if err != nil {
Expand All @@ -203,8 +204,8 @@ func (c *MysqlConfig) GenerateDBConfig() (*DBConfig, error) {
return from, nil
}

// Adjust flavor and serverid of MysqlConfig
func (c *MysqlConfig) Adjust(db *sql.DB) (err error) {
// Adjust flavor and serverid of SourceConfig
func (c *SourceConfig) Adjust(db *sql.DB) (err error) {
c.From.Adjust()
c.Checker.Adjust()

Expand All @@ -227,7 +228,7 @@ func (c *MysqlConfig) Adjust(db *sql.DB) (err error) {
}

// AdjustFlavor adjust Flavor from DB
func (c *MysqlConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error) {
func (c *SourceConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error) {
if c.Flavor != "" {
switch c.Flavor {
case mysql.MariaDBFlavor, mysql.MySQLFlavor:
Expand All @@ -245,7 +246,7 @@ func (c *MysqlConfig) AdjustFlavor(ctx context.Context, db *sql.DB) (err error)
}

// AdjustServerID adjust server id from DB
func (c *MysqlConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
func (c *SourceConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
if c.ServerID != 0 {
return nil
}
Expand Down Expand Up @@ -273,12 +274,12 @@ func (c *MysqlConfig) AdjustServerID(ctx context.Context, db *sql.DB) error {
}

// LoadFromFile loads config from file.
func (c *MysqlConfig) LoadFromFile(path string) error {
func (c *SourceConfig) LoadFromFile(path string) error {
metaData, err := toml.DecodeFile(path, c)
return c.check(&metaData, err)
}

func (c *MysqlConfig) check(metaData *toml.MetaData, err error) error {
func (c *SourceConfig) check(metaData *toml.MetaData, err error) error {
if err != nil {
return terror.ErrWorkerDecodeConfigFromFile.Delegate(err)
}
Expand Down
Loading

0 comments on commit 24c7693

Please sign in to comment.