diff --git a/_utils/terror_gen/errors_release.txt b/_utils/terror_gen/errors_release.txt index 896a295340..ac8601f824 100644 --- a/_utils/terror_gen/errors_release.txt +++ b/_utils/terror_gen/errors_release.txt @@ -108,6 +108,7 @@ ErrTracingDataChecksum,[code=11101:class=functional:scope=internal:level=high]," ErrTracingGetTSO,[code=11102:class=functional:scope=internal:level=high],"get tso" ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium],"backoff argument %s value %v not valid" ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed" +ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid" ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s" ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s" ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s" diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index c21dd70642..23278c6634 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -65,7 +65,7 @@ var ( // user var name used in dummy USER_VAR_EVENT dummyUserVarName = []byte("!dummyvar") // dummy (commented) query in a QueryEvent - dummyQuery = []byte("# dummy query, often used to fill a hole in a binlog file") + dummyQuery = []byte("# dummy query generated by DM, often used to fill a hole in a binlog file") ) // GenEventHeader generates a EventHeader's raw data according to a passed-in EventHeader struct. diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index 97fd7d1540..e7d133e784 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -35,6 +35,12 @@ type Set interface { Equal(other Set) bool Contain(other Set) bool + // Truncate truncates the current GTID sets until the `end` in-place. + // NOTE: the original GTID sets should contain the end GTID sets, otherwise it's invalid. + // like truncating `00c04543-f584-11e9-a765-0242ac120002:1-100` with `00c04543-f584-11e9-a765-0242ac120002:40-60` + // should become `00c04543-f584-11e9-a765-0242ac120002:1-60`. + Truncate(end Set) error + String() string } @@ -173,6 +179,36 @@ func (g *mySQLGTIDSet) Contain(other Set) bool { return g.set.Contain(other.Origin()) } +func (g *mySQLGTIDSet) Truncate(end Set) error { + if end == nil { + return nil // do nothing + } + if !g.Contain(end) { + return terror.ErrGTIDTruncateInvalid.Generate(g, end) + } + endGs := end.(*mySQLGTIDSet) // already verify the type is `*mySQLGTIDSet` in `Contain`. + if endGs == nil { + return nil // do nothing + } + + for sid, setG := range g.set.Sets { + setE, ok := endGs.set.Sets[sid] + if !ok { + continue // no need to truncate for this SID + } + for i, interG := range setG.Intervals { + for _, interE := range setE.Intervals { + if interG.Start <= interE.Start && interG.Stop >= interE.Stop { + interG.Stop = interE.Stop // truncate the stop + } + } + setG.Intervals[i] = interG // overwrite the value (because it's not a pointer) + } + } + + return nil +} + func (g *mySQLGTIDSet) String() string { if g.set == nil { return "" @@ -288,6 +324,32 @@ func (m *mariadbGTIDSet) Contain(other Set) bool { return m.set.Contain(other.Origin()) } +func (m *mariadbGTIDSet) Truncate(end Set) error { + if end == nil { + return nil // do nothing + } + if !m.Contain(end) { + return terror.ErrGTIDTruncateInvalid.Generate(m, end) + } + endGs := end.(*mariadbGTIDSet) // already verify the type is `*mariadbGTIDSet` in `Contain`. + if endGs == nil { + return nil // do nothing + } + + for did, mGTID := range m.set.Sets { + eGTID, ok := endGs.set.Sets[did] + if !ok { + continue // no need to truncate for this domain ID + } + if mGTID.SequenceNumber > eGTID.SequenceNumber { + mGTID.SequenceNumber = eGTID.SequenceNumber // truncate the seqNO + mGTID.ServerID = eGTID.ServerID // also update server-id to match the seqNO + } + } + + return nil +} + func (m *mariadbGTIDSet) String() string { if m.set == nil { return "" diff --git a/pkg/gtid/gtid_test.go b/pkg/gtid/gtid_test.go index 34dfb2ef03..e660c7dc0a 100644 --- a/pkg/gtid/gtid_test.go +++ b/pkg/gtid/gtid_test.go @@ -18,6 +18,8 @@ import ( "testing" . "github.com/pingcap/check" + + "github.com/pingcap/dm/pkg/terror" ) var _ = Suite(&testGTIDSuite{}) @@ -178,3 +180,190 @@ func (s *testGTIDSuite) TestMairaGTIDContain(c *C) { c.Assert(g1.Contain(g2), IsFalse) c.Assert(g2.Contain(g1), IsFalse) } + +func (s *testGTIDSuite) TestMySQLGTIDTruncate(c *C) { + var ( + flavor = "mysql" + g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100") + g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100") + gNil *mySQLGTIDSet + gEmpty, _ = ParserGTID(flavor, "") + gMariaDBNil *mariadbGTIDSet + ) + // truncate to nil or empty GTID sets has no effect + c.Assert(g1.Truncate(nil), IsNil) + c.Assert(g1, DeepEquals, g2) + c.Assert(g1.Truncate(gNil), IsNil) + c.Assert(g1, DeepEquals, g2) + c.Assert(g1.Truncate(gEmpty), IsNil) + c.Assert(g1, DeepEquals, g2) + + // nil truncate to nil has no effect + c.Assert(gNil.Truncate(nil), IsNil) + c.Assert(gNil.Truncate(gNil), IsNil) + + // nil truncate to not nil report an error + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue) + + // truncate with invalid MySQL GTID sets report an error + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMariaDBNil)), IsTrue) + + cases := []struct { + before string + end string + after string + hasError bool + }{ + // before not contain end + { + before: "00c04543-f584-11e9-a765-0242ac120002:100", + end: "00c04543-f584-11e9-a765-0242ac120002:99", + hasError: true, + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60", + end: "00c04543-f584-11e9-a765-0242ac120002:50-70", + hasError: true, + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60", + end: "00c04543-f584-11e9-a765-0242ac120002:30-50", + hasError: true, + }, + // truncate take effect + { + before: "00c04543-f584-11e9-a765-0242ac120002:100", + end: "00c04543-f584-11e9-a765-0242ac120002:100", + after: "00c04543-f584-11e9-a765-0242ac120002:100", + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60", + end: "00c04543-f584-11e9-a765-0242ac120002:45-55", + after: "00c04543-f584-11e9-a765-0242ac120002:40-55", + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100", + end: "00c04543-f584-11e9-a765-0242ac120002:45-55:85-95", + after: "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95", + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100", + end: "00c04543-f584-11e9-a765-0242ac120002:45-55:70:85-95", + after: "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95", + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100", + end: "00c04543-f584-11e9-a765-0242ac120002:45-55", + after: "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100", + }, + { + before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100", + end: "00c04543-f584-11e9-a765-0242ac120002:45-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80", + after: "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80", + }, + } + + for _, cs := range cases { + bg, err := ParserGTID(flavor, cs.before) + c.Assert(err, IsNil) + eg, err := ParserGTID(flavor, cs.end) + c.Assert(err, IsNil) + ag, err := ParserGTID(flavor, cs.after) + c.Assert(err, IsNil) + err = bg.Truncate(eg) + if cs.hasError { + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue) + } else { + c.Assert(bg, DeepEquals, ag) + } + } +} + +func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) { + var ( + flavor = "mariadb" + g1, _ = ParserGTID(flavor, "1-2-3") + g2, _ = ParserGTID(flavor, "1-2-3") + gNil *mariadbGTIDSet + gEmpty, _ = ParserGTID(flavor, "") + gMySQLNil *mySQLGTIDSet + ) + // truncate to nil or empty GTID sets has no effect + c.Assert(g1.Truncate(nil), IsNil) + c.Assert(g1, DeepEquals, g2) + c.Assert(g1.Truncate(gNil), IsNil) + c.Assert(g1, DeepEquals, g2) + c.Assert(g1.Truncate(gEmpty), IsNil) + c.Assert(g1, DeepEquals, g2) + + // nil truncate to nil has no effect + c.Assert(gNil.Truncate(nil), IsNil) + c.Assert(gNil.Truncate(gNil), IsNil) + + // nil truncate to not nil report an error + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue) + + // truncate with invalid MariaDB GTID sets report an error + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMySQLNil)), IsTrue) + + cases := []struct { + before string + end string + after string + hasError bool + }{ + // before not contain end + { + before: "1-2-3", + end: "2-2-3", + hasError: true, + }, + { + before: "1-2-3", + end: "1-2-4", + hasError: true, + }, + + // truncate take effect + { + before: "1-2-3", + end: "1-2-3", + after: "1-2-3", + }, + { + before: "1-2-10", + end: "1-2-8", + after: "1-2-8", + }, + { + before: "1-2-10", + end: "1-3-8", + after: "1-3-8", + }, + { + before: "1-2-10,2-2-10", + end: "1-2-8", + after: "1-2-8,2-2-10", + }, + { + before: "1-2-10,2-2-10", + end: "1-3-8,2-2-6", + after: "1-3-8,2-2-6", + }, + } + + for _, cs := range cases { + bg, err := ParserGTID(flavor, cs.before) + c.Assert(err, IsNil) + eg, err := ParserGTID(flavor, cs.end) + c.Assert(err, IsNil) + ag, err := ParserGTID(flavor, cs.after) + c.Assert(err, IsNil) + err = bg.Truncate(eg) + if cs.hasError { + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue) + } else { + c.Assert(bg, DeepEquals, ag) + } + } +} diff --git a/pkg/terror/error_list.go b/pkg/terror/error_list.go index 07c0551349..c8317f3515 100644 --- a/pkg/terror/error_list.go +++ b/pkg/terror/error_list.go @@ -135,6 +135,9 @@ const ( codeBackoffArgsNotValid codeInitLoggerFail + + // pkg/gtid + codeGTIDTruncateInvalid ) // Config related error code list @@ -589,6 +592,8 @@ var ( ErrBackoffArgsNotValid = New(codeBackoffArgsNotValid, ClassFunctional, ScopeInternal, LevelMedium, "backoff argument %s value %v not valid") // pkg ErrInitLoggerFail = New(codeInitLoggerFail, ClassFunctional, ScopeInternal, LevelMedium, "init logger failed") + // pkg/gtid + ErrGTIDTruncateInvalid = New(codeGTIDTruncateInvalid, ClassFunctional, ScopeInternal, LevelHigh, "truncate GTID sets %v to %v not valid") // Config related error ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s") diff --git a/relay/relay.go b/relay/relay.go index 7ba8250e5e..e53a49ca99 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -344,7 +344,10 @@ func (r *Relay) tryRecoverLatestFile(parser2 *parser.Parser) error { if result.Recovered { r.tctx.L().Warn("relay log file recovered", zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) - err = r.meta.Save(result.LatestPos, result.LatestGTIDs) + if err = latestGTID.Truncate(result.LatestGTIDs); err != nil { + return err + } + err = r.meta.Save(result.LatestPos, latestGTID) if err != nil { return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs) } @@ -387,6 +390,11 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo cfg := r.cfg.From r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error", zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err)) + // log the status for debug + pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor) + if err2 == nil { + r.tctx.L().Info("current master status", zap.Stringer("position", pos), log.WrapStringerField("GTID sets", gs)) + } } binlogReadErrorCounter.Inc() } diff --git a/relay/relay_test.go b/relay/relay_test.go index 00a40fe36e..6c5f6700f8 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/dm/pkg/binlog/event" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/streamer" + "github.com/pingcap/dm/pkg/terror" "github.com/pingcap/dm/pkg/utils" "github.com/pingcap/dm/relay/reader" "github.com/pingcap/dm/relay/retry" @@ -140,6 +141,8 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" + genGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" + greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" filename = "mysql-bin.000001" startPos = gmysql.Position{Name: filename, Pos: 123} @@ -182,16 +185,33 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { // write some invalid data into the relay log file f, err := os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600) c.Assert(err, IsNil) - defer f.Close() _, err = f.Write([]byte("invalid event data")) c.Assert(err, IsNil) + f.Close() + + // GTID sets in meta data does not contain the GTID sets in relay log, invalid + c.Assert(terror.ErrGTIDTruncateInvalid.Equal(r.tryRecoverLatestFile(parser2)), IsTrue) + + // write some invalid data into the relay log file again + f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600) + c.Assert(err, IsNil) + _, err = f.Write([]byte("invalid event data")) + c.Assert(err, IsNil) + f.Close() + + // write a greater GTID sets in meta + greaterGITDSet, err := gtid.ParserGTID(relayCfg.Flavor, greaterGITDSetStr) + c.Assert(err, IsNil) + c.Assert(r.meta.Save(startPos, greaterGITDSet), IsNil) // invalid data truncated, meta updated c.Assert(r.tryRecoverLatestFile(parser2), IsNil) _, latestPos := r.meta.Pos() c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos}) _, latestGTIDs := r.meta.GTID() - c.Assert(latestGTIDs.Contain(g.LatestGTID), IsTrue) // verifyMetadata is not enough + genGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, genGTIDSetStr) + c.Assert(err, IsNil) + c.Assert(latestGTIDs.Equal(genGTIDSet), IsTrue) // verifyMetadata is not enough // no relay log file need to recover c.Assert(r.meta.Save(minCheckpoint, latestGTIDs), IsNil)