Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: Remove expired keys on PD #10406

Merged
merged 17 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func newDDL(ctx context.Context, etcdCli *clientv3.Client, store kv.Storage,
syncer = NewMockSchemaSyncer()
} else {
manager = owner.NewOwnerManager(etcdCli, ddlPrompt, id, DDLOwnerKey, cancelFunc)
syncer = NewSchemaSyncer(etcdCli, id)
syncer = NewSchemaSyncer(etcdCli, id, manager)
}

ddlCtx := &ddlCtx{
Expand Down Expand Up @@ -452,6 +452,16 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
// checks owner firstly and try to find whether a job exists and run.
asyncNotify(worker.ddlJobCh)
}

go tidbutil.WithRecovery(
func() { d.schemaSyncer.StartCleanWork() },
func(r interface{}) {
if r != nil {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
logutil.Logger(ddlLogCtx).Error("[ddl] DDL syncer clean worker meet panic", zap.String("ID", d.uuid))
metrics.PanicCounter.WithLabelValues(metrics.LabelDDLSyncer).Inc()
}
})
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s", metrics.StartCleanWork)).Inc()
}
}

Expand All @@ -463,6 +473,7 @@ func (d *ddl) close() {
startTime := time.Now()
close(d.quitCh)
d.ownerManager.Cancel()
d.schemaSyncer.CloseCleanWork()
err := d.schemaSyncer.RemoveSelfVersionPath()
if err != nil {
logutil.Logger(ddlLogCtx).Error("[ddl] remove self version path failed", zap.Error(err))
Expand Down
2 changes: 2 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,8 @@ func (w *worker) waitSchemaChanged(ctx context.Context, d *ddlCtx, waitTime time
if terror.ErrorEqual(err, context.DeadlineExceeded) {
return
}
d.schemaSyncer.NotifyCleanExpiredPaths()
// Wait until timeout.
select {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
case <-ctx.Done():
return
Expand Down
9 changes: 9 additions & 0 deletions ddl/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ func (s *MockSchemaSyncer) OwnerCheckAllVersions(ctx context.Context, latestVer
}
}

// NotifyCleanExpiredPaths implements SchemaSyncer.NotifyCleanExpiredPaths interface.
func (s *MockSchemaSyncer) NotifyCleanExpiredPaths() {}

// StartCleanWork implements SchemaSyncer.StartCleanWork interface.
func (s *MockSchemaSyncer) StartCleanWork() {}

// CloseCleanWork implements SchemaSyncer.CloseCleanWork interface.
func (s *MockSchemaSyncer) CloseCleanWork() {}

type mockDelRange struct {
}

Expand Down
129 changes: 126 additions & 3 deletions ddl/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/pingcap/errors"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -86,6 +88,16 @@ type SchemaSyncer interface {
// the latest schema version. If the result is false, wait for a while and check again util the processing time reach 2 * lease.
// It returns until all servers' versions are equal to the latest version or the ctx is done.
OwnerCheckAllVersions(ctx context.Context, latestVer int64) error
// NotifyCleanExpiredPaths informs to clean up expired paths.
NotifyCleanExpiredPaths()
// StartCleanWork starts to clean up tasks.
StartCleanWork()
// CloseCleanWork ends cleanup tasks.
CloseCleanWork()
}

type ownerChecker interface {
IsOwner() bool
}

type schemaVersionSyncer struct {
Expand All @@ -96,13 +108,21 @@ type schemaVersionSyncer struct {
sync.RWMutex
globalVerCh clientv3.WatchChan
}

// for clean worker
ownerChecker ownerChecker
notifyCleanExpiredPathsCh chan struct{}
quiteCh chan struct{}
}

// NewSchemaSyncer creates a new SchemaSyncer.
func NewSchemaSyncer(etcdCli *clientv3.Client, id string) SchemaSyncer {
func NewSchemaSyncer(etcdCli *clientv3.Client, id string, oc ownerChecker) SchemaSyncer {
return &schemaVersionSyncer{
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
etcdCli: etcdCli,
selfSchemaVerPath: fmt.Sprintf("%s/%s", DDLAllSchemaVersions, id),
ownerChecker: oc,
notifyCleanExpiredPathsCh: make(chan struct{}, 1),
quiteCh: make(chan struct{}),
}
}

Expand Down Expand Up @@ -380,3 +400,106 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, latestV
time.Sleep(checkVersInterval)
}
}

const (
opDefaultRetryCnt = 10
failedGetTTLLimit = 20
checkCleanJobInterval = 1 * time.Second
opDefaultTimeout = 3 * time.Second
opRetryInterval = 500 * time.Millisecond
// NeededCleanTTL is exported for testing.
NeededCleanTTL = -60
)

func (s *schemaVersionSyncer) StartCleanWork() {
for {
time.Sleep(checkCleanJobInterval)
if !s.ownerChecker.IsOwner() {
continue
}

select {
case <-s.notifyCleanExpiredPathsCh:
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < opDefaultRetryCnt; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can extract these codes into a func, to make it more readable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's OK. Here only get leases and do doCleanExpirePaths.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but the break in line 433 is not very obvious.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does a retry, I think it's OK.

childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
resp, err := s.etcdCli.Leases(childCtx)
cancelFunc()
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get leases.", zap.Error(err))
continue
}

if isFinished := s.doCleanExpirePaths(resp.Leases); isFinished {
break
}
time.Sleep(opRetryInterval)
}
case <-s.quiteCh:
return
default:
}
}
}

