diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index 2121d84994c..991a3ff69f4 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -45,6 +45,7 @@ type FeedState string // All FeedStates const ( StateNormal FeedState = "normal" + StateError FeedState = "error" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" StateRemoved FeedState = "removed" diff --git a/cdc/owner/gc_manager.go b/cdc/owner/gc_manager.go new file mode 100644 index 00000000000..47146fad12c --- /dev/null +++ b/cdc/owner/gc_manager.go @@ -0,0 +1,128 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "math" + "time" + + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/pkg/config" + cdcContext "github.com/pingcap/ticdc/pkg/context" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/tidb/store/tikv/oracle" + "go.uber.org/zap" +) + +const ( + // cdcServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint. + cdcServiceSafePointID = "ticdc" + pdTimeUpdateInterval = 10 * time.Minute +) + +// gcSafepointUpdateInterval is the minimual interval that CDC can update gc safepoint +var gcSafepointUpdateInterval = 1 * time.Minute + +type gcManager struct { + gcTTL int64 + + lastUpdatedTime time.Time + lastSucceededTime time.Time + lastSafePointTs uint64 + + pdPhysicalTimeCache time.Time + lastUpdatedPdTime time.Time +} + +func newGCManager() *gcManager { + serverConfig := config.GetGlobalServerConfig() + failpoint.Inject("InjectGcSafepointUpdateInterval", func(val failpoint.Value) { + gcSafepointUpdateInterval = time.Duration(val.(int) * int(time.Millisecond)) + }) + return &gcManager{ + gcTTL: serverConfig.GcTTL, + } +} + +func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.GlobalReactorState) error { + if time.Since(m.lastUpdatedTime) < gcSafepointUpdateInterval { + return nil + } + minCheckpointTs := uint64(math.MaxUint64) + for _, cfState := range state.Changefeeds { + if cfState.Info == nil { + continue + } + switch cfState.Info.State { + case model.StateNormal, model.StateStopped, model.StateError: + default: + continue + } + checkpointTs := cfState.Info.GetCheckpointTs(cfState.Status) + if minCheckpointTs > checkpointTs { + minCheckpointTs = checkpointTs + } + } + if minCheckpointTs == math.MaxUint64 { + return nil + } + m.lastUpdatedTime = time.Now() + + actual, err := ctx.GlobalVars().PDClient.UpdateServiceGCSafePoint(ctx, cdcServiceSafePointID, m.gcTTL, minCheckpointTs) + if err != nil { + log.Warn("updateGCSafePoint failed", + zap.Uint64("safePointTs", minCheckpointTs), + zap.Error(err)) + if time.Since(m.lastSucceededTime) >= time.Second*time.Duration(m.gcTTL) { + return cerror.ErrUpdateServiceSafepointFailed.Wrap(err) + } + return nil + } + failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) { + actual = uint64(val.(int)) + }) + if actual > minCheckpointTs { + log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs", zap.Uint64("actual", actual), zap.Uint64("checkpointTs", minCheckpointTs)) + } + m.lastSafePointTs = actual + m.lastSucceededTime = time.Now() + return nil +} + +func (m *gcManager) currentTimeFromPDCached(ctx cdcContext.Context) (time.Time, error) { + if time.Since(m.lastUpdatedPdTime) <= pdTimeUpdateInterval { + return m.pdPhysicalTimeCache, nil + } + physical, logical, err := ctx.GlobalVars().PDClient.GetTS(ctx) + if err != nil { + return time.Now(), errors.Trace(err) + } + m.pdPhysicalTimeCache = oracle.GetTimeFromTS(oracle.ComposeTS(physical, logical)) + m.lastUpdatedPdTime = time.Now() + return m.pdPhysicalTimeCache, nil +} + +func (m *gcManager) CheckStaleCheckpointTs(ctx cdcContext.Context, checkpointTs model.Ts) error { + pdTime, err := m.currentTimeFromPDCached(ctx) + if err != nil { + return errors.Trace(err) + } + if checkpointTs < m.lastSafePointTs || pdTime.Sub(oracle.GetTimeFromTS(checkpointTs)) > time.Duration(m.gcTTL)*time.Second { + return cerror.ErrSnapshotLostByGC.GenWithStackByArgs(checkpointTs, m.lastSafePointTs) + } + return nil +} diff --git a/cdc/owner/gc_manager_test.go b/cdc/owner/gc_manager_test.go new file mode 100644 index 00000000000..0c503a0e92a --- /dev/null +++ b/cdc/owner/gc_manager_test.go @@ -0,0 +1,164 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package owner + +import ( + "context" + "fmt" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/errors" + "github.com/pingcap/ticdc/cdc/model" + cdcContext "github.com/pingcap/ticdc/pkg/context" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/pingcap/ticdc/pkg/orchestrator" + "github.com/pingcap/ticdc/pkg/util/testleak" + "github.com/pingcap/tidb/store/tikv/oracle" + pd "github.com/tikv/pd/client" +) + +var _ = check.Suite(&gcManagerSuite{}) + +type gcManagerSuite struct { +} + +type mockPDClient struct { + pd.Client + updateServiceGCSafePointFunc func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) +} + +func (m *mockPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + return m.updateServiceGCSafePointFunc(ctx, serviceID, ttl, safePoint) +} + +func (m *mockPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return oracle.GetPhysical(time.Now()), 0, nil +} + +func (s *gcManagerSuite) TestUpdateGCSafePoint(c *check.C) { + defer testleak.AfterTest(c)() + gcManager := newGCManager() + ctx := cdcContext.NewBackendContext4Test(true) + mockPDClient := &mockPDClient{} + ctx.GlobalVars().PDClient = mockPDClient + state := model.NewGlobalState().(*model.GlobalReactorState) + tester := orchestrator.NewReactorStateTester(c, state, nil) + + // no changefeed, the gc safe point should not be updated + mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + c.Errorf("should not update gc safe point") + return 0, nil + } + err := gcManager.updateGCSafePoint(ctx, state) + c.Assert(err, check.IsNil) + // add a stopped changefeed + changefeedID1 := "changefeed-test1" + changefeedID2 := "changefeed-test2" + tester.MustUpdate(fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID1), []byte(`{"config":{"cyclic-replication":{}},"state":"failed"}`)) + tester.MustApplyPatches() + state.Changefeeds[changefeedID1].PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return &model.ChangeFeedStatus{CheckpointTs: 1}, true, nil + }) + tester.MustApplyPatches() + err = gcManager.updateGCSafePoint(ctx, state) + c.Assert(err, check.IsNil) + + // switch the state of changefeed to normal + state.Changefeeds[changefeedID1].PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + info.State = model.StateNormal + return info, true, nil + }) + tester.MustApplyPatches() + // the gc safe point should be updated to 1(checkpoint Ts of changefeed-test1) + mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + c.Assert(serviceID, check.Equals, cdcServiceSafePointID) + c.Assert(ttl, check.Equals, gcManager.gcTTL) + c.Assert(safePoint, check.Equals, uint64(1)) + return 0, nil + } + err = gcManager.updateGCSafePoint(ctx, state) + c.Assert(err, check.IsNil) + + // add another changefeed + tester.MustUpdate(fmt.Sprintf("/tidb/cdc/changefeed/info/%s", changefeedID2), []byte(`{"config":{"cyclic-replication":{}},"state":"normal"}`)) + tester.MustApplyPatches() + state.Changefeeds[changefeedID1].PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return &model.ChangeFeedStatus{CheckpointTs: 20}, true, nil + }) + state.Changefeeds[changefeedID2].PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return &model.ChangeFeedStatus{CheckpointTs: 30}, true, nil + }) + tester.MustApplyPatches() + // the gc safe point should not be updated, because it was recently updated + mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + c.Errorf("should not update gc safe point") + return 0, nil + } + err = gcManager.updateGCSafePoint(ctx, state) + c.Assert(err, check.IsNil) + + // assume that the gc safe point updated one hour ago + gcManager.lastUpdatedTime = time.Now().Add(-time.Hour) + + // the gc safe point should be updated to 1(checkpoint Ts of changefeed-test1) + mockPDClient.updateServiceGCSafePointFunc = func(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { + c.Assert(serviceID, check.Equals, cdcServiceSafePointID) + c.Assert(ttl, check.Equals, gcManager.gcTTL) + c.Assert(safePoint, check.Equals, uint64(20)) + return 0, nil + } + err = gcManager.updateGCSafePoint(ctx, state) + c.Assert(err, check.IsNil) +} + +func (s *gcManagerSuite) TestTimeFromPD(c *check.C) { + defer testleak.AfterTest(c)() + gcManager := newGCManager() + ctx := cdcContext.NewBackendContext4Test(true) + mockPDClient := &mockPDClient{} + ctx.GlobalVars().PDClient = mockPDClient + t1, err := gcManager.currentTimeFromPDCached(ctx) + c.Assert(err, check.IsNil) + c.Assert(t1, check.Equals, gcManager.pdPhysicalTimeCache) + + time.Sleep(50 * time.Millisecond) + // should return cached time + t2, err := gcManager.currentTimeFromPDCached(ctx) + c.Assert(err, check.IsNil) + c.Assert(t2, check.Equals, gcManager.pdPhysicalTimeCache) + c.Assert(t2, check.Equals, t1) + + time.Sleep(50 * time.Millisecond) + // assume that the gc safe point updated one hour ago + gcManager.lastUpdatedPdTime = time.Now().Add(-time.Hour) + t3, err := gcManager.currentTimeFromPDCached(ctx) + c.Assert(err, check.IsNil) + c.Assert(t3, check.Equals, gcManager.pdPhysicalTimeCache) + // should return new time + c.Assert(t3, check.Not(check.Equals), t2) +} + +func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) { + defer testleak.AfterTest(c)() + gcManager := newGCManager() + ctx := cdcContext.NewBackendContext4Test(true) + mockPDClient := &mockPDClient{} + ctx.GlobalVars().PDClient = mockPDClient + err := gcManager.CheckStaleCheckpointTs(ctx, 10) + c.Assert(cerror.ErrSnapshotLostByGC.Equal(errors.Cause(err)), check.IsTrue) + + err = gcManager.CheckStaleCheckpointTs(ctx, oracle.GoTimeToTS(time.Now())) + c.Assert(err, check.IsNil) +} diff --git a/tests/_utils/start_tidb_cluster_impl b/tests/_utils/start_tidb_cluster_impl index 96981462241..984cb5c6071 100755 --- a/tests/_utils/start_tidb_cluster_impl +++ b/tests/_utils/start_tidb_cluster_impl @@ -276,7 +276,7 @@ runAsDaemon = true kvstore_path = "${OUT_DIR}/tiflash/kvstore" pd_addr = "${UP_PD_HOST_1}:${UP_PD_PORT_1}" ignore_databases = "system,default" -storage_engine = "tmt" +storage_engine = "dt" EOF cat - >"$OUT_DIR/tiflash-proxy.toml" <