Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
*: combine etcd log into dm-master (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Nov 19, 2019
1 parent 19b490b commit a64b26d
Show file tree
Hide file tree
Showing 22 changed files with 348 additions and 174 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ your contribution accepted.

Developing DM requires:

* [Go 1.11+](http://golang.org/doc/code.html)
* [Go 1.13+](http://golang.org/doc/code.html)
* An internet connection to download the dependencies

Simply run `make` to build the program.
Expand Down
16 changes: 10 additions & 6 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ ErrSyncClosed,[code=11041:class=functional:scope=internal:level=high],"Sync was
ErrSchemaTableNameNotValid,[code=11042:class=functional:scope=internal:level=high],"table name %s not valid"
ErrGenTableRouter,[code=11043:class=functional:scope=internal:level=high],"generate table router"
ErrEncryptSecretKeyNotValid,[code=11044:class=functional:scope=internal:level=high],"key size should be 16, 24 or 32, but input key's size is %d"
ErrEncryptNewCipher,[code=11045:class=functional:scope=internal:level=high],"new cipher"
ErrEncryptGenCipher,[code=11045:class=functional:scope=internal:level=high],"generate cipher"
ErrEncryptGenIV,[code=11046:class=functional:scope=internal:level=high],"generate iv"
ErrCiphertextLenNotValid,[code=11047:class=functional:scope=internal:level=high],"ciphertext's length should be greater than %d, but got %d not valid"
ErrCiphertextContextNotValid,[code=11048:class=functional:scope=internal:level=high],"ciphertext's content not valid"
Expand Down Expand Up @@ -152,9 +152,10 @@ ErrCheckpointTableNotExistInFile,[code=24005:class=checkpoint:scope=internal:lev
ErrCheckpointRestoreCountGreater,[code=24006:class=checkpoint:scope=internal:level=medium],"restoring count greater than total count for table[%v]"
ErrTaskCheckSameTableName,[code=26001:class=task-check:scope=internal:level=medium],"same table name in case-sensitive %v"
ErrTaskCheckFailedOpenDB,[code=26002:class=task-check:scope=internal:level=high],"failed to open DSN %s:***@%s:%d"
ErrTaskCheckNewTableRouter,[code=26003:class=task-check:scope=internal:level=medium],"new table router error"
ErrTaskCheckNewColumnMapping,[code=26004:class=task-check:scope=internal:level=medium],"new column mapping error"
ErrTaskCheckGenTableRouter,[code=26003:class=task-check:scope=internal:level=medium],"generate table router error"
ErrTaskCheckGenColumnMapping,[code=26004:class=task-check:scope=internal:level=medium],"generate column mapping error"
ErrTaskCheckSyncConfigError,[code=26005:class=task-check:scope=internal:level=medium],"%s %v: %v\n detail: %v"
ErrTaskCheckGenBWList,[code=26006:class=task-check:scope=internal:level=medium],"generate black white list error"
ErrRelayParseUUIDIndex,[code=28001:class=relay-event-lib:scope=internal:level=high],"parse server-uuid.index"
ErrRelayParseUUIDSuffix,[code=28002:class=relay-event-lib:scope=internal:level=high],"UUID (with suffix) %s not valid"
ErrRelayUUIDWithSuffixNotFound,[code=28003:class=relay-event-lib:scope=internal:level=high],"no UUID (with suffix) matched %s found in %s, all UUIDs are %v"
Expand Down Expand Up @@ -205,6 +206,7 @@ ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high
ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high],"previousGTIDs %s not valid"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high],"mydumper runs with error"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high],"generate table router"
ErrDumpUnitGenBWList,[code=32003:class=dump-unit:scope=internal:level=high],"generate black white list"
ErrLoadUnitCreateSchemaFile,[code=34001:class=load-unit:scope=internal:level=medium],"generate schema file"
ErrLoadUnitInvalidFileEnding,[code=34002:class=load-unit:scope=internal:level=high],"corresponding ending of sql: ')' not found"
ErrLoadUnitParseQuoteValues,[code=34003:class=load-unit:scope=internal:level=high],"parse quote values error"
Expand All @@ -215,11 +217,12 @@ ErrLoadUnitNotCreateTable,[code=34007:class=load-unit:scope=internal:level=high]
ErrLoadUnitDispatchSQLFromFile,[code=34008:class=load-unit:scope=internal:level=high],"dispatch sql"
ErrLoadUnitInvalidInsertSQL,[code=34009:class=load-unit:scope=internal:level=high],"invalid insert sql %s"
ErrLoadUnitGenTableRouter,[code=34010:class=load-unit:scope=internal:level=high],"generate table router"
ErrLoadUnitNewColumnMapping,[code=34011:class=load-unit:scope=internal:level=high],"new column mapping"
ErrLoadUnitGenColumnMapping,[code=34011:class=load-unit:scope=internal:level=high],"generate column mapping"
ErrLoadUnitNoDBFile,[code=34012:class=load-unit:scope=internal:level=high],"invalid data sql file, cannot find db - %s"
ErrLoadUnitNoTableFile,[code=34013:class=load-unit:scope=internal:level=high],"invalid data sql file, cannot find table - %s"
ErrLoadUnitDumpDirNotFound,[code=34014:class=load-unit:scope=internal:level=high],"%s does not exist or it's not a dir"
ErrLoadUnitDuplicateTableFile,[code=34015:class=load-unit:scope=internal:level=high],"invalid table schema file, duplicated item - %s"
ErrLoadUnitGenBWList,[code=34016:class=load-unit:scope=internal:level=high],"generate black white list"
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high],"panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high],"extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high],"table name parse error: %s"
Expand Down Expand Up @@ -249,9 +252,9 @@ ErrSyncerUnitNilOperatorReq,[code=36026:class=sync-unit:scope=internal:level=med
ErrSyncerUnitDMLColumnNotMatch,[code=36027:class=sync-unit:scope=internal:level=high],"Column count doesn't match value count: %d (columns) vs %d (values)"
ErrSyncerUnitDMLOldNewValueMismatch,[code=36028:class=sync-unit:scope=internal:level=high],"Old value count doesn't match new value count: %d (old) vs %d (new)"
ErrSyncerUnitDMLPruneColumnMismatch,[code=36029:class=sync-unit:scope=internal:level=high],"prune DML columns and data mismatch in length: %d (columns) %d (data)"
ErrSyncerUnitNewBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high],"new binlog event filter"
ErrSyncerUnitGenBinlogEventFilter,[code=36030:class=sync-unit:scope=internal:level=high],"generate binlog event filter"
ErrSyncerUnitGenTableRouter,[code=36031:class=sync-unit:scope=internal:level=high],"generate table router"
ErrSyncerUnitNewColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high],"new column mapping"
ErrSyncerUnitGenColumnMapping,[code=36032:class=sync-unit:scope=internal:level=high],"generate column mapping"
ErrSyncerUnitDoColumnMapping,[code=36033:class=sync-unit:scope=internal:level=high],"mapping row data %v for table `%s`.`%s`"
ErrSyncerUnitCacheKeyNotFound,[code=36034:class=sync-unit:scope=internal:level=high],"cache key %s in %s not found"
ErrSyncerUnitHeartbeatCheckConfig,[code=36035:class=sync-unit:scope=internal:level=medium],""
Expand Down Expand Up @@ -279,6 +282,7 @@ ErrSyncerUnitResolveCasualityFail,[code=36056:class=sync-unit:scope=internal:lev
ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:level=high],"reopen %T not supported"
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitGenBWList,[code=36060:class=sync-unit:scope=internal:level=high],"generate black white list"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
9 changes: 6 additions & 3 deletions checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,18 @@ func (c *Checker) Init() (err error) {
_, checkSchema := c.checkingItems[config.TableSchemaChecking]

for _, instance := range c.instances {
bw := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList)
bw, err := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList)
if err != nil {
return terror.ErrTaskCheckGenBWList.Delegate(err)
}
r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
if err != nil {
return terror.ErrTaskCheckNewTableRouter.Delegate(err)
return terror.ErrTaskCheckGenTableRouter.Delegate(err)
}

