diff --git a/ddl/ddl.go b/ddl/ddl.go index 66ae7cb911ea3..da677920406b7 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -263,7 +263,7 @@ type DDL interface { // RegisterEventCh registers event channel for ddl. RegisterEventCh(chan<- *util.Event) // SchemaSyncer gets the schema syncer. - SchemaSyncer() SchemaSyncer + SchemaSyncer() util.SchemaSyncer // OwnerManager gets the owner manager. OwnerManager() owner.Manager // GetID gets the ddl ID. @@ -292,7 +292,7 @@ type ddlCtx struct { uuid string store kv.Storage ownerManager owner.Manager - schemaSyncer SchemaSyncer + schemaSyncer util.SchemaSyncer ddlJobDoneCh chan struct{} ddlEventCh chan<- *util.Event lease time.Duration // lease is schema lease. @@ -359,7 +359,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, id := uuid.NewV4().String() ctx, cancelFunc := context.WithCancel(ctx) var manager owner.Manager - var syncer SchemaSyncer + var syncer util.SchemaSyncer if etcdCli == nil { // The etcdCli is nil if the store is localstore which is only used for testing. // So we use mockOwnerManager and MockSchemaSyncer. @@ -367,7 +367,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage, syncer = NewMockSchemaSyncer() } else { manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc) - syncer = NewSchemaSyncer(etcdCli, id) + syncer = util.NewSchemaSyncer(etcdCli, id, manager) } ddlCtx := &ddlCtx{ @@ -450,6 +450,17 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) { // checks owner firstly and try to find whether a job exists and run. asyncNotify(worker.ddlJobCh) } + + go tidbutil.WithRecovery( + func() { d.schemaSyncer.StartCleanWork() }, + func(r interface{}) { + if r != nil { + logutil.Logger(ddlLogCtx).Error("[ddl] DDL syncer clean worker meet panic", + zap.String("ID", d.uuid), zap.Reflect("r", r), zap.Stack("stack trace")) + metrics.PanicCounter.WithLabelValues(metrics.LabelDDLSyncer).Inc() + } + }) + metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc() } } @@ -461,6 +472,7 @@ func (d *ddl) close() { startTime := time.Now() close(d.quitCh) d.ownerManager.Cancel() + d.schemaSyncer.CloseCleanWork() err := d.schemaSyncer.RemoveSelfVersionPath() if err != nil { logutil.Logger(ddlLogCtx).Error("[ddl] remove self version path failed", zap.Error(err)) @@ -525,7 +537,7 @@ func (d *ddl) genGlobalIDs(count int) ([]int64, error) { } // SchemaSyncer implements DDL.SchemaSyncer interface. -func (d *ddl) SchemaSyncer() SchemaSyncer { +func (d *ddl) SchemaSyncer() util.SchemaSyncer { return d.schemaSyncer } diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 752c66b996e36..0dafe40bd6031 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -40,6 +40,8 @@ var ( RunWorker = true // ddlWorkerID is used for generating the next DDL worker ID. ddlWorkerID = int32(0) + // WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors. + WaitTimeWhenErrorOccured = 1 * time.Second ) type workerType byte @@ -637,6 +639,8 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time if terror.ErrorEqual(err, context.DeadlineExceeded) { return } + d.schemaSyncer.NotifyCleanExpiredPaths() + // Wait until timeout. select { case <-ctx.Done(): return diff --git a/ddl/mock.go b/ddl/mock.go index 1911f8aeec704..c73cb6f81cb51 100644 --- a/ddl/mock.go +++ b/ddl/mock.go @@ -22,10 +22,11 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/sessionctx" ) -var _ SchemaSyncer = &MockSchemaSyncer{} +var _ util.SchemaSyncer = &MockSchemaSyncer{} const mockCheckVersInterval = 2 * time.Millisecond @@ -37,7 +38,7 @@ type MockSchemaSyncer struct { } // NewMockSchemaSyncer creates a new mock SchemaSyncer. -func NewMockSchemaSyncer() SchemaSyncer { +func NewMockSchemaSyncer() util.SchemaSyncer { return &MockSchemaSyncer{} } @@ -113,6 +114,15 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer } } +// NotifyCleanExpiredPaths implements SchemaSyncer.NotifyCleanExpiredPaths interface. +func (s *MockSchemaSyncer) NotifyCleanExpiredPaths() bool { return true } + +// StartCleanWork implements SchemaSyncer.StartCleanWork interface. +func (s *MockSchemaSyncer) StartCleanWork() {} + +// CloseCleanWork implements SchemaSyncer.CloseCleanWork interface. +func (s *MockSchemaSyncer) CloseCleanWork() {} + type mockDelRange struct { } diff --git a/ddl/syncer.go b/ddl/util/syncer.go similarity index 76% rename from ddl/syncer.go rename to ddl/util/syncer.go index 62cc4e8663c6c..f79993e64ecd4 100644 --- a/ddl/syncer.go +++ b/ddl/util/syncer.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ddl +package util import ( "context" @@ -25,8 +25,10 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/util/logutil" @@ -49,6 +51,8 @@ const ( keyOpDefaultTimeout = 2 * time.Second keyOpRetryInterval = 30 * time.Millisecond checkVersInterval = 20 * time.Millisecond + + ddlPrompt = "ddl-syncer" ) var ( @@ -58,8 +62,8 @@ var ( // SyncerSessionTTL is the etcd session's TTL in seconds. // and it's an exported variable for testing. SyncerSessionTTL = 90 - // WaitTimeWhenErrorOccured is waiting interval when processing DDL jobs encounter errors. - WaitTimeWhenErrorOccured = 1 * time.Second + // ddlLogCtx uses for log. + ddlLogCtx = context.Background() ) // SchemaSyncer is used to synchronize schema version between the DDL worker leader and followers through etcd. @@ -87,6 +91,17 @@ type SchemaSyncer interface { // the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease. // It returns until all servers' versions are equal to the latest version or the ctx is done. OwnerCheckAllVersions(ctx context.Context, latestVer int64) error + // NotifyCleanExpiredPaths informs to clean up expired paths. + // The returned value is used for testing. + NotifyCleanExpiredPaths() bool + // StartCleanWork starts to clean up tasks. + StartCleanWork() + // CloseCleanWork ends cleanup tasks. + CloseCleanWork() +} + +type ownerChecker interface { + IsOwner() bool } type schemaVersionSyncer struct { @@ -97,13 +112,21 @@ type schemaVersionSyncer struct { sync.RWMutex globalVerCh clientv3.WatchChan } + + // for clean worker + ownerChecker ownerChecker + notifyCleanExpiredPathsCh chan struct{} + quiteCh chan struct{} } // NewSchemaSyncer creates a new SchemaSyncer. -func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer { +func NewSchemaSyncer(etcdCli *clientv3.Client, id string, oc ownerChecker) SchemaSyncer { return &schemaVersionSyncer{ - etcdCli: etcdCli, - selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id), + etcdCli: etcdCli, + selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id), + ownerChecker: oc, + notifyCleanExpiredPathsCh: make(chan struct{}, 1), + quiteCh: make(chan struct{}), } } @@ -388,3 +411,106 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestV time.Sleep(checkVersInterval) } } + +const ( + opDefaultRetryCnt = 10 + failedGetTTLLimit = 20 + opDefaultTimeout = 3 * time.Second + opRetryInterval = 500 * time.Millisecond +) + +// NeededCleanTTL is exported for testing. +var NeededCleanTTL = int64(-60) + +func (s *schemaVersionSyncer) StartCleanWork() { + for { + select { + case <-s.notifyCleanExpiredPathsCh: + if !s.ownerChecker.IsOwner() { + continue + } + + for i := 0; i < opDefaultRetryCnt; i++ { + childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout) + resp, err := s.etcdCli.Leases(childCtx) + cancelFunc() + if err != nil { + logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get leases.", zap.Error(err)) + continue + } + + if isFinished := s.doCleanExpirePaths(resp.Leases); isFinished { + break + } + time.Sleep(opRetryInterval) + } + case <-s.quiteCh: + return + } + } +} + +func (s *schemaVersionSyncer) CloseCleanWork() { + close(s.quiteCh) +} + +func (s *schemaVersionSyncer) NotifyCleanExpiredPaths() bool { + var isNotified bool + var err error + startTime := time.Now() + select { + case s.notifyCleanExpiredPathsCh <- struct{}{}: + isNotified = true + default: + err = errors.New("channel is full, failed to notify clean expired paths") + } + metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerNotifyCleanExpirePaths, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds()) + return isNotified +} + +func (s *schemaVersionSyncer) doCleanExpirePaths(leases []clientv3.LeaseStatus) bool { + failedGetIDs := 0 + failedRevokeIDs := 0 + startTime := time.Now() + + defer func() { + metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanExpirePaths, metrics.RetLabel(nil)).Observe(time.Since(startTime).Seconds()) + }() + // TODO: Now LeaseStatus only has lease ID. + for _, lease := range leases { + // The DDL owner key uses '%x', so here print it too. + leaseID := fmt.Sprintf("%x, %d", lease.ID, lease.ID) + childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout) + ttlResp, err := s.etcdCli.TimeToLive(childCtx, lease.ID) + cancelFunc() + if err != nil { + logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get one TTL.", zap.String("leaseID", leaseID), zap.Error(err)) + failedGetIDs++ + continue + } + + if failedGetIDs > failedGetTTLLimit { + return false + } + if ttlResp.TTL >= NeededCleanTTL { + continue + } + + st := time.Now() + childCtx, cancelFunc = context.WithTimeout(context.Background(), opDefaultTimeout) + _, err = s.etcdCli.Revoke(childCtx, lease.ID) + cancelFunc() + if err != nil && terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) { + logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths, failed to revoke lease.", zap.String("leaseID", leaseID), + zap.Int64("TTL", ttlResp.TTL), zap.Error(err)) + failedRevokeIDs++ + } + logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths,", zap.String("leaseID", leaseID), zap.Int64("TTL", ttlResp.TTL)) + metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanOneExpirePath, metrics.RetLabel(err)).Observe(time.Since(st).Seconds()) + } + + if failedGetIDs == 0 && failedRevokeIDs == 0 { + return true + } + return false +} diff --git a/ddl/util/syncer_test.go b/ddl/util/syncer_test.go new file mode 100644 index 0000000000000..9199ba2ac2857 --- /dev/null +++ b/ddl/util/syncer_test.go @@ -0,0 +1,249 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/etcdserver" + "github.com/coreos/etcd/integration" + "github.com/coreos/etcd/mvcc/mvccpb" + "github.com/pingcap/errors" + "github.com/pingcap/parser/terror" + . "github.com/pingcap/tidb/ddl" + . "github.com/pingcap/tidb/ddl/util" + "github.com/pingcap/tidb/owner" + "github.com/pingcap/tidb/store/mockstore" + goctx "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +const minInterval = 10 * time.Nanosecond // It's used to test timeout. + +func TestSyncerSimple(t *testing.T) { + testLease := 5 * time.Millisecond + origin := CheckVersFirstWaitTime + CheckVersFirstWaitTime = 0 + defer func() { + CheckVersFirstWaitTime = origin + }() + + store, err := mockstore.NewMockTikvStore() + if err != nil { + t.Fatal(err) + } + defer store.Close() + + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + cli := clus.RandClient() + ctx := goctx.Background() + d := NewDDL(ctx, cli, store, nil, nil, testLease, nil) + defer d.Stop() + + // for init function + if err = d.SchemaSyncer().Init(ctx); err != nil { + t.Fatalf("schema version syncer init failed %v", err) + } + resp, err := cli.Get(ctx, DDLAllSchemaVersions, clientv3.WithPrefix()) + if err != nil { + t.Fatalf("client get version failed %v", err) + } + key := DDLAllSchemaVersions + "/" + d.OwnerManager().ID() + checkRespKV(t, 1, key, InitialVersion, resp.Kvs...) + // for MustGetGlobalVersion function + globalVer, err := d.SchemaSyncer().MustGetGlobalVersion(ctx) + if err != nil { + t.Fatalf("client get global version failed %v", err) + } + if InitialVersion != fmt.Sprintf("%d", globalVer) { + t.Fatalf("client get global version %d isn't equal to init version %s", globalVer, InitialVersion) + } + childCtx, _ := goctx.WithTimeout(ctx, minInterval) + _, err = d.SchemaSyncer().MustGetGlobalVersion(childCtx) + if !isTimeoutError(err) { + t.Fatalf("client get global version result not match, err %v", err) + } + + d1 := NewDDL(ctx, cli, store, nil, nil, testLease, nil) + defer d1.Stop() + if err = d1.SchemaSyncer().Init(ctx); err != nil { + t.Fatalf("schema version syncer init failed %v", err) + } + + // for watchCh + wg := sync.WaitGroup{} + wg.Add(1) + currentVer := int64(123) + go func() { + defer wg.Done() + select { + case resp := <-d.SchemaSyncer().GlobalVersionCh(): + if len(resp.Events) < 1 { + t.Fatalf("get chan events count less than 1") + } + checkRespKV(t, 1, DDLGlobalSchemaVersion, fmt.Sprintf("%v", currentVer), resp.Events[0].Kv) + case <-time.After(100 * time.Millisecond): + t.Fatalf("get udpate version failed") + } + }() + + // for update latestSchemaVersion + err = d.SchemaSyncer().OwnerUpdateGlobalVersion(ctx, currentVer) + if err != nil { + t.Fatalf("update latest schema version failed %v", err) + } + + wg.Wait() + + // for CheckAllVersions + childCtx, cancel := goctx.WithTimeout(ctx, 20*time.Millisecond) + err = d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer) + if err == nil { + t.Fatalf("check result not match") + } + cancel() + + // for UpdateSelfVersion + childCtx, cancel = goctx.WithTimeout(ctx, 100*time.Millisecond) + err = d.SchemaSyncer().UpdateSelfVersion(childCtx, currentVer) + if err != nil { + t.Fatalf("update self version failed %v", errors.ErrorStack(err)) + } + cancel() + childCtx, cancel = goctx.WithTimeout(ctx, 100*time.Millisecond) + err = d1.SchemaSyncer().UpdateSelfVersion(childCtx, currentVer) + if err != nil { + t.Fatalf("update self version failed %v", errors.ErrorStack(err)) + } + cancel() + childCtx, _ = goctx.WithTimeout(ctx, minInterval) + err = d1.SchemaSyncer().UpdateSelfVersion(childCtx, currentVer) + if !isTimeoutError(err) { + t.Fatalf("update self version result not match, err %v", err) + } + + // for CheckAllVersions + childCtx, _ = goctx.WithTimeout(ctx, 100*time.Millisecond) + err = d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer-1) + if err != nil { + t.Fatalf("check all versions failed %v", err) + } + childCtx, _ = goctx.WithTimeout(ctx, 100*time.Millisecond) + err = d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer) + if err != nil { + t.Fatalf("check all versions failed %v", err) + } + childCtx, _ = goctx.WithTimeout(ctx, minInterval) + err = d.SchemaSyncer().OwnerCheckAllVersions(childCtx, currentVer) + if !isTimeoutError(err) { + t.Fatalf("check all versions result not match, err %v", err) + } + + // for StartCleanWork + go d.SchemaSyncer().StartCleanWork() + ttl := 10 + // Make sure NeededCleanTTL > ttl, then we definitely clean the ttl. + NeededCleanTTL = int64(11) + ttlKey := "session_ttl_key" + ttlVal := "session_ttl_val" + session, err := owner.NewSession(ctx, "", cli, owner.NewSessionDefaultRetryCnt, ttl) + if err != nil { + t.Fatalf("new session failed %v", err) + } + childCtx, cancel = context.WithTimeout(ctx, 100*time.Millisecond) + err = PutKVToEtcd(childCtx, cli, 5, ttlKey, ttlVal, clientv3.WithLease(session.Lease())) + if err != nil { + t.Fatalf("put kv to etcd failed %v", err) + } + cancel() + // Make sure the ttlKey is exist in etcd. + resp, err = cli.Get(ctx, ttlKey) + if err != nil { + t.Fatalf("client get version failed %v", err) + } + checkRespKV(t, 1, ttlKey, ttlVal, resp.Kvs...) + d.SchemaSyncer().NotifyCleanExpiredPaths() + // Make sure the clean worker is done. + notifiedCnt := 1 + for i := 0; i < 100; i++ { + isNotified := d.SchemaSyncer().NotifyCleanExpiredPaths() + if isNotified { + notifiedCnt++ + } + // notifyCleanExpiredPathsCh's length is 1, + // so when notifiedCnt is 3, we can make sure the clean worker is done at least once. + if notifiedCnt == 3 { + break + } + time.Sleep(20 * time.Millisecond) + } + if notifiedCnt != 3 { + t.Fatal("clean worker don't finish") + } + // Make sure the ttlKey is removed in etcd. + resp, err = cli.Get(ctx, ttlKey) + if err != nil { + t.Fatalf("client get version failed %v", err) + } + checkRespKV(t, 0, ttlKey, "", resp.Kvs...) + + // for RemoveSelfVersionPath + resp, err = cli.Get(goctx.Background(), key) + if err != nil { + t.Fatalf("get key %s failed %v", key, err) + } + currVer := fmt.Sprintf("%v", currentVer) + checkRespKV(t, 1, key, currVer, resp.Kvs...) + d.SchemaSyncer().RemoveSelfVersionPath() + resp, err = cli.Get(goctx.Background(), key) + if err != nil { + t.Fatalf("get key %s failed %v", key, err) + } + if len(resp.Kvs) != 0 { + t.Fatalf("remove key %s failed %v", key, err) + } +} + +func isTimeoutError(err error) bool { + if terror.ErrorEqual(err, goctx.DeadlineExceeded) || grpc.Code(errors.Cause(err)) == codes.DeadlineExceeded || + terror.ErrorEqual(err, etcdserver.ErrTimeout) { + return true + } + return false +} + +func checkRespKV(t *testing.T, kvCount int, key, val string, + kvs ...*mvccpb.KeyValue) { + if len(kvs) != kvCount { + t.Fatalf("resp key %s kvs %v length is != %d", key, kvs, kvCount) + } + if kvCount == 0 { + return + } + + kv := kvs[0] + if string(kv.Key) != key { + t.Fatalf("key resp %s, exported %s", kv.Key, key) + } + if val != val { + t.Fatalf("val resp %s, exported %s", kv.Value, val) + } +} diff --git a/domain/domain_test.go b/domain/domain_test.go index 896bbe4d97ae0..9aeb2ff2e46de 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -131,7 +131,7 @@ func TestInfo(t *testing.T) { } // Test the scene where syncer.Done() gets the information. - err = failpoint.Enable("github.com/pingcap/tidb/ddl/ErrorMockSessionDone", `return(true)`) + err = failpoint.Enable("github.com/pingcap/tidb/ddl/util/ErrorMockSessionDone", `return(true)`) if err != nil { t.Fatal(err) } @@ -148,7 +148,7 @@ func TestInfo(t *testing.T) { if !syncerStarted { t.Fatal("start syncer failed") } - err = failpoint.Disable("github.com/pingcap/tidb/ddl/ErrorMockSessionDone") + err = failpoint.Disable("github.com/pingcap/tidb/ddl/util/ErrorMockSessionDone") if err != nil { t.Fatal(err) } diff --git a/domain/info.go b/domain/info.go index 49d614c295ab3..0831c267f8a7f 100644 --- a/domain/info.go +++ b/domain/info.go @@ -24,7 +24,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/config" - "github.com/pingcap/tidb/ddl" + "github.com/pingcap/tidb/ddl/util" "github.com/pingcap/tidb/owner" "github.com/pingcap/tidb/util/hack" "github.com/pingcap/tidb/util/logutil" @@ -129,7 +129,7 @@ func (is *InfoSyncer) storeServerInfo(ctx context.Context) error { return errors.Trace(err) } str := string(hack.String(infoBuf)) - err = ddl.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease())) + err = util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, is.serverInfoPath, str, clientv3.WithLease(is.session.Lease())) return err } @@ -138,7 +138,7 @@ func (is *InfoSyncer) RemoveServerInfo() { if is.etcdCli == nil { return } - err := ddl.DeleteKeyFromEtcd(is.serverInfoPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) + err := util.DeleteKeyFromEtcd(is.serverInfoPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) if err != nil { logutil.Logger(context.Background()).Error("remove server info failed", zap.Error(err)) } diff --git a/go.mod b/go.mod index b0a09357089ba..1ba9886eab72f 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect - github.com/coreos/bbolt v1.3.2 // indirect + github.com/coreos/bbolt v1.3.3 // indirect github.com/coreos/etcd v3.3.13+incompatible github.com/coreos/go-semver v0.2.0 // indirect github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect @@ -78,7 +78,7 @@ require ( github.com/uber/jaeger-lib v1.5.0 // indirect github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d // indirect github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect - go.etcd.io/bbolt v1.3.2 // indirect + go.etcd.io/bbolt v1.3.3 // indirect go.uber.org/atomic v1.3.2 go.uber.org/zap v1.9.1 golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e diff --git a/go.sum b/go.sum index 76f7a1045a7bd..cc9d34eb0b3c9 100644 --- a/go.sum +++ b/go.sum @@ -12,8 +12,8 @@ github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd h1:qMd81Ts1T2OTKmB4acZcyKaMtRnY5Y44NuXGX2GFJ1w= github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI= -github.com/coreos/bbolt v1.3.2 h1:wZwiHHUieZCquLkDL0B8UhzreNWsPHooDAG3q34zk0s= -github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY= +github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY= @@ -191,8 +191,8 @@ github.com/unrolled/render v0.0.0-20180914162206-b9786414de4d/go.mod h1:tu82oB5W github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yookoala/realpath v1.0.0/go.mod h1:gJJMA9wuX7AcqLy1+ffPatSCySA1FQ2S8Ya9AIoYBpE= -go.etcd.io/bbolt v1.3.2 h1:Z/90sZLPOeCy2PwprqkFa25PdkusRzaj9P8zm/KNyvk= -go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= +go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= diff --git a/metrics/ddl.go b/metrics/ddl.go index 8a6de6347cd38..92de5826c6d8c 100644 --- a/metrics/ddl.go +++ b/metrics/ddl.go @@ -65,10 +65,13 @@ var ( Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms ~ 1024s }, []string{LblResult}) - OwnerUpdateGlobalVersion = "update_global_version" - OwnerGetGlobalVersion = "get_global_version" - OwnerCheckAllVersions = "check_all_versions" - OwnerHandleSyncerHistogram = prometheus.NewHistogramVec( + OwnerUpdateGlobalVersion = "update_global_version" + OwnerGetGlobalVersion = "get_global_version" + OwnerCheckAllVersions = "check_all_versions" + OwnerNotifyCleanExpirePaths = "notify_clean_expire_paths" + OwnerCleanExpirePaths = "clean_expire_paths" + OwnerCleanOneExpirePath = "clean_an_expire_path" + OwnerHandleSyncerHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "tidb", Subsystem: "ddl", @@ -93,6 +96,7 @@ var ( CreateDDLInstance = "create_ddl_instance" CreateDDL = "create_ddl" + StartCleanWork = "start_clean_work" DDLOwner = "owner" DDLCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ diff --git a/metrics/metrics.go b/metrics/metrics.go index 5711d411efc12..5cd1005be9a44 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -28,12 +28,13 @@ var ( // metrics labels. const ( - LabelSession = "session" - LabelDomain = "domain" - LabelDDLOwner = "ddl-owner" - LabelDDL = "ddl" - LabelGCWorker = "gcworker" - LabelAnalyze = "analyze" + LabelSession = "session" + LabelDomain = "domain" + LabelDDLOwner = "ddl-owner" + LabelDDL = "ddl" + LabelDDLSyncer = "ddl-syncer" + LabelGCWorker = "gcworker" + LabelAnalyze = "analyze" LabelBatchRecvLoop = "batch-recv-loop" LabelBatchSendLoop = "batch-send-loop"