Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: combine etcd log into dm-master #360

Merged
merged 20 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ ErrTaskCheckFailedOpenDB,[code=26002:class=task-check:scope=internal:level=high]
ErrTaskCheckNewTableRouter,[code=26003:class=task-check:scope=internal:level=medium],"new table router error"
ErrTaskCheckNewColumnMapping,[code=26004:class=task-check:scope=internal:level=medium],"new column mapping error"
ErrTaskCheckSyncConfigError,[code=26005:class=task-check:scope=internal:level=medium],"%s %v: %v\n detail: %v"
ErrTaskCheckNewBWList,[code=26006:class=task-check:scope=internal:level=medium],"new black white list error"
ErrRelayParseUUIDIndex,[code=28001:class=relay-event-lib:scope=internal:level=high],"parse server-uuid.index"
ErrRelayParseUUIDSuffix,[code=28002:class=relay-event-lib:scope=internal:level=high],"UUID (with suffix) %s not valid"
ErrRelayUUIDWithSuffixNotFound,[code=28003:class=relay-event-lib:scope=internal:level=high],"no UUID (with suffix) matched %s found in %s, all UUIDs are %v"
Expand Down Expand Up @@ -205,6 +206,7 @@ ErrRelayPurgeArgsNotValid,[code=30042:class=relay-unit:scope=internal:level=high
ErrPreviousGTIDsNotValid,[code=30043:class=relay-unit:scope=internal:level=high],"previousGTIDs %s not valid"
ErrDumpUnitRuntime,[code=32001:class=dump-unit:scope=internal:level=high],"mydumper runs with error"
ErrDumpUnitGenTableRouter,[code=32002:class=dump-unit:scope=internal:level=high],"generate table router"
ErrDumpUnitGenBWList,[code=32003:class=dump-unit:scope=internal:level=high],"generate black white list"
ErrLoadUnitCreateSchemaFile,[code=34001:class=load-unit:scope=internal:level=medium],"generate schema file"
ErrLoadUnitInvalidFileEnding,[code=34002:class=load-unit:scope=internal:level=high],"corresponding ending of sql: ')' not found"
ErrLoadUnitParseQuoteValues,[code=34003:class=load-unit:scope=internal:level=high],"parse quote values error"
Expand All @@ -220,6 +222,7 @@ ErrLoadUnitNoDBFile,[code=34012:class=load-unit:scope=internal:level=high],"inva
ErrLoadUnitNoTableFile,[code=34013:class=load-unit:scope=internal:level=high],"invalid data sql file, cannot find table - %s"
ErrLoadUnitDumpDirNotFound,[code=34014:class=load-unit:scope=internal:level=high],"%s does not exist or it's not a dir"
ErrLoadUnitDuplicateTableFile,[code=34015:class=load-unit:scope=internal:level=high],"invalid table schema file, duplicated item - %s"
ErrLoadUnitNewBWList,[code=34016:class=load-unit:scope=internal:level=high],"new black white list"
ErrSyncerUnitPanic,[code=36001:class=sync-unit:scope=internal:level=high],"panic error: %v"
ErrSyncUnitInvalidTableName,[code=36002:class=sync-unit:scope=internal:level=high],"extract table name for DML error: %s"
ErrSyncUnitTableNameQuery,[code=36003:class=sync-unit:scope=internal:level=high],"table name parse error: %s"
Expand Down Expand Up @@ -279,6 +282,7 @@ ErrSyncerUnitResolveCasualityFail,[code=36056:class=sync-unit:scope=internal:lev
ErrSyncerUnitReopenStreamNotSupport,[code=36057:class=sync-unit:scope=internal:level=high],"reopen %T not supported"
ErrSyncerUnitUpdateConfigInSharding,[code=36058:class=sync-unit:scope=internal:level=high],"try update config when some tables' (%v) sharding DDL not synced not supported"
ErrSyncerUnitExecWithNoBlockingDDL,[code=36059:class=sync-unit:scope=internal:level=high],"process unit not waiting for sharding DDL to sync"
ErrSyncerUnitNewBWList,[code=36060:class=sync-unit:scope=internal:level=high],"new black white list"
ErrMasterSQLOpNilRequest,[code=38001:class=dm-master:scope=internal:level=medium],"nil request not valid"
ErrMasterSQLOpNotSupport,[code=38002:class=dm-master:scope=internal:level=medium],"op %s not supported"
ErrMasterSQLOpWithoutSharding,[code=38003:class=dm-master:scope=internal:level=medium],"operate request without --sharding specified not valid"
Expand Down
5 changes: 4 additions & 1 deletion checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (c *Checker) Init() (err error) {
_, checkSchema := c.checkingItems[config.TableSchemaChecking]

for _, instance := range c.instances {
bw := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList)
bw, err := filter.New(instance.cfg.CaseSensitive, instance.cfg.BWList)
if err != nil {
return terror.ErrTaskCheckNewBWList.Delegate(err)
}
r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
if err != nil {
return terror.ErrTaskCheckNewTableRouter.Delegate(err)
Expand Down
13 changes: 12 additions & 1 deletion dm/master/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ func (c *Config) adjust() error {
c.InitialClusterState = defaultInitialClusterState
}

_, err = c.genEmbedEtcdConfig() // verify embed etcd config
return err
}

Expand Down Expand Up @@ -298,6 +297,7 @@ func (c *Config) Reload() error {
}

// genEmbedEtcdConfig generates the configuration needed by embed etcd.
// This method should be called after logger initialized and before any concurrent gRPC calls.
func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg := embed.NewConfig()
cfg.Name = c.Name
Expand All @@ -324,6 +324,17 @@ func (c *Config) genEmbedEtcdConfig() (*embed.Config, error) {
cfg.InitialCluster = c.InitialCluster
cfg.ClusterState = c.InitialClusterState

// use zap as the logger for embed etcd
// NOTE: `genEmbedEtcdConfig` can only be called after logger initialized.
// NOTE: if using zap logger for etcd, must build it before any concurrent gRPC calls,
// otherwise, DATA RACE occur in builder and gRPC.
cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(log.L().Logger, log.L().Core(), log.Props().Syncer)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
cfg.Logger = "zap"
err = cfg.Validate() // verify & trigger the builder
if err != nil {
return nil, terror.ErrMasterGenEmbedEtcdConfigFail.AnnotateDelegate(err, "fail to validate embed etcd config")
}

return cfg, nil
}

Expand Down
6 changes: 6 additions & 0 deletions dm/master/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/check"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

Expand All @@ -38,6 +39,11 @@ var (
type testConfigSuite struct {
}

func (t *testConfigSuite) SetUpSuite(c *check.C) {
// initialized the logger to make genEmbedEtcdConfig working.
log.InitLogger(&log.Config{})
}

func (t *testConfigSuite) TestPrintSampleConfig(c *check.C) {
var (
buf []byte
Expand Down
13 changes: 4 additions & 9 deletions dm/master/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,18 @@ const (
)

// startEtcd starts an embedded etcd server.
func startEtcd(masterCfg *Config,
func startEtcd(etcdCfg *embed.Config,
gRPCSvr func(*grpc.Server),
httpHandles map[string]http.Handler) (*embed.Etcd, error) {
cfg, err := masterCfg.genEmbedEtcdConfig()
if err != nil {
return nil, err
}

// attach extra gRPC and HTTP server
if gRPCSvr != nil {
cfg.ServiceRegister = gRPCSvr
etcdCfg.ServiceRegister = gRPCSvr
}
if httpHandles != nil {
cfg.UserHandlers = httpHandles
etcdCfg.UserHandlers = httpHandles
}

e, err := embed.StartEtcd(cfg)
e, err := embed.StartEtcd(etcdCfg)
if err != nil {
return nil, terror.ErrMasterStartEmbedEtcdFail.Delegate(err)
}
Expand Down
16 changes: 13 additions & 3 deletions dm/master/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/pd/pkg/tempurl"
"go.etcd.io/etcd/embed"

"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
)

Expand All @@ -34,13 +35,20 @@ var _ = check.Suite(&testEtcdSuite{})
type testEtcdSuite struct {
}

func (t *testEtcdSuite) SetUpSuite(c *check.C) {
// initialized the logger to make genEmbedEtcdConfig working.
log.InitLogger(&log.Config{})
}

func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
cfgCluster := NewConfig() // used to start an etcd cluster
cfgCluster.Name = "dm-master-1"
cfgCluster.DataDir = c.MkDir()
cfgCluster.MasterAddr = tempurl.Alloc()[len("http://"):]
cfgCluster.PeerUrls = tempurl.Alloc()
c.Assert(cfgCluster.adjust(), check.IsNil)
cfgClusterEtcd, err := cfgCluster.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)

cfgBefore := t.cloneConfig(cfgCluster) // before `prepareJoinEtcd` applied
cfgBefore.DataDir = c.MkDir() // overwrite some config items
Expand All @@ -61,7 +69,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {

// try to join self
cfgAfter.Join = cfgAfter.MasterAddr
err := prepareJoinEtcd(cfgAfter)
err = prepareJoinEtcd(cfgAfter)
c.Assert(terror.ErrMasterJoinEmbedEtcdFail.Equal(err), check.IsTrue)
c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: join self.*is forbidden.*")

Expand Down Expand Up @@ -94,7 +102,7 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
c.Assert(os.RemoveAll(memberDP), check.IsNil) // remove previous data

// start an etcd cluster
e1, err := startEtcd(cfgCluster, nil, nil)
e1, err := startEtcd(cfgClusterEtcd, nil, nil)
c.Assert(err, check.IsNil)
defer e1.Close()

Expand Down Expand Up @@ -145,7 +153,9 @@ func (t *testEtcdSuite) TestPrepareJoinEtcd(c *check.C) {
c.Assert(err, check.ErrorMatches, ".*fail to join embed etcd: there is a member that has not joined successfully, continue the join or remove it.*")

// start the joining etcd
e2, err := startEtcd(cfgAfter, nil, nil)
cfgAfterEtcd, err := cfgAfter.genEmbedEtcdConfig()
c.Assert(err, check.IsNil)
e2, err := startEtcd(cfgAfterEtcd, nil, nil)
c.Assert(err, check.IsNil)
defer e2.Close()

Expand Down
23 changes: 16 additions & 7 deletions dm/master/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,21 @@ func NewServer(cfg *Config) *Server {

// Start starts to serving
func (s *Server) Start(ctx context.Context) (err error) {
// prepare config to join an existing cluster
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}

// generates embed etcd config before any concurrent gRPC calls.
// potential concurrent gRPC calls:
// - workerrpc.NewGRPCClient
// - getHTTPAPIHandler
etcdCfg, err := s.cfg.genEmbedEtcdConfig()
if err != nil {
return
}

// create clients to DM-workers
for _, workerAddr := range s.cfg.DeployMap {
s.workerClients[workerAddr], err = workerrpc.NewGRPCClient(workerAddr)
Expand Down Expand Up @@ -126,14 +141,8 @@ func (s *Server) Start(ctx context.Context) (err error) {
// gRPC API server
gRPCSvr := func(gs *grpc.Server) { pb.RegisterMasterServer(gs, s) }

// prepare config to join an existing cluster
err = prepareJoinEtcd(s.cfg)
if err != nil {
return
}

// start embed etcd server, gRPC API server and HTTP (API, status and debug) server.
s.etcd, err = startEtcd(s.cfg, gRPCSvr, userHandles)
s.etcd, err = startEtcd(etcdCfg, gRPCSvr, userHandles)
if err != nil {
return
}
Expand Down
3 changes: 2 additions & 1 deletion dm/master/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,8 @@ func (t *testMaster) TestServer(c *check.C) {

t.testHTTPInterface(c, fmt.Sprintf("http://%s/status", cfg.MasterAddr), []byte(utils.GetRawInfo()))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/debug/pprof/", cfg.MasterAddr), []byte("Types of profiles available"))
t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no workers or not exist"))
// HTTP API in this unit test is unstable, but we test it in `http_apis` in integration test.
//t.testHTTPInterface(c, fmt.Sprintf("http://%s/apis/v1alpha1/status/test-task", cfg.MasterAddr), []byte("task test-task has no workers or not exist"))

dupServer := NewServer(cfg)
err := dupServer.Start(ctx)
Expand Down
61 changes: 27 additions & 34 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,77 +4,70 @@ require (
github.com/BurntSushi/toml v0.3.1
github.com/DATA-DOG/go-sqlmock v1.3.3
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
github.com/cespare/xxhash/v2 v2.1.1 // indirect
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e
github.com/coreos/etcd v3.3.15+incompatible
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/go-ole/go-ole v1.2.4 // indirect
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.1
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/golang/mock v1.3.1
github.com/golang/protobuf v1.3.2
github.com/google/btree v1.0.0 // indirect
github.com/google/go-cmp v0.3.1 // indirect
github.com/gorilla/mux v1.7.3 // indirect
github.com/gorilla/websocket v1.4.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.9.6
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.12.1
github.com/json-iterator/go v1.1.8 // indirect
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/montanaflynn/stats v0.5.0 // indirect
github.com/onsi/ginkgo v1.9.0 // indirect
github.com/onsi/gomega v1.6.0 // indirect
github.com/opentracing/opentracing-go v1.1.0 // indirect
github.com/pingcap/check v0.0.0-20190102082844-67f458068fc8
github.com/pingcap/check v0.0.0-20191107115940-caf2b9e6ccf4
github.com/pingcap/errcode v0.3.0 // indirect
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190708053854-e7b1061e6e81
github.com/pingcap/kvproto v0.0.0-20190827032240-9696cd0c6acb // indirect
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191008032157-51a2e3b2e34b
github.com/pingcap/pd v0.0.0-20190712044914-75a1f9f3062b
github.com/pingcap/tidb v0.0.0-20190827060935-cc07b110825e
github.com/pingcap/tidb-tools v3.0.0-beta.1.0.20190821032033-e6ccf3994944+incompatible
github.com/pingcap/tipb v0.0.0-20191008064422-018b2fadf414 // indirect
github.com/prometheus/client_golang v1.1.0
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 // indirect
github.com/prometheus/procfs v0.0.4 // indirect
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20191112053614-3b43b46331d5
github.com/pingcap/pd v1.1.0-beta.0.20191114130008-2488cb978644
github.com/pingcap/tidb v1.1.0-beta.0.20191115021711-b274eb2079dc
github.com/pingcap/tidb-tools v3.0.6-0.20191113022349-48d5e90d3271+incompatible
github.com/prometheus/client_golang v1.2.1
github.com/prometheus/procfs v0.0.6 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 // indirect
github.com/satori/go.uuid v1.2.0
github.com/shirou/gopsutil v2.18.12+incompatible // indirect
github.com/shopspring/decimal v0.0.0-20190905144223-a36b5d85f337 // indirect
github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726
github.com/siddontang/go-log v0.0.0-20190221022429-1e957dd83bed // indirect
github.com/siddontang/go-mysql v0.0.0-20191009015310-f66c8b344478
github.com/sirupsen/logrus v1.4.2 // indirect
github.com/soheilhy/cmux v0.1.4
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 // indirect
github.com/uber-go/atomic v1.4.0 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible // indirect
github.com/uber/jaeger-lib v2.0.0+incompatible // indirect
github.com/unrolled/render v1.0.1 // indirect
go.etcd.io/etcd v3.3.15+incompatible
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20191002192127-34f69633bfdc // indirect
golang.org/x/net v0.0.0-20191007182048-72f939374954
golang.org/x/sys v0.0.0-20191008105621-543471e840be
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4
golang.org/x/tools v0.0.0-20191007185444-6536af71d98a // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708 // indirect
golang.org/x/net v0.0.0-20191112182307-2180aed22343
golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/tools v0.0.0-20191114222411-4191b8cbba09 // indirect
google.golang.org/appengine v1.6.1 // indirect
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55
google.golang.org/grpc v1.23.0
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/stretchr/testify.v1 v1.2.2 // indirect
gopkg.in/yaml.v2 v2.2.4
sigs.k8s.io/yaml v1.1.0 // indirect
google.golang.org/genproto v0.0.0-20191114150713-6bbd007550de
google.golang.org/grpc v1.25.1
gopkg.in/yaml.v2 v2.2.5
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 // indirect
)

go 1.13
Loading