Temporary fix for ErrSnapshotSchemaNotFound and ErrSchemaStorageGCed (#1069) #1114

Merged: 1 commit, Nov 24, 2020
cdc/changefeed.go (6 changes: 4 additions & 2 deletions)
@@ -525,13 +525,15 @@ func (c *changeFeed) handleMoveTableJobs(ctx context.Context, captures map[model
 		case model.MoveTableStatusDeleted:
 			// add table to target capture
 			status, exist := cloneStatus(job.To)
+			replicaInfo := job.TableReplicaInfo.Clone()
+			replicaInfo.StartTs = c.status.CheckpointTs
 			if !exist {
 				// the target capture is not exist, add table to orphanTables.
-				c.orphanTables[tableID] = job.TableReplicaInfo.StartTs
+				c.orphanTables[tableID] = replicaInfo.StartTs
 				log.Warn("the target capture is not exist, sent the table to orphanTables", zap.Reflect("job", job))
 				continue
 			}
-			status.AddTable(tableID, job.TableReplicaInfo, job.TableReplicaInfo.StartTs)
+			status.AddTable(tableID, replicaInfo, c.status.CheckpointTs)
 			job.Status = model.MoveTableStatusFinished
 			delete(c.moveTableJobs, tableID)
 			log.Info("handle the move job, add table to the target capture", zap.Reflect("job", job))
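
The change above clones the move job's TableReplicaInfo and overrides its StartTs with the changefeed checkpoint, so a moved table (whether re-added on the target capture or parked in orphanTables) resumes from a timestamp whose schema snapshot is still retained, rather than from its original, possibly already garbage-collected StartTs. Below is a minimal standalone sketch of that clone-then-override pattern; the stripped-down tableReplicaInfo type and the replicaInfoForMove helper are illustrative stand-ins, not the real model types.

package main

import "fmt"

// tableReplicaInfo is a hypothetical stand-in for model.TableReplicaInfo,
// used only to illustrate the clone-then-override pattern from the diff.
type tableReplicaInfo struct {
	StartTs uint64
}

func (r *tableReplicaInfo) clone() *tableReplicaInfo {
	c := *r
	return &c
}

// replicaInfoForMove builds the replica info used when re-adding a moved table:
// the shared job info is never mutated, and the copy restarts from checkpointTs.
func replicaInfoForMove(job *tableReplicaInfo, checkpointTs uint64) *tableReplicaInfo {
	info := job.clone()
	info.StartTs = checkpointTs
	return info
}

func main() {
	job := &tableReplicaInfo{StartTs: 100}
	fmt.Println(replicaInfoForMove(job, 250).StartTs, job.StartTs) // 250 100
}
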
cdc/owner.go (5 changes: 5 additions & 0 deletions)
@@ -1222,6 +1222,11 @@ func (o *Owner) cleanUpStaleTasks(ctx context.Context, captures []*model.Capture
 		captureIDs[captureID] = struct{}{}
 	}
 
+	log.Debug("cleanUpStaleTasks",
+		zap.Reflect("statuses", statuses),
+		zap.Reflect("positions", positions),
+		zap.Reflect("workloads", workloads))
+
 	for captureID := range captureIDs {
 		if _, ok := active[captureID]; !ok {
 			status, ok1 := statuses[captureID]
cdc/processor.go (20 changes: 19 additions & 1 deletion)
@@ -61,6 +61,8 @@ const (
 	defaultMemBufferCapacity int64 = 10 * 1024 * 1024 * 1024 // 10G
 
 	defaultSyncResolvedBatch = 1024
+
+	schemaStorageGCLag = time.Minute * 20
 )
 
 var (
@@ -86,6 +88,7 @@ type processor struct {
 	globalResolvedTs uint64
 	localResolvedTs uint64
 	checkpointTs uint64
+	globalcheckpointTs uint64
 	flushCheckpointInterval time.Duration
 
 	ddlPuller puller.Puller
@@ -664,12 +667,17 @@ func (p *processor) globalStatusWorker(ctx context.Context) error {
 	defer globalResolvedTsNotifier.Close()
 
 	updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
+		atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
 		if lastResolvedTs == changefeedStatus.ResolvedTs &&
 			lastCheckPointTs == changefeedStatus.CheckpointTs {
 			return
 		}
 		if lastCheckPointTs < changefeedStatus.CheckpointTs {
-			p.schemaStorage.DoGC(changefeedStatus.CheckpointTs)
+			// Delay GC to accommodate pullers starting from a startTs that's too small
+			// TODO fix startTs problem and remove GC delay, or use other mechanism that prevents the problem deterministically
+			gcTime := oracle.GetTimeFromTS(changefeedStatus.CheckpointTs).Add(-schemaStorageGCLag)
+			gcTs := oracle.ComposeTS(gcTime.Unix(), 0)
+			p.schemaStorage.DoGC(gcTs)
 			lastCheckPointTs = changefeedStatus.CheckpointTs
 		}
 		if lastResolvedTs < changefeedStatus.ResolvedTs {
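
This hunk is the core of the temporary fix: rather than garbage-collecting schema snapshots right at the new checkpoint, the processor keeps roughly schemaStorageGCLag (20 minutes) of extra schema history, so a puller started with a slightly-too-small startTs can still find its snapshot instead of failing with ErrSnapshotSchemaNotFound or ErrSchemaStorageGCed. Below is a standalone sketch of the timestamp arithmetic, assuming the usual TSO layout of physical milliseconds shifted left by 18 bits; physicalTime, composeTS and laggedGCTs are illustrative helpers, not the oracle package API.

package main

import (
	"fmt"
	"time"
)

// A TSO packs a physical wall-clock time in milliseconds in the upper bits and
// an 18-bit logical counter in the lower bits.
const logicalBits = 18

// physicalTime extracts the wall-clock part of a TSO.
func physicalTime(ts uint64) time.Time {
	ms := int64(ts >> logicalBits)
	return time.Unix(ms/1000, (ms%1000)*int64(time.Millisecond))
}

// composeTS rebuilds a TSO from a wall-clock time, with a zero logical part.
func composeTS(t time.Time) uint64 {
	return uint64(t.UnixNano()/int64(time.Millisecond)) << logicalBits
}

// laggedGCTs shifts a checkpoint TSO back by lag before it is handed to schema GC,
// mirroring the delayed DoGC call in globalStatusWorker.
func laggedGCTs(checkpointTs uint64, lag time.Duration) uint64 {
	return composeTS(physicalTime(checkpointTs).Add(-lag))
}

func main() {
	checkpoint := composeTS(time.Now())
	fmt.Println(laggedGCTs(checkpoint, 20*time.Minute) < checkpoint) // true
}
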
@@ -937,6 +945,16 @@ func (p *processor) addTable(ctx context.Context, tableID int64, replicaInfo *mo
 			return
 		}
 	}
+
+	globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)
+
+	if replicaInfo.StartTs < globalcheckpointTs {
+		log.Warn("addTable: startTs < checkpoint",
+			zap.Int64("tableID", tableID),
+			zap.Uint64("checkpoint", globalcheckpointTs),
+			zap.Uint64("startTs", replicaInfo.StartTs))
+	}
+
 	globalResolvedTs := atomic.LoadUint64(&p.sinkEmittedResolvedTs)
 	log.Debug("Add table", zap.Int64("tableID", tableID),
 		zap.String("name", tableName),
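
The other processor change publishes the changefeed's global checkpoint from globalStatusWorker with an atomic store and lets addTable read it with an atomic load, warning when a table arrives with a StartTs already below that checkpoint, which is exactly the situation the GC lag papers over. A generic sketch of that lock-free publish-and-check handoff, using a hypothetical checkpointHolder type rather than the processor itself:

package main

import (
	"fmt"
	"sync/atomic"
)

// checkpointHolder is a hypothetical stand-in for the processor fields involved:
// one goroutine publishes the latest global checkpoint, others read it without locks.
type checkpointHolder struct {
	globalCheckpointTs uint64
}

// update is called by the status worker whenever a new checkpoint arrives.
func (h *checkpointHolder) update(ts uint64) {
	atomic.StoreUint64(&h.globalCheckpointTs, ts)
}

// startTsTooSmall reports whether a table's StartTs lags behind the published
// checkpoint, i.e. the case addTable logs a warning for.
func (h *checkpointHolder) startTsTooSmall(startTs uint64) bool {
	return startTs < atomic.LoadUint64(&h.globalCheckpointTs)
}

func main() {
	var h checkpointHolder
	h.update(300)
	fmt.Println(h.startTsTooSmall(250)) // true
}
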
pkg/notify/notify_test.go (4 changes: 3 additions & 1 deletion)
@@ -36,11 +36,13 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r1 := notifier.NewReceiver(-1)
 	r2 := notifier.NewReceiver(-1)
 	r3 := notifier.NewReceiver(-1)
+	finishedCh := make(chan struct{})
 	go func() {
 		for i := 0; i < 5; i++ {
 			time.Sleep(time.Second)
 			notifier.Notify()
 		}
+		close(finishedCh)
 	}()
 	<-r1.C
 	r1.Stop()
@@ -50,7 +52,6 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r2.Stop()
 	r3.Stop()
 	c.Assert(len(notifier.receivers), check.Equals, 0)
-	time.Sleep(time.Second)
 	r4 := notifier.NewReceiver(-1)
 	<-r4.C
 	r4.Stop()
@@ -59,6 +60,7 @@ func (s *notifySuite) TestNotifyHub(c *check.C) {
 	r5 := notifier2.NewReceiver(10 * time.Millisecond)
 	<-r5.C
 	r5.Stop()
+	<-finishedCh // To make the leak checker happy
 }
 
 func (s *notifySuite) TestContinusStop(c *check.C) {
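
The test change gives the background notifier goroutine a completion channel and blocks on it at the very end of TestNotifyHub, so the goroutine leak checker never observes the loop as still running after the test returns. A generic sketch of the pattern, independent of the notify package:

package main

import (
	"fmt"
	"time"
)

// runThenWait launches a background producer and blocks until it has finished,
// mirroring the finishedCh pattern added to TestNotifyHub.
func runThenWait(produce func()) {
	finishedCh := make(chan struct{})
	go func() {
		defer close(finishedCh) // signal completion when the loop exits
		for i := 0; i < 5; i++ {
			produce()
		}
	}()
	// ... the test body (receivers, assertions) would run here ...
	<-finishedCh // wait for the producer so no goroutine outlives the test
}

func main() {
	runThenWait(func() { time.Sleep(10 * time.Millisecond) })
	fmt.Println("producer finished")
}
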
tests/move_table/conf/diff_config.toml (27 changes: 27 additions & 0 deletions)
@@ -0,0 +1,27 @@
# diff Configuration.

log-level = "info"
chunk-size = 10
check-thread-count = 4
sample-percent = 100
use-rowid = false
use-checksum = true
fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "move_table"
tables = ["~usertable.*"]

[[source-db]]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
instance-id = "source-1"

[target-db]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
tests/move_table/conf/workload (13 changes: 13 additions & 0 deletions)
@@ -0,0 +1,13 @@
threadcount=10
recordcount=60000
operationcount=0
workload=core

readallfields=true

readproportion=0
updateproportion=0
scanproportion=0
insertproportion=0

requestdistribution=uniform