Skip to content

Commit

Permalink
relay: fix GTID recover after relay log recovered (pingcap#335)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Oct 28, 2019
1 parent 8b2f08e commit c6344f3
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 4 deletions.
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ ErrTracingDataChecksum,[code=11101:class=functional:scope=internal:level=high],"
ErrTracingGetTSO,[code=11102:class=functional:scope=internal:level=high],"get tso"
ErrBackoffArgsNotValid,[code=11103:class=functional:scope=internal:level=medium],"backoff argument %s value %v not valid"
ErrInitLoggerFail,[code=11104:class=functional:scope=internal:level=medium],"init logger failed"
ErrGTIDTruncateInvalid,[code=11105:class=functional:scope=internal:level=high],"truncate GTID sets %v to %v not valid"
ErrConfigCheckItemNotSupport,[code=20001:class=config:scope=internal:level=medium],"checking item %s is not supported\n%s"
ErrConfigTomlTransform,[code=20002:class=config:scope=internal:level=medium],"%s"
ErrConfigTaskYamlTransform,[code=20003:class=config:scope=internal:level=medium],"%s"
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var (
// user var name used in dummy USER_VAR_EVENT
dummyUserVarName = []byte("!dummyvar")
// dummy (commented) query in a QueryEvent
dummyQuery = []byte("# dummy query, often used to fill a hole in a binlog file")
dummyQuery = []byte("# dummy query generated by DM, often used to fill a hole in a binlog file")
)

// GenEventHeader generates an EventHeader's raw data according to a passed-in EventHeader struct.
Expand Down
62 changes: 62 additions & 0 deletions pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type Set interface {
Equal(other Set) bool
Contain(other Set) bool

// Truncate truncates the current GTID sets in-place so that they end at `end`.
// NOTE: the original GTID sets must contain the `end` GTID sets, otherwise the truncation is invalid.
// For example, truncating `00c04543-f584-11e9-a765-0242ac120002:1-100` with `00c04543-f584-11e9-a765-0242ac120002:40-60`
// yields `00c04543-f584-11e9-a765-0242ac120002:1-60`.
Truncate(end Set) error

String() string
}

Expand Down Expand Up @@ -173,6 +179,36 @@ func (g *mySQLGTIDSet) Contain(other Set) bool {
return g.set.Contain(other.Origin())
}

// Truncate shrinks the receiver in-place so that it does not extend past `end`.
//
// A nil `end` (an untyped nil or a nil *mySQLGTIDSet) is a no-op. If the receiver
// does not contain `end`, or `end` is not a *mySQLGTIDSet, ErrGTIDTruncateInvalid
// is returned and the receiver is left untouched.
//
// For every SID present in both sets, each interval of the receiver that fully
// covers one or more intervals of `end` has its Stop lowered to the smallest Stop
// among those covered intervals. Intervals that cover no interval of `end`, and
// SIDs that are absent from `end`, are kept unchanged.
func (g *mySQLGTIDSet) Truncate(end Set) error {
	if end == nil {
		return nil // do nothing
	}
	if !g.Contain(end) {
		return terror.ErrGTIDTruncateInvalid.Generate(g, end)
	}

	// `Contain` is expected to have rejected other implementations already, but use
	// the checked assertion so an unexpected type yields an error instead of a panic.
	endGs, ok := end.(*mySQLGTIDSet)
	if !ok {
		return terror.ErrGTIDTruncateInvalid.Generate(g, end)
	}
	if endGs == nil || endGs.set == nil {
		return nil // nothing to truncate to
	}
	if g == nil || g.set == nil {
		return nil // nothing to truncate; also avoids a nil dereference below
	}

	for sid, setG := range g.set.Sets {
		setE, found := endGs.set.Sets[sid]
		if !found {
			continue // no need to truncate for this SID
		}
		for i, interG := range setG.Intervals {
			for _, interE := range setE.Intervals {
				// interG is compared with its already-shrunk Stop, so after the
				// inner loop it holds the smallest Stop of the covered intervals.
				if interG.Start <= interE.Start && interG.Stop >= interE.Stop {
					interG.Stop = interE.Stop // truncate the stop
				}
			}
			setG.Intervals[i] = interG // write back: interG is a copy, not a pointer
		}
	}

	return nil
}

func (g *mySQLGTIDSet) String() string {
if g.set == nil {
return ""
Expand Down Expand Up @@ -288,6 +324,32 @@ func (m *mariadbGTIDSet) Contain(other Set) bool {
return m.set.Contain(other.Origin())
}

// Truncate shrinks the receiver in-place so that it does not extend past `end`.
//
// A nil `end` (an untyped nil or a nil *mariadbGTIDSet) is a no-op. If the receiver
// does not contain `end`, or `end` is not a *mariadbGTIDSet, ErrGTIDTruncateInvalid
// is returned and the receiver is left untouched.
//
// For every domain ID present in both sets, a receiver entry whose sequence number
// is greater than the one in `end` takes over both the sequence number and the
// server ID from `end`. Domain IDs that are absent from `end` are kept unchanged.
func (m *mariadbGTIDSet) Truncate(end Set) error {
	if end == nil {
		return nil // do nothing
	}
	if !m.Contain(end) {
		return terror.ErrGTIDTruncateInvalid.Generate(m, end)
	}

	// `Contain` is expected to have rejected other implementations already, but use
	// the checked assertion so an unexpected type yields an error instead of a panic.
	endGs, ok := end.(*mariadbGTIDSet)
	if !ok {
		return terror.ErrGTIDTruncateInvalid.Generate(m, end)
	}
	if endGs == nil || endGs.set == nil {
		return nil // nothing to truncate to
	}
	if m == nil || m.set == nil {
		return nil // nothing to truncate; also avoids a nil dereference below
	}

	for did, mGTID := range m.set.Sets {
		eGTID, found := endGs.set.Sets[did]
		if !found {
			continue // no need to truncate for this domain ID
		}
		// NOTE(review): the updates below are not written back to the map, so they
		// only take effect if the map values are pointers — confirm against the
		// GTID library version in use.
		if mGTID.SequenceNumber > eGTID.SequenceNumber {
			mGTID.SequenceNumber = eGTID.SequenceNumber // truncate the seqNO
			mGTID.ServerID = eGTID.ServerID             // also update server-id to match the seqNO
		}
	}

	return nil
}

func (m *mariadbGTIDSet) String() string {
if m.set == nil {
return ""
Expand Down
189 changes: 189 additions & 0 deletions pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"testing"

. "github.com/pingcap/check"

"github.com/pingcap/dm/pkg/terror"
)

var _ = Suite(&testGTIDSuite{})
Expand Down Expand Up @@ -178,3 +180,190 @@ func (s *testGTIDSuite) TestMairaGTIDContain(c *C) {
c.Assert(g1.Contain(g2), IsFalse)
c.Assert(g2.Contain(g1), IsFalse)
}

// TestMySQLGTIDTruncate checks Truncate on MySQL GTID sets: nil/empty handling,
// rejection of an `end` that is not contained in the receiver (or is of another
// flavor), and the resulting set for valid truncations.
func (s *testGTIDSuite) TestMySQLGTIDTruncate(c *C) {
	var (
		flavor      = "mysql"
		gNil        *mySQLGTIDSet
		gMariaDBNil *mariadbGTIDSet
	)
	// fixtures must parse cleanly, otherwise the assertions below are meaningless
	g1, err := ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
	c.Assert(err, IsNil)
	g2, err := ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100")
	c.Assert(err, IsNil)
	gEmpty, err := ParserGTID(flavor, "")
	c.Assert(err, IsNil)

	// truncating to nil or empty GTID sets has no effect
	c.Assert(g1.Truncate(nil), IsNil)
	c.Assert(g1, DeepEquals, g2)
	c.Assert(g1.Truncate(gNil), IsNil)
	c.Assert(g1, DeepEquals, g2)
	c.Assert(g1.Truncate(gEmpty), IsNil)
	c.Assert(g1, DeepEquals, g2)

	// truncating nil to nil has no effect
	c.Assert(gNil.Truncate(nil), IsNil)
	c.Assert(gNil.Truncate(gNil), IsNil)

	// truncating nil to a non-nil set reports an error
	c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue)

	// truncating with a non-MySQL GTID set reports an error
	c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMariaDBNil)), IsTrue)

	cases := []struct {
		before   string
		end      string
		after    string
		hasError bool
	}{
		// before does not contain end
		{
			before:   "00c04543-f584-11e9-a765-0242ac120002:100",
			end:      "00c04543-f584-11e9-a765-0242ac120002:99",
			hasError: true,
		},
		{
			before:   "00c04543-f584-11e9-a765-0242ac120002:40-60",
			end:      "00c04543-f584-11e9-a765-0242ac120002:50-70",
			hasError: true,
		},
		{
			before:   "00c04543-f584-11e9-a765-0242ac120002:40-60",
			end:      "00c04543-f584-11e9-a765-0242ac120002:30-50",
			hasError: true,
		},
		// truncation takes effect
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:100",
			end:    "00c04543-f584-11e9-a765-0242ac120002:100",
			after:  "00c04543-f584-11e9-a765-0242ac120002:100",
		},
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:40-60",
			end:    "00c04543-f584-11e9-a765-0242ac120002:45-55",
			after:  "00c04543-f584-11e9-a765-0242ac120002:40-55",
		},
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100",
			end:    "00c04543-f584-11e9-a765-0242ac120002:45-55:85-95",
			after:  "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95",
		},
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:40-60:70:80-100",
			end:    "00c04543-f584-11e9-a765-0242ac120002:45-55:70:85-95",
			after:  "00c04543-f584-11e9-a765-0242ac120002:40-55:70:80-95",
		},
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
			end:    "00c04543-f584-11e9-a765-0242ac120002:45-55",
			after:  "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
		},
		{
			before: "00c04543-f584-11e9-a765-0242ac120002:40-60,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100",
			end:    "00c04543-f584-11e9-a765-0242ac120002:45-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80",
			after:  "00c04543-f584-11e9-a765-0242ac120002:40-55,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-80",
		},
	}

	for _, cs := range cases {
		bg, err := ParserGTID(flavor, cs.before)
		c.Assert(err, IsNil)
		eg, err := ParserGTID(flavor, cs.end)
		c.Assert(err, IsNil)
		ag, err := ParserGTID(flavor, cs.after)
		c.Assert(err, IsNil)

		err = bg.Truncate(eg)
		if cs.hasError {
			c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue)
			continue
		}
		// a valid truncation must succeed, not merely leave the expected set behind
		c.Assert(err, IsNil)
		c.Assert(bg, DeepEquals, ag)
	}
}

