Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay, syncer(dm): stricter GTID check when retry replication #3496

Merged
merged 28 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
cdfbdeb
dm/test: add a test to reproduce #3487
lance6716 Nov 17, 2021
9a4d2f0
Merge branch 'master' of github.com:pingcap/ticdc into reproduce
lance6716 Nov 24, 2021
d510385
some fix
lance6716 Nov 24, 2021
b473cfe
fix fmt
lance6716 Nov 25, 2021
fef1bae
fix another case
lance6716 Nov 25, 2021
d808fd2
adjust test
lance6716 Nov 25, 2021
a6c0802
Merge branch 'master' of github.com:pingcap/ticdc into reproduce
lance6716 Nov 29, 2021
81b5fb2
enable chaos test
lance6716 Nov 29, 2021
43a4074
fix ut
lance6716 Nov 29, 2021
36bcc52
Merge branch 'master' of github.com:pingcap/ticdc into reproduce
lance6716 Nov 29, 2021
1318423
fix test
lance6716 Nov 29, 2021
baea5ad
let chaos exec DDL respect task context
lance6716 Nov 30, 2021
68a9429
Merge branch 'master' of github.com:pingcap/ticdc into reproduce
lance6716 Nov 30, 2021
6cee315
add IT to test list
lance6716 Nov 30, 2021
78f01e9
Merge branch 'master' of github.com:pingcap/ticdc into reproduce
lance6716 Dec 1, 2021
1787ffd
address comment 1
lance6716 Dec 1, 2021
7644130
address comment
lance6716 Dec 2, 2021
66fc82f
address comment and remove chaos trigger
lance6716 Dec 6, 2021
7e5b2c6
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
c4f0023
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
67f0c1a
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
b7b4120
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
09d0f67
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
9f40587
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
7b8588d
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
3eb60fe
Merge branch 'master' into reproduce
ti-chi-bot Dec 6, 2021
d981941
Merge branch 'master' into reproduce
ti-chi-bot Dec 7, 2021
c2c8acf
fix test
lance6716 Dec 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/dm_chaos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ name: DM Chaos
on:
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
pull_request:
branches:
- master

# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency.
concurrency:
Expand Down
5 changes: 1 addition & 4 deletions dm/chaos/cases/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
tcontext "github.com/pingcap/ticdc/dm/pkg/context"
"github.com/pingcap/ticdc/dm/pkg/log"
"github.com/pingcap/ticdc/dm/pkg/retry"
"github.com/pingcap/ticdc/dm/pkg/utils"
)

// dbConn holds a connection to a database and supports to reset the connection.
Expand Down Expand Up @@ -100,9 +99,7 @@ func (c *dbConn) execSQLs(ctx context.Context, queries ...string) error {
}

// execSQLs executes DDL queries.
func (c *dbConn) execDDLs(queries ...string) error {
ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultDBTimeout)
defer cancel()
func (c *dbConn) execDDLs(ctx context.Context, queries ...string) error {
return c.execSQLs(ctx, queries...)
}