func (s *schemaVersionSyncer) CloseCleanWork() {
close(s.quiteCh)
logutil.Logger(context.Background()).Info("")
zimulala marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *schemaVersionSyncer) NotifyCleanExpiredPaths() {
var err error
startTime := time.Now()
select {
case s.notifyCleanExpiredPathsCh <- struct{}{}:
default:
err = errors.New("channel is full, failed to notify clean expired paths")
}
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerNotifyCleanExpirePaths, metrics.RetLabel(err)).Observe(time.Since(startTime).Seconds())
}

func (s *schemaVersionSyncer) doCleanExpirePaths(leases []clientv3.LeaseStatus) bool {
failedGetIDs := 0
failedRevokeIDs := 0
startTime := time.Now()

defer func() {
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanExpirePaths, metrics.RetLabel(nil)).Observe(time.Since(startTime).Seconds())
}()
// TODO: Now LeaseStatus only has lease ID.
for _, lease := range leases {
// The DDL owner key uses '%x', so here print it too.
leaseID := fmt.Sprintf("%x, %d", lease.ID, lease.ID)
childCtx, cancelFunc := context.WithTimeout(context.Background(), opDefaultTimeout)
ttlResp, err := s.etcdCli.TimeToLive(childCtx, lease.ID)
cancelFunc()
if err != nil {
logutil.Logger(ddlLogCtx).Info("[ddl] syncer clean expired paths, failed to get TTL.", zap.String("leaseID", leaseID), zap.Error(err))
zimulala marked this conversation as resolved.
Show resolved Hide resolved
failedGetIDs++
continue
}

if failedGetIDs > failedGetTTLLimit {
return false
}
if ttlResp.TTL >= NeededCleanTTL {
continue
}

st := time.Now()
childCtx, cancelFunc = context.WithTimeout(context.Background(), opDefaultTimeout)
_, err = s.etcdCli.Revoke(childCtx, lease.ID)
cancelFunc()
if err != nil && terror.ErrorEqual(err, rpctypes.ErrLeaseNotFound) {
zimulala marked this conversation as resolved.
Show resolved Hide resolved
logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths, failed to revoke lease.", zap.String("leaseID", leaseID),
zap.Int64("TTL", ttlResp.TTL), zap.Error(err))
failedRevokeIDs++
}
logutil.Logger(ddlLogCtx).Warn("[ddl] syncer clean expired paths,", zap.String("leaseID", leaseID), zap.Int64("TTL", ttlResp.TTL))
zimulala marked this conversation as resolved.
Show resolved Hide resolved
metrics.OwnerHandleSyncerHistogram.WithLabelValues(metrics.OwnerCleanOneExpirePath, metrics.RetLabel(err)).Observe(time.Since(st).Seconds())
}

if failedGetIDs == 0 && failedRevokeIDs == 0 {
return true
}
return false
}
12 changes: 8 additions & 4 deletions metrics/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ var (
Buckets: prometheus.ExponentialBuckets(0.01, 2, 20),
}, []string{LblResult})

OwnerUpdateGlobalVersion = "update_global_version"
OwnerGetGlobalVersion = "get_global_version"
OwnerCheckAllVersions = "check_all_versions"
OwnerHandleSyncerHistogram = prometheus.NewHistogramVec(
OwnerUpdateGlobalVersion = "update_global_version"
OwnerGetGlobalVersion = "get_global_version"
OwnerCheckAllVersions = "check_all_versions"
OwnerNotifyCleanExpirePaths = "notify_clean_expire_paths"
OwnerCleanExpirePaths = "clean_expire_paths"
OwnerCleanOneExpirePath = "clean_an_expire_path"
OwnerHandleSyncerHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "tidb",
Subsystem: "ddl",
Expand All @@ -93,6 +96,7 @@ var (

CreateDDLInstance = "create_ddl_instance"
CreateDDL = "create_ddl"
StartCleanWork = "start_clean_work"
DDLOwner = "owner"
DDLCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
13 changes: 7 additions & 6 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ var (

// metrics labels.
const (
LabelSession = "session"
LabelDomain = "domain"
LabelDDLOwner = "ddl-owner"
LabelDDL = "ddl"
LabelGCWorker = "gcworker"
LabelAnalyze = "analyze"
LabelSession = "session"
LabelDomain = "domain"
LabelDDLOwner = "ddl-owner"
LabelDDL = "ddl"
LabelDDLSyncer = "ddl-syncer"
LabelGCWorker = "gcworker"
LabelAnalyze = "analyze"

LabelBatchRecvLoop = "batch-recv-loop"
LabelBatchSendLoop = "batch-send-loop"
Expand Down