columnMapping[instance.cfg.SourceID], err = column.NewMapping(instance.cfg.CaseSensitive, instance.cfg.ColumnMappingRules)
if err != nil {
return terror.ErrTaskCheckNewColumnMapping.Delegate(err)
return terror.ErrTaskCheckGenColumnMapping.Delegate(err)
}

instance.sourceDBinfo = &dbutil.DBConfig{
Expand Down
14 changes: 13 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ func (c *Config) adjust() error {
c.InitialClusterState = defaultInitialClusterState
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}

Expand Down Expand Up @@ -298,6 +297,7 @@ func (c *Config) Reload() error {
}

// genEmbedEtcdConfig generates the configuration needed by embed etcd.
// This method should be called after logger initialized and before any concurrent gRPC calls.
func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg := embed.NewConfig()
cfg.Name = c.Name
Expand All @@ -324,6 +324,18 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState

// use zap as the logger for embed etcd
// NOTE: `genEmbedEtcdConfig` can only be called after logger initialized.
// NOTE: if using zap logger for etcd, must build it before any concurrent gRPC calls,
// otherwise, DATA RACE occur in builder and gRPC.
logger := log.L().WithFields(zap.String("component", "embed etcd"))
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(logger.Logger, logger.Core(), log.Props().Syncer) // use global app props.
cfg.Logger = "zap"
err = cfg.Validate() // verify & trigger the builder
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.AnnotateDelegate(err, "fail to validate embed etcd config")
}

return cfg, nil
}

Expand Down
6 changes: 6 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/check"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

