Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gc_worker: Skip TiFlash nodes when doing UnsafeDestroyRange and Green GC #15505

Merged
merged 3 commits into from
Mar 20, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (w *GCWorker) getGCConcurrency(ctx context.Context) (int, error) {
return w.loadGCConcurrencyWithDefault()
}

stores, err := w.getUpStores(ctx)
stores, err := w.getUpStoresForGC(ctx)
concurrency := len(stores)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] failed to get up stores to calculate concurrency. use config.",
Expand Down Expand Up @@ -669,7 +669,7 @@ func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64, concu

func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte, concurrency int) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := w.getUpStores(ctx)
stores, err := w.getUpStoresForGC(ctx)
if err != nil {
logutil.Logger(ctx).Error("[gc worker] delete ranges: got an error while trying to get store list from PD",
zap.String("uuid", w.uuid),
Expand Down Expand Up @@ -736,23 +736,70 @@ func (w *GCWorker) doUnsafeDestroyRangeRequest(ctx context.Context, startKey []b
return nil
}

func (w *GCWorker) getUpStores(ctx context.Context) ([]*metapb.Store, error) {
const (
engineLabelKey = "engine"
engineLabelTiFlash = "tiflash"
engineLabelTiKV = "tikv"
)

// needsGCOperationForStore checks if the store-level requests related to GC needs to be sent to the store. The store-level
// requests includes UnsafeDestroyRange, PhysicalScanLock, etc.
func needsGCOperationForStore(store *metapb.Store) (bool, error) {
engineLabel := ""

for _, label := range store.GetLabels() {
if label.GetKey() == engineLabelKey {
engineLabel = label.GetValue()
break
}
}

switch engineLabel {
case engineLabelTiFlash:
// For a TiFlash node, it uses other approach to delete dropped tables, so it's safe to skip sending
// UnsafeDestroyRange requests; it has only learner peers and their data must exist in TiKV, so it's safe to
// skip physical resolve locks for it.
return false, nil

case "":
// If no engine label is set, it should be a TiKV node.
fallthrough
case engineLabelTiKV:
return true, nil

default:
return true, errors.Errorf("unsupported store engine \"%v\" with storeID %v, addr %v",
engineLabel,
store.GetId(),
store.GetAddress())
}
}

// getUpStoresForGC gets the list of stores that needs to be processed during GC.
func (w *GCWorker) getUpStoresForGC(ctx context.Context) ([]*metapb.Store, error) {
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
return nil, errors.Trace(err)
}

upStores := make([]*metapb.Store, 0, len(stores))
for _, store := range stores {
if store.State == metapb.StoreState_Up {
if store.State != metapb.StoreState_Up {
continue
}
needsGCOp, err := needsGCOperationForStore(store)
if err != nil {
return nil, errors.Trace(err)
}
if needsGCOp {
upStores = append(upStores, store)
}
}
return upStores, nil
}

func (w *GCWorker) getUpStoresMap(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getUpStores(ctx)
func (w *GCWorker) getUpStoresMapForGC(ctx context.Context) (map[uint64]*metapb.Store, error) {
stores, err := w.getUpStoresForGC(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -990,7 +1037,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
zap.Uint64("safePoint", safePoint))
startTime := time.Now()

stores, err := w.getUpStoresMap(ctx)
stores, err := w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -1008,7 +1055,7 @@ func (w *GCWorker) resolveLocksPhysical(ctx context.Context, safePoint uint64) e
return errors.Trace(err)
}

stores, err = w.getUpStoresMap(ctx)
stores, err = w.getUpStoresMapForGC(ctx)
if err != nil {
return errors.Trace(err)
}
Expand Down
34 changes: 32 additions & 2 deletions store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,36 @@ func (s *testGCWorkerSuite) TestCheckScanLockMode(c *C) {
c.Assert(usePhysical, Equals, false)
}

func (s *testGCWorkerSuite) TestNeedsGCOperationForStore(c *C) {
newStore := func(hasEngineLabel bool, engineLabel string) *metapb.Store {
store := &metapb.Store{}
if hasEngineLabel {
store.Labels = []*metapb.StoreLabel{{Key: engineLabelKey, Value: engineLabel}}
}
return store
}

// TiKV needs to do the store-level GC operations.
res, err := needsGCOperationForStore(newStore(false, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, ""))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)
res, err = needsGCOperationForStore(newStore(true, engineLabelTiKV))
c.Assert(err, IsNil)
c.Assert(res, IsTrue)

// TiFlash does not need these operations.
res, err = needsGCOperationForStore(newStore(true, engineLabelTiFlash))
c.Assert(err, IsNil)
c.Assert(res, IsFalse)

// Throw an error for unknown store types.
_, err = needsGCOperationForStore(newStore(true, "invalid"))
c.Assert(err, NotNil)
}

const (
failRPCErr = 0
failNilResp = 1
Expand Down Expand Up @@ -508,7 +538,7 @@ func (s *testGCWorkerSuite) testDeleteRangesFailureImpl(c *C, failType int) {
c.Assert(err, IsNil)
c.Assert(preparedRanges, DeepEquals, ranges)

stores, err := s.gcWorker.getUpStores(context.Background())
stores, err := s.gcWorker.getUpStoresForGC(context.Background())
c.Assert(err, IsNil)
c.Assert(len(stores), Equals, 3)

Expand Down Expand Up @@ -869,7 +899,7 @@ func (s *testGCWorkerSuite) makeMergedMockClient(c *C, count int) (*mergeLockSca

const scanLockLimit = 3

storesMap, err := s.gcWorker.getUpStoresMap(context.Background())
storesMap, err := s.gcWorker.getUpStoresMapForGC(context.Background())
c.Assert(err, IsNil)
scanner := newMergeLockScanner(100000, s.client, storesMap)
scanner.scanLockLimit = scanLockLimit
Expand Down