// TestMariaDBGTIDTruncate checks Truncate on MariaDB GTID sets: nil/empty handling,
// rejection of an `end` that is not contained in the receiver (or is of another
// flavor), and the resulting set for valid truncations.
func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) {
	var (
		flavor    = "mariadb"
		gNil      *mariadbGTIDSet
		gMySQLNil *mySQLGTIDSet
	)
	// fixtures must parse cleanly, otherwise the assertions below are meaningless
	g1, err := ParserGTID(flavor, "1-2-3")
	c.Assert(err, IsNil)
	g2, err := ParserGTID(flavor, "1-2-3")
	c.Assert(err, IsNil)
	gEmpty, err := ParserGTID(flavor, "")
	c.Assert(err, IsNil)

	// truncating to nil or empty GTID sets has no effect
	c.Assert(g1.Truncate(nil), IsNil)
	c.Assert(g1, DeepEquals, g2)
	c.Assert(g1.Truncate(gNil), IsNil)
	c.Assert(g1, DeepEquals, g2)
	c.Assert(g1.Truncate(gEmpty), IsNil)
	c.Assert(g1, DeepEquals, g2)

	// truncating nil to nil has no effect
	c.Assert(gNil.Truncate(nil), IsNil)
	c.Assert(gNil.Truncate(gNil), IsNil)

	// truncating nil to a non-nil set reports an error
	c.Assert(terror.ErrGTIDTruncateInvalid.Equal(gNil.Truncate(g1)), IsTrue)

	// truncating with a non-MariaDB GTID set reports an error
	c.Assert(terror.ErrGTIDTruncateInvalid.Equal(g1.Truncate(gMySQLNil)), IsTrue)

	cases := []struct {
		before   string
		end      string
		after    string
		hasError bool
	}{
		// before does not contain end
		{
			before:   "1-2-3",
			end:      "2-2-3",
			hasError: true,
		},
		{
			before:   "1-2-3",
			end:      "1-2-4",
			hasError: true,
		},

		// truncation takes effect
		{
			before: "1-2-3",
			end:    "1-2-3",
			after:  "1-2-3",
		},
		{
			before: "1-2-10",
			end:    "1-2-8",
			after:  "1-2-8",
		},
		{
			before: "1-2-10",
			end:    "1-3-8",
			after:  "1-3-8",
		},
		{
			before: "1-2-10,2-2-10",
			end:    "1-2-8",
			after:  "1-2-8,2-2-10",
		},
		{
			before: "1-2-10,2-2-10",
			end:    "1-3-8,2-2-6",
			after:  "1-3-8,2-2-6",
		},
	}

	for _, cs := range cases {
		bg, err := ParserGTID(flavor, cs.before)
		c.Assert(err, IsNil)
		eg, err := ParserGTID(flavor, cs.end)
		c.Assert(err, IsNil)
		ag, err := ParserGTID(flavor, cs.after)
		c.Assert(err, IsNil)

		err = bg.Truncate(eg)
		if cs.hasError {
			c.Assert(terror.ErrGTIDTruncateInvalid.Equal(err), IsTrue)
			continue
		}
		// a valid truncation must succeed, not merely leave the expected set behind
		c.Assert(err, IsNil)
		c.Assert(bg, DeepEquals, ag)
	}
}
5 changes: 5 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ const (
codeBackoffArgsNotValid

codeInitLoggerFail

// pkg/gtid
codeGTIDTruncateInvalid
)