Expand All @@ -38,6 +39,11 @@ var (
type testConfigSuite struct {
}

func (t *testConfigSuite) SetUpSuite(c *check.C) {
// initialized the logger to make genEmbedEtcdConfig working.
log.InitLogger(&log.Config{})
}

func (t *testConfigSuite) TestPrintSampleConfig(c *check.C) {
var (
buf []byte
Expand Down
13 changes: 4 additions & 9 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,18 @@ const (
)

// startEtcd starts an embedded etcd server.
func startEtcd(masterCfg *Config,
func startEtcd(etcdCfg *embed.Config,
gRPCSvr func(*grpc.Server),
httpHandles map[string]http.Handler) (*embed.Etcd, error) {
cfg, err := masterCfg.genEmbedEtcdConfig()
if err != nil {
return nil, err
}

// attach extra gRPC and HTTP server
if gRPCSvr != nil {
cfg.ServiceRegister = gRPCSvr
etcdCfg.ServiceRegister = gRPCSvr
}
if httpHandles != nil {
cfg.UserHandlers = httpHandles
etcdCfg.UserHandlers = httpHandles
}

e, err := embed.StartEtcd(cfg)
e, err := embed.StartEtcd(etcdCfg)
if err != nil {
return nil, terror.ErrMasterStartEmbedEtcdFail.Delegate(err)
}
Expand Down
16 changes: 13 additions & 3 deletions dm/master/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/pd/pkg/tempurl"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

Expand All @@ -34,13 +35,20 @@ var _ = check.Suite(&testEtcdSuite{})
type testEtcdSuite struct {
}

func (t *testEtcdSuite) SetUpSuite(c *check.C) {
// initialized the logger to make genEmbedEtcdConfig working.
log.InitLogger(&log.Config{})
}

func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgCluster := NewConfig() // used to start an etcd cluster
cfgCluster.Name = "dm-master-1"
cfgCluster.DataDir = c.MkDir()
cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.PeerUrls = tempurl.Alloc()
c.Assert(cfgCluster.adjust(), check.IsNil)
cfgClusterEtcd, err := cfgCluster.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)

cfgBefore := t.cloneConfig(cfgCluster) // before `prepareJoinEtcd` applied
cfgBefore.DataDir = c.MkDir() // overwrite some config items
Expand All @@ -61,7 +69,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {

// try to join self
cfgAfter.Join = cfgAfter.MasterAddr
err := prepareJoinEtcd(cfgAfter)
err = prepareJoinEtcd(cfgAfter)
c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue)
c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: join self.*is forbidden.*")

Expand Down Expand Up @@ -94,7 +102,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
c.Assert(os.RemoveAll(memberDP), check.IsNil) // remove previous data

// start an etcd cluster
e1, err := startEtcd(cfgCluster, nil, nil)
e1, err := startEtcd(cfgClusterEtcd, nil, nil)
c.Assert(err, check.IsNil)
defer e1.Close()

Expand Down Expand Up @@ -145,7 +153,9 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: there is a member that has not joined successfully, continue the join or remove it.*")

// start the joining etcd
e2, err := startEtcd(cfgAfter, nil, nil)
cfgAfterEtcd, err := cfgAfter.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
e2, err := startEtcd(cfgAfterEtcd, nil, nil)
c.Assert(err, check.IsNil)
defer e2.Close()

Expand Down
27 changes: 20 additions & 7 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,25 @@ func NewServer(cfg *Config) *Server {

// Start starts to serving
func (s *Server) Start(ctx context.Context) (err error) {
// prepare config to join an existing cluster
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}
log.L().Info("config after join prepared", zap.Stringer("config", s.cfg))

// generates embed etcd config before any concurrent gRPC calls.
// potential concurrent gRPC calls:
// - workerrpc.NewGRPCClient
// - getHTTPAPIHandler
// no `String` method exists for embed.Config, and can not marshal it to join too.
// but when starting embed etcd server, the etcd pkg will log the config.
// https://github.com/etcd-io/etcd/blob/3cf2f69b5738fb702ba1a935590f36b52b18979b/embed/etcd.go#L299
etcdCfg, err := s.cfg.genEmbedEtcdConfig()
if err != nil {
return
}

// create clients to DM-workers
for _, workerAddr := range s.cfg.DeployMap {
s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr)
Expand Down Expand Up @@ -126,14 +145,8 @@ func (s *Server) Start(ctx context.Context) (err error) {
// gRPC API server
gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) }

// prepare config to join an existing cluster
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}

// start embed etcd server, gRPC API server and HTTP (API, status and debug) server.
s.etcd, err = startEtcd(s.cfg, gRPCSvr, userHandles)
s.etcd, err = startEtcd(etcdCfg, gRPCSvr, userHandles)
if err != nil {
return
}
Expand Down
3 changes: 2 additions & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,8 @@ func (t *testMaster) TestServer(c *check.C) {

t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.MasterAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.MasterAddr), []byte("Types of profiles available"))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no workers or not exist"))
// HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test.
//t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no workers or not exist"))

dupServer := NewServer(cfg)
err := dupServer.Start(ctx)
Expand Down
Loading

0 comments on commit a64b26d

Please sign in to comment.