Expand Down
8 changes: 4 additions & 4 deletions dm/chaos/cases/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (t *task) incrLoop() error {

// execute preSQLs in upstream
for _, sql := range t.caseGenerator.GetPreSQLs() {
if err := t.sourceConns[sql.source].execDDLs(sql.statement); err != nil {
if err := t.sourceConns[sql.source].execDDLs(t.ctx, sql.statement); err != nil {
return err
}
}
Expand Down Expand Up @@ -363,7 +363,7 @@ func (t *task) genIncrData(pCtx context.Context) (err error) {
}
for _, testSQL := range testSQLs {
log.L().Info("execute test case sql", zap.String("ddl", testSQL.statement), zap.Int("source", testSQL.source))
if err2 := t.sourceConns[testSQL.source].execDDLs(testSQL.statement); err2 != nil {
if err2 := t.sourceConns[testSQL.source].execDDLs(t.ctx, testSQL.statement); err2 != nil {
return err2
}
}
Expand Down Expand Up @@ -401,7 +401,7 @@ func (t *task) genIncrData(pCtx context.Context) (err error) {
if err != nil {
return err
}
if err = t.sourceConns[idx].execDDLs(query); err != nil {
if err = t.sourceConns[idx].execDDLs(t.ctx, query); err != nil {
return err
}

Expand Down Expand Up @@ -440,7 +440,7 @@ func (t *task) genIncrData(pCtx context.Context) (err error) {
conn2 := conn
i2 := i
eg.Go(func() error {
if err2 := conn2.execDDLs(query); err2 != nil {
if err2 := conn2.execDDLs(t.ctx, query); err2 != nil {
if utils.IsMySQLError(err2, mysql.ErrDupFieldName) {
t.logger.Warn("ignore duplicate field name for ddl", log.ShortError(err))
return nil
Expand Down
17 changes: 17 additions & 0 deletions dm/pkg/binlog/event/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/mysql"

Expand Down Expand Up @@ -476,3 +477,19 @@ func statusVarsToKV(statusVars []byte) (map[byte][]byte, error) {

return vars, nil
}

// GetGTIDStr gets GTID string representation from a GTID event ot MariaDB GTID evnets.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// GetGTIDStr gets GTID string representation from a GTID event ot MariaDB GTID evnets.
// GetGTIDStr gets GTID string representation from a GTID event or MariaDB GTID evnets.

// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L732-L749

func GetGTIDStr(e *replication.BinlogEvent) (string, error) {
switch ev := e.Event.(type) {
case *replication.GTIDEvent:
u, _ := uuid.FromBytes(ev.SID)
return fmt.Sprintf("%s:%d", u.String(), ev.GNO), nil
case *replication.MariadbGTIDEvent:
GTID := ev.GTID
return fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber), nil
default:
return "", fmt.Errorf("unsupported event type %d", e.Header.EventType)
}
}
16 changes: 8 additions & 8 deletions dm/pkg/binlog/reader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,9 @@ package reader

import (
"context"
"fmt"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
Expand Down Expand Up @@ -82,16 +80,18 @@ func GetGTIDsForPosFromStreamer(ctx context.Context, r Streamer, endPos gmysql.P
if latestGSet == nil {
return nil, errors.Errorf("should have a PreviousGTIDsEvent before the GTIDEvent %+v", e.Header)
}
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736
u, _ := uuid.FromBytes(ev.SID)
nextGTIDStr = fmt.Sprintf("%s:%d", u.String(), ev.GNO)
nextGTIDStr, err = event.GetGTIDStr(e)
if err != nil {
return nil, err
}
case *replication.MariadbGTIDEvent:
if latestGSet == nil {
return nil, errors.Errorf("should have a MariadbGTIDListEvent before the MariadbGTIDEvent %+v", e.Header)
}
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745
GTID := ev.GTID
nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
nextGTIDStr, err = event.GetGTIDStr(e)
if err != nil {
return nil, err
}
case *replication.PreviousGTIDsEvent:
// if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549
Expand Down
16 changes: 8 additions & 8 deletions dm/relay/file_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,12 @@ package relay
import (
"bytes"
"context"
"fmt"
"io"
"os"
"time"

gmysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/tidb/parser"

"github.com/pingcap/ticdc/dm/pkg/binlog/event"
Expand Down Expand Up @@ -219,16 +217,18 @@ func getTxnPosGTIDs(ctx context.Context, filename string, p *parser.Parser) (int
if latestGSet == nil {
return 0, nil, terror.ErrRelayNeedPrevGTIDEvBeforeGTIDEv.Generate(e.Header)
}
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736
u, _ := uuid.FromBytes(ev.SID)
nextGTIDStr = fmt.Sprintf("%s:%d", u.String(), ev.GNO)
nextGTIDStr, err = event.GetGTIDStr(e)
if err != nil {
return 0, nil, err
}
case *replication.MariadbGTIDEvent:
if latestGSet == nil {
return 0, nil, terror.ErrRelayNeedMaGTIDListEvBeforeGTIDEv.Generate(e.Header)
}
// learn from: https://github.com/go-mysql-org/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745
GTID := ev.GTID
nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber)
nextGTIDStr, err = event.GetGTIDStr(e)
if err != nil {
return 0, nil, err
}
case *replication.PreviousGTIDsEvent:
// if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549
Expand Down
59 changes: 34 additions & 25 deletions dm/relay/local_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package relay

import (
"context"
"fmt"
"io"
"os"
"path"
Expand All @@ -27,7 +26,6 @@ import (
"github.com/BurntSushi/toml"
"github.com/go-mysql-org/go-mysql/mysql"
"github.com/go-mysql-org/go-mysql/replication"
"github.com/google/uuid"
"github.com/pingcap/errors"
"go.uber.org/zap"

Expand Down Expand Up @@ -75,6 +73,8 @@ type BinlogReader struct {
relay Process

currentUUID string // current UUID(with suffix)

lastFileGracefulEnd bool
}

// newBinlogReader creates a new BinlogReader.
Expand All @@ -91,13 +91,14 @@ func newBinlogReader(logger log.Logger, cfg *BinlogReaderConfig, relay Process)
newtctx := tcontext.NewContext(ctx, logger.WithFields(zap.String("component", "binlog reader")))

binlogReader := &BinlogReader{
cfg: cfg,
parser: parser,
indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename),
cancel: cancel,
tctx: newtctx,
notifyCh: make(chan interface{}, 1),
relay: relay,
cfg: cfg,
parser: parser,
indexPath: path.Join(cfg.RelayDir, utils.UUIDIndexFilename),
cancel: cancel,
tctx: newtctx,
notifyCh: make(chan interface{}, 1),
relay: relay,
lastFileGracefulEnd: true,
}
binlogReader.relay.RegisterListener(binlogReader)
return binlogReader
Expand Down Expand Up @@ -300,7 +301,7 @@ type SwitchPath struct {
nextBinlogName string
}

// parseRelay parses relay root directory, it support master-slave switch (switching to next sub directory).
// parseRelay parses relay root directory, it supports master-slave switch (switching to next sub directory).
func (r *BinlogReader) parseRelay(ctx context.Context, s *LocalStreamer, pos mysql.Position) error {
currentUUID, _, realPos, err := binlog.ExtractPos(pos, r.uuids)
if err != nil {
Expand Down Expand Up @@ -338,6 +339,16 @@ func (r *BinlogReader) parseRelay(ctx context.Context, s *LocalStreamer, pos mys
realPos.Name = switchPath.nextBinlogName
realPos.Pos = binlog.FileHeaderLen // start from pos 4 for next sub directory / file
r.tctx.L().Info("switching to next ready sub directory", zap.String("next uuid", r.currentUUID), zap.Stringer("position", pos))

// when switching sub directory, last binlog file may contain unfinished transaction, so we send a notification.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// when switching sub directory, last binlog file may contain unfinished transaction, so we send a notification.
// when switching subdirectory, last binlog file may contain unfinished transaction, so we send a notification.

if !r.lastFileGracefulEnd {
s.ch <- &replication.BinlogEvent{
RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
Header: &replication.EventHeader{
EventType: replication.IGNORABLE_EVENT,
},
}
}
}
}

Expand Down Expand Up @@ -485,8 +496,7 @@ func (r *BinlogReader) parseFileAsPossible(ctx context.Context, s *LocalStreamer
}
}

// parseFile parses single relay log file from specified offset
// TODO: move all stateful variables into a class, such as r.fileParser.
// parseFile parses single relay log file from specified offset.
func (r *BinlogReader) parseFile(
ctx context.Context,
s *LocalStreamer,
Expand All @@ -499,6 +509,7 @@ func (r *BinlogReader) parseFile(
}

offset := state.latestPos
r.lastFileGracefulEnd = false

onEventFunc := func(e *replication.BinlogEvent) error {
if ce := r.tctx.L().Check(zap.DebugLevel, ""); ce != nil {
Expand All @@ -520,6 +531,7 @@ func (r *BinlogReader) parseFile(
if e.Header.Timestamp != 0 && e.Header.LogPos != 0 {
// not fake rotate event, update file pos
state.latestPos = int64(e.Header.LogPos)
r.lastFileGracefulEnd = true
} else {
r.tctx.L().Debug("skip fake rotate event", zap.Reflect("header", e.Header))
}
Expand All @@ -538,8 +550,11 @@ func (r *BinlogReader) parseFile(
state.latestPos = int64(e.Header.LogPos)
break
}
u, _ := uuid.FromBytes(ev.SID)
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%s:%d", u.String(), ev.GNO))
gtidStr, err2 := event.GetGTIDStr(e)
if err2 != nil {
return errors.Trace(err2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we have abandoned trace

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remember I have seen some zap logger printing error stack 🤔

}
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -549,8 +564,11 @@ func (r *BinlogReader) parseFile(
state.latestPos = int64(e.Header.LogPos)
break
}
GTID := ev.GTID
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
gtidStr, err2 := event.GetGTIDStr(e)
if err2 != nil {
return errors.Trace(err2)
}
state.replaceWithHeartbeat, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -627,15 +645,6 @@ func (r *BinlogReader) parseFile(
if err != nil {
if state.possibleLast && isIgnorableParseError(err) {
r.tctx.L().Warn("fail to parse relay log file, meet some ignorable error", zap.String("file", state.fullPath), zap.Int64("offset", offset), zap.Error(err))
// the file is truncated, we send a mock event with `IGNORABLE_EVENT` to notify the the consumer
// TODO: should add a integration test for this
e := &replication.BinlogEvent{
RawData: []byte(ErrorMaybeDuplicateEvent.Error()),
Header: &replication.EventHeader{
EventType: replication.IGNORABLE_EVENT,
},
}
s.ch <- e
} else {
r.tctx.L().Error("parse relay log file", zap.String("file", state.fullPath), zap.Int64("offset", offset), zap.Error(err))
return false, false, terror.ErrParserParseRelayLog.Delegate(err, state.fullPath)
Expand Down
Loading