// Config related error code list
Expand Down Expand Up @@ -589,6 +592,8 @@ var (
ErrBackoffArgsNotValid = New(codeBackoffArgsNotValid, ClassFunctional, ScopeInternal, LevelMedium, "backoff argument %s value %v not valid")
// pkg
ErrInitLoggerFail = New(codeInitLoggerFail, ClassFunctional, ScopeInternal, LevelMedium, "init logger failed")
// pkg/gtid
ErrGTIDTruncateInvalid = New(codeGTIDTruncateInvalid, ClassFunctional, ScopeInternal, LevelHigh, "truncate GTID sets %v to %v not valid")

// Config related error
ErrConfigCheckItemNotSupport = New(codeConfigCheckItemNotSupport, ClassConfig, ScopeInternal, LevelMedium, "checking item %s is not supported\n%s")
Expand Down
10 changes: 9 additions & 1 deletion relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,10 @@ func (r *Relay) tryRecoverLatestFile(parser2 *parser.Parser) error {
if result.Recovered {
r.tctx.L().Warn("relay log file recovered",
zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs))
err = r.meta.Save(result.LatestPos, result.LatestGTIDs)
if err = latestGTID.Truncate(result.LatestGTIDs); err != nil {
return err
}
err = r.meta.Save(result.LatestPos, latestGTID)
if err != nil {
return terror.Annotatef(err, "save position %s, GTID sets %v after recovered", result.LatestPos, result.LatestGTIDs)
}
Expand Down Expand Up @@ -387,6 +390,11 @@ func (r *Relay) handleEvents(ctx context.Context, reader2 reader.Reader, transfo
cfg := r.cfg.From
r.tctx.L().Error("the requested binlog files have purged in the master server or the master server have switched, currently DM do no support to handle this error",
zap.String("db host", cfg.Host), zap.Int("db port", cfg.Port), log.ShortError(err))
// log the status for debug
pos, gs, err2 := utils.GetMasterStatus(r.db, r.cfg.Flavor)
if err2 == nil {
r.tctx.L().Info("current master status", zap.Stringer("position", pos), log.WrapStringerField("GTID sets", gs))
}
}
binlogReadErrorCounter.Inc()
}
Expand Down
24 changes: 22 additions & 2 deletions relay/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/streamer"
"github.com/pingcap/dm/pkg/terror"
"github.com/pingcap/dm/pkg/utils"
"github.com/pingcap/dm/relay/reader"
"github.com/pingcap/dm/relay/retry"
Expand Down Expand Up @@ -140,6 +141,8 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14"
latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495"
genGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456"
filename = "mysql-bin.000001"
startPos = gmysql.Position{Name: filename, Pos: 123}

Expand Down Expand Up @@ -182,16 +185,33 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) {
// write some invalid data into the relay log file
f, err := os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
defer f.Close()
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// GTID sets in meta data does not contain the GTID sets in relay log, invalid
c.Assert(terror.ErrGTIDTruncateInvalid.Equal(r.tryRecoverLatestFile(parser2)), IsTrue)

// write some invalid data into the relay log file again
f, err = os.OpenFile(filepath.Join(r.meta.Dir(), filename), os.O_WRONLY|os.O_APPEND, 0600)
c.Assert(err, IsNil)
_, err = f.Write([]byte("invalid event data"))
c.Assert(err, IsNil)
f.Close()

// write a greater GTID sets in meta
greaterGITDSet, err := gtid.ParserGTID(relayCfg.Flavor, greaterGITDSetStr)
c.Assert(err, IsNil)
c.Assert(r.meta.Save(startPos, greaterGITDSet), IsNil)

// invalid data truncated, meta updated
c.Assert(r.tryRecoverLatestFile(parser2), IsNil)
_, latestPos := r.meta.Pos()
c.Assert(latestPos, DeepEquals, gmysql.Position{Name: filename, Pos: g.LatestPos})
_, latestGTIDs := r.meta.GTID()
c.Assert(latestGTIDs.Contain(g.LatestGTID), IsTrue) // verifyMetadata is not enough
genGTIDSet, err := gtid.ParserGTID(relayCfg.Flavor, genGTIDSetStr)
c.Assert(err, IsNil)
c.Assert(latestGTIDs.Equal(genGTIDSet), IsTrue) // verifyMetadata is not enough

// no relay log file need to recover
c.Assert(r.meta.Save(minCheckpoint, latestGTIDs), IsNil)
Expand Down

0 comments on commit c6344f3

Please sign in to comment.