From 81d0b462e25eb2714238b3ee43b20694782f3e41 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 9 Apr 2019 20:32:29 +0800 Subject: [PATCH 01/28] reader: simple wrapper for TCPReader to reader binlog events from master --- relay/reader/reader.go | 132 +++++++++++++++++++++++++++++++++++++++++ relay/reader/stage.go | 36 +++++++++++ 2 files changed, 168 insertions(+) create mode 100644 relay/reader/reader.go create mode 100644 relay/reader/stage.go diff --git a/relay/reader/reader.go b/relay/reader/reader.go new file mode 100644 index 0000000000..4d7e51d3f6 --- /dev/null +++ b/relay/reader/reader.go @@ -0,0 +1,132 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "sync" + + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + "github.com/siddontang/go/sync2" + + br "github.com/pingcap/dm/pkg/binlog/reader" + "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" +) + +// Reader reads binlog events from a upstream master server. +// A transformer should receive binlog events from this reader. +// The reader should support: +// 1. handle expected errors +// 2. do retry if possible +type Reader interface { + // Start starts the reading process. + Start() error + + // Close closes the reader and release the resource. + Close() error + + // GetEvent gets the binlog event one by one, it will block if no event can be read. + // You can pass a context (like Cancel or Timeout) to break the block. + GetEvent(ctx context.Context) (*replication.BinlogEvent, error) +} + +// Config is the configuration used by the Reader. +type Config struct { + SyncConfig replication.BinlogSyncerConfig + Pos mysql.Position + GTIDs gtid.Set + EnableGTID bool + MasterID string // the identifier for the master, used when logging. +} + +// reader implements Reader interface. +type reader struct { + cfg *Config + mu sync.Mutex + stage sync2.AtomicInt32 + + in br.Reader // the underlying reader used to read binlog events. + out chan *replication.BinlogEvent +} + +// NewReader creates a Reader instance. +func NewReader(cfg *Config) Reader { + return &reader{ + cfg: cfg, + in: br.NewTCPReader(cfg.SyncConfig), + out: make(chan *replication.BinlogEvent), + } +} + +// Start implements Reader.Start. +func (r *reader) Start() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + } + + defer func() { + status := r.in.Status() + log.Infof("[relay] set up binlog reader for master %s with status %s", r.cfg.MasterID, status) + }() + + if r.cfg.EnableGTID { + return r.setUpReaderByGTID() + } + return r.setUpReaderByPos() +} + +// Close implements Reader.Close. +func (r *reader) Close() error { + if r.stage.Get() == int32(stageClosed) { + return errors.New("already closed") + } + + err := r.in.Close() + r.stage.Set(int32(stageClosed)) + return errors.Trace(err) +} + +// GetEvent implements Reader.GetEvent. +func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if r.stage.Get() != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) + } + + // TODO: handle errors and retry here + return r.in.GetEvent(ctx) +} + +func (r *reader) setUpReaderByGTID() error { + gs := r.cfg.GTIDs + log.Infof("[relay] start sync for master %s from GTID set %s", r.cfg.MasterID, gs) + err := r.in.StartSyncByGTID(gs) + if err != nil { + log.Errorf("[relay] start sync for master %s from GTID set %s error %v", + r.cfg.MasterID, gs, errors.ErrorStack(err)) + return r.setUpReaderByPos() + } + return nil +} + +func (r *reader) setUpReaderByPos() error { + pos := r.cfg.Pos + log.Infof("[relay] start sync for master %s from position %s", r.cfg.MasterID, pos) + return r.in.StartSyncByPos(pos) +} diff --git a/relay/reader/stage.go b/relay/reader/stage.go new file mode 100644 index 0000000000..d5be1ccabf --- /dev/null +++ b/relay/reader/stage.go @@ -0,0 +1,36 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +type readerStage int32 + +const ( + stageNew readerStage = iota + stagePrepared + stageClosed +) + +// String implements Stringer.String. +func (s readerStage) String() string { + switch s { + case stageNew: + return "new" + case stagePrepared: + return "prepared" + case stageClosed: + return "closed" + default: + return "unknown" + } +} From 0e7747e941833e563e63249b4844b20c8ca1a9e7 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 10 Apr 2019 16:16:20 +0800 Subject: [PATCH 02/28] reader: add a MockReader --- pkg/binlog/reader/mock.go | 89 ++++++++++++++++++++++++++ pkg/binlog/reader/mock_test.go | 112 +++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 pkg/binlog/reader/mock.go create mode 100644 pkg/binlog/reader/mock_test.go diff --git a/pkg/binlog/reader/mock.go b/pkg/binlog/reader/mock.go new file mode 100644 index 0000000000..0d16751fb0 --- /dev/null +++ b/pkg/binlog/reader/mock.go @@ -0,0 +1,89 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/gtid" +) + +// MockReader is a binlog event reader which read binlog events from an input channel. +type MockReader struct { + ch chan *replication.BinlogEvent + ech chan error +} + +// NewMockReader creates a MockReader instance. +func NewMockReader() Reader { + return &MockReader{ + ch: make(chan *replication.BinlogEvent), + ech: make(chan error), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos. +func (r *MockReader) StartSyncByPos(pos gmysql.Position) error { + return nil +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *MockReader) StartSyncByGTID(gSet gtid.Set) error { + return nil +} + +// Close implements Reader.Close. +func (r *MockReader) Close() error { + return nil +} + +// GetEvent implements Reader.GetEvent. +func (r *MockReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + select { + case ev := <-r.ch: + return ev, nil + case err := <-r.ech: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Status implements Reader.Status. +func (r *MockReader) Status() interface{} { + return nil +} + +// PushEvent pushes an event into the reader. +func (r *MockReader) PushEvent(ctx context.Context, ev *replication.BinlogEvent) error { + select { + case r.ch <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// PushError pushes an error into the reader. +func (r *MockReader) PushError(ctx context.Context, err error) error { + select { + case r.ech <- err: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go new file mode 100644 index 0000000000..3422aa4be3 --- /dev/null +++ b/pkg/binlog/reader/mock_test.go @@ -0,0 +1,112 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "errors" + "time" + + . "github.com/pingcap/check" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var ( + _ = Suite(&testMockReaderSuite{}) +) + +type testMockReaderSuite struct { +} + +type testCase struct { + ev *replication.BinlogEvent + err error +} + +func (t *testMockReaderSuite) TestRead(c *C) { + r := NewMockReader() + + // some interface methods do nothing + c.Assert(r.StartSyncByPos(mysql.Position{}), IsNil) + c.Assert(r.StartSyncByGTID(nil), IsNil) + c.Assert(r.Status(), IsNil) + c.Assert(r.Close(), IsNil) + + cases := []testCase{ + { + ev: &replication.BinlogEvent{ + RawData: []byte{1}, + }, + err: nil, + }, + { + ev: &replication.BinlogEvent{ + RawData: []byte{2}, + }, + err: nil, + }, + { + ev: nil, + err: errors.New("1"), + }, + { + ev: &replication.BinlogEvent{ + RawData: []byte{3}, + }, + err: nil, + }, + { + ev: nil, + err: errors.New("2"), + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + mockR := r.(*MockReader) + go func() { + for _, cs := range cases { + if cs.err != nil { + c.Assert(mockR.PushError(ctx, cs.err), IsNil) + } else { + c.Assert(mockR.PushEvent(ctx, cs.ev), IsNil) + } + } + }() + + got := make([]testCase, 0, len(cases)) + for { + ev, err := r.GetEvent(ctx) + if err != nil { + got = append(got, testCase{ev: nil, err: err}) + } else { + got = append(got, testCase{ev: ev, err: nil}) + } + if len(got) == len(cases) || ctx.Err() != nil { + break + } + } + + c.Assert(ctx.Err(), IsNil) + c.Assert(got, DeepEquals, cases) + + cancel() // cancel manually + c.Assert(mockR.PushError(ctx, cases[0].err), Equals, ctx.Err()) + c.Assert(mockR.PushEvent(ctx, cases[0].ev), Equals, ctx.Err()) + ev, err := r.GetEvent(ctx) + c.Assert(ev, IsNil) + c.Assert(err, Equals, ctx.Err()) +} From 9a6553a1447a525191b4397ecfc51118e9ea08c7 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 10 Apr 2019 16:56:41 +0800 Subject: [PATCH 03/28] reader: test reader with MockReader --- relay/reader/reader.go | 9 +++- relay/reader/reader_test.go | 105 ++++++++++++++++++++++++++++++++++++ 2 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 relay/reader/reader_test.go diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 4d7e51d3f6..108f15262e 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -86,10 +86,15 @@ func (r *reader) Start() error { log.Infof("[relay] set up binlog reader for master %s with status %s", r.cfg.MasterID, status) }() + var err error if r.cfg.EnableGTID { - return r.setUpReaderByGTID() + err = r.setUpReaderByGTID() + } else { + err = r.setUpReaderByPos() } - return r.setUpReaderByPos() + + r.stage.Set(int32(stagePrepared)) + return err } // Close implements Reader.Close. diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go new file mode 100644 index 0000000000..aeaee7b7e4 --- /dev/null +++ b/relay/reader/reader_test.go @@ -0,0 +1,105 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "testing" + "time" + + . "github.com/pingcap/check" + "github.com/siddontang/go-mysql/replication" + + br "github.com/pingcap/dm/pkg/binlog/reader" +) + +var ( + _ = Suite(&testReaderSuite{}) +) + +func TestSuite(t *testing.T) { + TestingT(t) +} + +type testReaderSuite struct { +} + +func (t *testReaderSuite) TestInterface(c *C) { + cases := []*replication.BinlogEvent{ + {RawData: []byte{1}}, + {RawData: []byte{2}}, + {RawData: []byte{3}}, + } + + cfg := &Config{ + SyncConfig: replication.BinlogSyncerConfig{ + ServerID: 101, + }, + MasterID: "test-master", + } + + // test with position + r := NewReader(cfg) + t.testInterfaceWithReader(c, r, cases) + + // test with GTID + cfg.EnableGTID = true + r = NewReader(cfg) + t.testInterfaceWithReader(c, r, cases) +} + +func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*replication.BinlogEvent) { + // replace underlying reader with a mock reader for testing + concreteR := r.(*reader) + c.Assert(concreteR, NotNil) + mockR := br.NewMockReader() + concreteR.in = mockR + + // start reader + err := r.Start() + c.Assert(err, IsNil) + err = r.Start() // call multi times + c.Assert(err, NotNil) + + // getEvent by pushing event to mock reader + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + go func() { + concreteMR := mockR.(*br.MockReader) + for _, cs := range cases { + c.Assert(concreteMR.PushEvent(ctx, cs), IsNil) + } + }() + got := make([]*replication.BinlogEvent, 0, len(cases)) + for { + ev, err2 := r.GetEvent(ctx) + c.Assert(err2, IsNil) + got = append(got, ev) + if len(got) == len(cases) { + break + } + } + c.Assert(got, DeepEquals, cases) + + // close reader + err = r.Close() + c.Assert(err, IsNil) + err = r.Close() + c.Assert(err, NotNil) // call multi times + + // getEvent from a closed reader + ev, err := r.GetEvent(ctx) + c.Assert(err, NotNil) + c.Assert(ev, IsNil) +} From ef7677366f6595310603025bfe7178e89004fbdf Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 10 Apr 2019 17:39:04 +0800 Subject: [PATCH 04/28] *: test GTID backoff to position --- pkg/binlog/reader/mock.go | 11 +++++++--- pkg/binlog/reader/mock_test.go | 11 +++++++++- relay/reader/reader_test.go | 38 ++++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 4 deletions(-) diff --git a/pkg/binlog/reader/mock.go b/pkg/binlog/reader/mock.go index 0d16751fb0..8776fb7bfd 100644 --- a/pkg/binlog/reader/mock.go +++ b/pkg/binlog/reader/mock.go @@ -26,6 +26,11 @@ import ( type MockReader struct { ch chan *replication.BinlogEvent ech chan error + + // returned error for methods + ErrStartByPos error + ErrStartByGTID error + ErrClose error } // NewMockReader creates a MockReader instance. @@ -38,17 +43,17 @@ func NewMockReader() Reader { // StartSyncByPos implements Reader.StartSyncByPos. func (r *MockReader) StartSyncByPos(pos gmysql.Position) error { - return nil + return r.ErrStartByPos } // StartSyncByGTID implements Reader.StartSyncByGTID. func (r *MockReader) StartSyncByGTID(gSet gtid.Set) error { - return nil + return r.ErrStartByGTID } // Close implements Reader.Close. func (r *MockReader) Close() error { - return nil + return r.ErrClose } // GetEvent implements Reader.GetEvent. diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go index 3422aa4be3..0022665dc5 100644 --- a/pkg/binlog/reader/mock_test.go +++ b/pkg/binlog/reader/mock_test.go @@ -44,6 +44,16 @@ func (t *testMockReaderSuite) TestRead(c *C) { c.Assert(r.Status(), IsNil) c.Assert(r.Close(), IsNil) + // replace with special error + mockR := r.(*MockReader) + errSpecial := errors.New("special error for methods") + mockR.ErrStartByPos = errSpecial + mockR.ErrStartByGTID = errSpecial + mockR.ErrClose = errSpecial + c.Assert(r.StartSyncByPos(mysql.Position{}), Equals, errSpecial) + c.Assert(r.StartSyncByGTID(nil), Equals, errSpecial) + c.Assert(r.Close(), Equals, errSpecial) + cases := []testCase{ { ev: &replication.BinlogEvent{ @@ -76,7 +86,6 @@ func (t *testMockReaderSuite) TestRead(c *C) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - mockR := r.(*MockReader) go func() { for _, cs := range cases { if cs.err != nil { diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index aeaee7b7e4..3d65686040 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -15,6 +15,7 @@ package reader import ( "context" + "errors" "testing" "time" @@ -103,3 +104,40 @@ func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*repli c.Assert(err, NotNil) c.Assert(ev, IsNil) } + +func (t *testReaderSuite) TestBackOffGTID(c *C) { + cfg := &Config{ + SyncConfig: replication.BinlogSyncerConfig{ + ServerID: 101, + }, + MasterID: "test-master", + } + errByPos := errors.New("start sync by pos error") + errByGTID := errors.New("start sync by GTID error") + + // test with position + r := NewReader(cfg) + err := t.testBackOffWithReader(c, r, errByPos, errByGTID) + c.Assert(err, Equals, errByPos) + + // test with GTID + cfg.EnableGTID = true + r = NewReader(cfg) + err = t.testBackOffWithReader(c, r, errByPos, errByGTID) + c.Assert(err, Equals, errByPos) // also returned errByPos because backoff to position mode +} + +func (t *testReaderSuite) testBackOffWithReader(c *C, r Reader, errByPos error, errByGTID error) error { + // replace underlying reader with a mock reader for testing + concreteR := r.(*reader) + c.Assert(concreteR, NotNil) + mockR := br.NewMockReader() + concreteR.in = mockR + + // specify an error for StartSyncByPos/StartSyncByGTID + concreteMR := mockR.(*br.MockReader) + concreteMR.ErrStartByPos = errByPos + concreteMR.ErrStartByGTID = errByGTID + + return r.Start() +} From 9fa15adbdf15e59746793a5a876add5407a93e41 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 10 Apr 2019 19:25:26 +0800 Subject: [PATCH 05/28] reader: get event handle retryable and ignorable error --- pkg/binlog/reader/mock_test.go | 10 +++--- relay/reader/error.go | 40 +++++++++++++++++++++ relay/reader/reader.go | 15 ++++++-- relay/reader/reader_test.go | 63 ++++++++++++++++++++++++++++++---- 4 files changed, 115 insertions(+), 13 deletions(-) create mode 100644 relay/reader/error.go diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go index 0022665dc5..4450c0929d 100644 --- a/pkg/binlog/reader/mock_test.go +++ b/pkg/binlog/reader/mock_test.go @@ -96,21 +96,21 @@ func (t *testMockReaderSuite) TestRead(c *C) { } }() - got := make([]testCase, 0, len(cases)) + obtained := make([]testCase, 0, len(cases)) for { ev, err := r.GetEvent(ctx) if err != nil { - got = append(got, testCase{ev: nil, err: err}) + obtained = append(obtained, testCase{ev: nil, err: err}) } else { - got = append(got, testCase{ev: ev, err: nil}) + obtained = append(obtained, testCase{ev: ev, err: nil}) } - if len(got) == len(cases) || ctx.Err() != nil { + if len(obtained) == len(cases) || ctx.Err() != nil { break } } c.Assert(ctx.Err(), IsNil) - c.Assert(got, DeepEquals, cases) + c.Assert(obtained, DeepEquals, cases) cancel() // cancel manually c.Assert(mockR.PushError(ctx, cases[0].err), Equals, ctx.Err()) diff --git a/relay/reader/error.go b/relay/reader/error.go new file mode 100644 index 0000000000..466c8c2e6c --- /dev/null +++ b/relay/reader/error.go @@ -0,0 +1,40 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + "github.com/pingcap/errors" +) + +// isRetryableError checks whether the error is retryable. +func isRetryableError(err error) bool { + err = errors.Cause(err) + switch err { + case context.DeadlineExceeded: + return true + } + return false +} + +// isIgnorableError checks whether the error is ignorable. +func isIgnorableError(err error) bool { + err = errors.Cause(err) + switch err { + case context.Canceled: + return true + } + return false +} diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 108f15262e..a82ba75a26 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -114,8 +114,19 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) } - // TODO: handle errors and retry here - return r.in.GetEvent(ctx) + for { + ev, err := r.in.GetEvent(ctx) + if err == nil { + return ev, nil + } else if isRetryableError(err) { + log.Warnf("[relay] get event with retryable error %s", err) + continue // retry without sleep + } else if isIgnorableError(err) { + log.Warnf("[relay] get event with ignorable error %s", err) + return nil, nil // return without error and also without binlog event + } + return nil, errors.Trace(err) + } } func (r *reader) setUpReaderByGTID() error { diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index 3d65686040..a81f14b5db 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -15,11 +15,11 @@ package reader import ( "context" - "errors" "testing" "time" . "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/siddontang/go-mysql/replication" br "github.com/pingcap/dm/pkg/binlog/reader" @@ -76,22 +76,22 @@ func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*repli // getEvent by pushing event to mock reader ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() + concreteMR := mockR.(*br.MockReader) go func() { - concreteMR := mockR.(*br.MockReader) for _, cs := range cases { c.Assert(concreteMR.PushEvent(ctx, cs), IsNil) } }() - got := make([]*replication.BinlogEvent, 0, len(cases)) + obtained := make([]*replication.BinlogEvent, 0, len(cases)) for { ev, err2 := r.GetEvent(ctx) c.Assert(err2, IsNil) - got = append(got, ev) - if len(got) == len(cases) { + obtained = append(obtained, ev) + if len(obtained) == len(cases) { break } } - c.Assert(got, DeepEquals, cases) + c.Assert(obtained, DeepEquals, cases) // close reader err = r.Close() @@ -141,3 +141,54 @@ func (t *testReaderSuite) testBackOffWithReader(c *C, r Reader, errByPos error, return r.Start() } + +func (t *testReaderSuite) TestGetEventWithError(c *C) { + cfg := &Config{ + SyncConfig: replication.BinlogSyncerConfig{ + ServerID: 101, + }, + MasterID: "test-master", + } + + r := NewReader(cfg) + // replace underlying reader with a mock reader for testing + concreteR := r.(*reader) + c.Assert(concreteR, NotNil) + mockR := br.NewMockReader() + concreteR.in = mockR + + errOther := errors.New("other error") + in := []error{ + context.DeadlineExceeded, // retryable + context.Canceled, // ignorable + errOther, + } + expected := []error{ + nil, // from ignorable + errOther, + } + + err := r.Start() + c.Assert(err, IsNil) + + // getEvent by pushing event to mock reader + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + concreteMR := mockR.(*br.MockReader) + go func() { + for _, cs := range in { + c.Assert(concreteMR.PushError(ctx, cs), IsNil) + } + }() + + obtained := make([]error, 0, len(expected)) + for { + ev, err2 := r.GetEvent(ctx) + c.Assert(ev, IsNil) + obtained = append(obtained, err2) + if errors.Cause(err2) == errOther { + break // all received + } + } + c.Assert(obtained, DeepEquals, expected) +} From 33e22e0e746ce5a9dbfa5247a34ef4a0487d7629 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Wed, 10 Apr 2019 20:18:16 +0800 Subject: [PATCH 06/28] reader: add test cases for error --- relay/reader/error_test.go | 54 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 relay/reader/error_test.go diff --git a/relay/reader/error_test.go b/relay/reader/error_test.go new file mode 100644 index 0000000000..67f7f1962b --- /dev/null +++ b/relay/reader/error_test.go @@ -0,0 +1,54 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" +) + +var ( + _ = Suite(&testErrorSuite{}) +) + +type testErrorSuite struct { +} + +func (t *testErrorSuite) TestRetryable(c *C) { + err := errors.New("custom error") + c.Assert(isRetryableError(err), IsFalse) + + cases := []error{ + context.DeadlineExceeded, + errors.Annotate(context.DeadlineExceeded, "annotated"), + } + for _, cs := range cases { + c.Assert(isRetryableError(cs), IsTrue) + } +} + +func (t *testErrorSuite) TestIgnorable(c *C) { + err := errors.New("custom error") + c.Assert(isIgnorableError(err), IsFalse) + + cases := []error{ + context.Canceled, + errors.Annotate(context.Canceled, "annotated"), + } + for _, cs := range cases { + c.Assert(isIgnorableError(cs), IsTrue) + } +} From 1f5bc1eb295059784fe6475b857e2abd6c98af39 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 11 Apr 2019 14:36:38 +0800 Subject: [PATCH 07/28] event: add GenRotateEvent --- pkg/binlog/event/event.go | 27 +++++++++++++++++++++++++++ pkg/binlog/event/event_test.go | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/pkg/binlog/event/event.go b/pkg/binlog/event/event.go index 87b840fe0b..d30ae23482 100644 --- a/pkg/binlog/event/event.go +++ b/pkg/binlog/event/event.go @@ -159,6 +159,33 @@ func GenFormatDescriptionEvent(header *replication.EventHeader, latestPos uint32 return ev, errors.Trace(err) } +// GenRotateEvent generates a RotateEvent. +// ref: https://dev.mysql.com/doc/internals/en/rotate-event.html +func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogName []byte, position uint64) (*replication.BinlogEvent, error) { + if len(nextLogName) == 0 { + return nil, errors.NotValidf("empty next binlog name") + } + + // Post-header + postHeader := new(bytes.Buffer) + err := binary.Write(postHeader, binary.LittleEndian, position) + if err != nil { + return nil, errors.Annotatef(err, "write position %d", position) + } + + // Payload + payload := new(bytes.Buffer) + err = binary.Write(payload, binary.LittleEndian, nextLogName) + if err != nil { + return nil, errors.Annotatef(err, "write next binlog name % X", nextLogName) + } + + buf := new(bytes.Buffer) + event := &replication.RotateEvent{} + ev, err := assembleEvent(buf, event, false, *header, replication.ROTATE_EVENT, latestPos, postHeader.Bytes(), payload.Bytes()) + return ev, errors.Trace(err) +} + // GenPreviousGTIDsEvent generates a PreviousGTIDsEvent. // go-mysql has no PreviousGTIDsEvent struct defined, so return the event's raw data instead. // MySQL has no internal doc for PREVIOUS_GTIDS_EVENT. diff --git a/pkg/binlog/event/event_test.go b/pkg/binlog/event/event_test.go index 5e4d8e3072..d890b7f8b8 100644 --- a/pkg/binlog/event/event_test.go +++ b/pkg/binlog/event/event_test.go @@ -115,6 +115,40 @@ func (t *testEventSuite) TestGenFormatDescriptionEvent(c *C) { c.Assert(err, IsNil) } +func (t *testEventSuite) TestGenRotateEvent(c *C) { + var ( + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + Flags: 0x01, + } + latestPos uint32 = 4 + nextLogName []byte // nil + position uint64 = 123 + ) + + // empty nextLogName, invalid + rotateEv, err := GenRotateEvent(header, latestPos, nextLogName, position) + c.Assert(err, NotNil) + c.Assert(rotateEv, IsNil) + + // valid nextLogName + nextLogName = []byte("mysql-bin.000010") + rotateEv, err = GenRotateEvent(header, latestPos, nextLogName, position) + c.Assert(err, IsNil) + c.Assert(rotateEv, NotNil) + + // verify the header + verifyHeader(c, rotateEv.Header, header, replication.ROTATE_EVENT, latestPos, uint32(len(rotateEv.RawData))) + + // verify the body + rotateEvBody, ok := rotateEv.Event.(*replication.RotateEvent) + c.Assert(ok, IsTrue) + c.Assert(rotateEvBody, NotNil) + c.Assert(rotateEvBody.NextLogName, DeepEquals, nextLogName) + c.Assert(rotateEvBody.Position, Equals, position) +} + func (t *testEventSuite) TestGenPreviousGTIDsEvent(c *C) { var ( header = &replication.EventHeader{ From 76ba02c02288e8802632c81de34ae2482c99f10f Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 11 Apr 2019 15:50:42 +0800 Subject: [PATCH 08/28] *: add a transformer --- relay/reader/reader.go | 3 +- relay/transformer/transformer.go | 100 ++++++++++++++ relay/transformer/transformer_test.go | 188 ++++++++++++++++++++++++++ relay/transformer/util.go | 42 ++++++ relay/transformer/util_test.go | 57 ++++++++ 5 files changed, 389 insertions(+), 1 deletion(-) create mode 100644 relay/transformer/transformer.go create mode 100644 relay/transformer/transformer_test.go create mode 100644 relay/transformer/util.go create mode 100644 relay/transformer/util_test.go diff --git a/relay/reader/reader.go b/relay/reader/reader.go index a82ba75a26..eccd9e9419 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -28,7 +28,7 @@ import ( ) // Reader reads binlog events from a upstream master server. -// A transformer should receive binlog events from this reader. +// The read binlog events should be send to a transformer. // The reader should support: // 1. handle expected errors // 2. do retry if possible @@ -109,6 +109,7 @@ func (r *reader) Close() error { } // GetEvent implements Reader.GetEvent. +// If some ignorable error occurred, the returned event and error both are nil. func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if r.stage.Get() != int32(stagePrepared) { return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) diff --git a/relay/transformer/transformer.go b/relay/transformer/transformer.go new file mode 100644 index 0000000000..fbc21e5fbb --- /dev/null +++ b/relay/transformer/transformer.go @@ -0,0 +1,100 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/parser" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var ( + artificialFlag = uint16(0x0020) // LOG_EVENT_ARTIFICIAL_F +) + +// Result represents a transform result. +type Result struct { + Ignore bool // whether the event should be ignored + LogPos uint32 // binlog event's End_log_pos or Position in RotateEvent + NextLogName string // next binlog filename, only valid for RotateEvent + GTIDSet mysql.GTIDSet // GTIDSet got from QueryEvent and XIDEvent when RawModeEnabled not true + CanSaveGTID bool // whether can save GTID into meta, true for DDL query and XIDEvent +} + +// Transformer receives binlog events from a reader and transforms them. +// The transformed binlog events should be send to one or more writers. +// The transformer should support: +// 1. extract binlog position, GTID info from the event. +// 2. decide the event whether needed by a downstream writer. +// - the downstream writer may also drop some events according to its strategy. +// NOTE: more works maybe moved from outer into Transformer later. +type Transformer interface { + // Transform transforms a binlog event. + Transform(e *replication.BinlogEvent) *Result +} + +// transformer implements Transformer interface. +type transformer struct { + parser2 *parser.Parser // used to parse query statement +} + +// NewTransformer creates a Transformer instance. +func NewTransformer(parser2 *parser.Parser) Transformer { + return &transformer{ + parser2: parser2, + } +} + +// Transform implements Transformer.Transform. +func (t *transformer) Transform(e *replication.BinlogEvent) *Result { + result := &Result{ + LogPos: e.Header.LogPos, + } + + switch ev := e.Event.(type) { + case *replication.RotateEvent: + result.LogPos = uint32(ev.Position) // next event's position + result.NextLogName = string(ev.NextLogName) // for RotateEvent, update binlog name + if e.Header.Timestamp == 0 || e.Header.LogPos == 0 { + result.Ignore = true // ignore fake rotate event + } + case *replication.QueryEvent: + // when RawModeEnabled not true, QueryEvent will be parsed. + // even for `BEGIN`, we still update pos/GTID, but only save GTID for DDL. + result.GTIDSet = ev.GSet + isDDL := checkIsDDL(string(ev.Query), t.parser2) + if isDDL { + result.CanSaveGTID = true + } + case *replication.XIDEvent: + // when RawModeEnabled not true, XIDEvent will be parsed. + result.GTIDSet = ev.GSet + result.CanSaveGTID = true // need save GTID for XID + case *replication.GenericEvent: + // handle some un-parsed events + switch e.Header.EventType { + case replication.HEARTBEAT_EVENT: + // ignore artificial heartbeat event + // ref: https://dev.mysql.com/doc/internals/en/heartbeat-event.html + result.Ignore = true + } + default: + if e.Header.Flags&artificialFlag != 0 { + // ignore events with LOG_EVENT_ARTIFICIAL_F flag(0x0020) set + // ref: https://dev.mysql.com/doc/internals/en/binlog-event-flag.html + result.Ignore = true + } + } + return result +} diff --git a/relay/transformer/transformer_test.go b/relay/transformer/transformer_test.go new file mode 100644 index 0000000000..70e165898a --- /dev/null +++ b/relay/transformer/transformer_test.go @@ -0,0 +1,188 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "testing" + "time" + + "github.com/pingcap/check" + "github.com/pingcap/parser" + "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" +) + +var ( + _ = check.Suite(&testTransformerSuite{}) +) + +func TestSuite(t *testing.T) { + check.TestingT(t) +} + +type testTransformerSuite struct { +} + +type Case struct { + event *replication.BinlogEvent + result *Result +} + +func (t *testTransformerSuite) TestTransform(c *check.C) { + var ( + tran = NewTransformer(parser.New()) + header = &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: 11, + Flags: 0x01, + } + latestPos uint32 = 456789 + gtidStr = "9f61c5f9-1eef-11e9-b6cf-0242ac140003:5" + gtidSet, _ = gtid.ParserGTID(mysql.MySQLFlavor, gtidStr) + schema = []byte("test_schema") + cases = make([]Case, 0, 10) + ) + + // RotateEvent + nextLogName := "mysql-bin.000123" + position := uint64(4) + ev, err := event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // fake RotateEvent with zero timestamp + header.Timestamp = 0 + ev, err = event.GenRotateEvent(header, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, // ignore fake RotateEvent + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + header.Timestamp = uint32(time.Now().Unix()) // set to non-zero + + // fake RotateEvent with zero logPos + fakeRotateHeader := replication.EventHeader{} + fakeRotateHeader = *header + ev, err = event.GenRotateEvent(&fakeRotateHeader, latestPos, []byte(nextLogName), position) + c.Assert(err, check.IsNil) + ev.Header.LogPos = 0 // set to zero + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, // ignore fake RotateEvent + LogPos: uint32(position), + NextLogName: nextLogName, + }, + }) + + // QueryEvent for DDL + query := []byte("CREATE TABLE test_tbl (c1 INT)") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + c.Assert(err, check.IsNil) + ev.Event.(*replication.QueryEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // QueryEvent for non-DDL + query = []byte("BEGIN") + ev, err = event.GenQueryEvent(header, latestPos, 0, 0, 0, nil, schema, query) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // XIDEvent + xid := uint64(135) + ev, err = event.GenXIDEvent(header, latestPos, xid) + c.Assert(err, check.IsNil) + ev.Event.(*replication.XIDEvent).GSet = gtidSet.Origin() // set GTIDs manually + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + GTIDSet: gtidSet.Origin(), + CanSaveGTID: true, + }, + }) + + // GenericEvent, non-HEARTBEAT_EVENT + ev = &replication.BinlogEvent{Header: header, Event: &replication.GenericEvent{}} + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // GenericEvent, HEARTBEAT_EVENT + genericHeader := replication.EventHeader{} + genericHeader = *header + ev = &replication.BinlogEvent{Header: &genericHeader, Event: &replication.GenericEvent{}} + ev.Header.EventType = replication.HEARTBEAT_EVENT + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, + LogPos: ev.Header.LogPos, + }, + }) + + // other event type without LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, check.IsNil) + cases = append(cases, Case{ + event: ev, + result: &Result{ + LogPos: ev.Header.LogPos, + }, + }) + + // other event type with LOG_EVENT_ARTIFICIAL_F + ev, err = event.GenCommonGTIDEvent(mysql.MySQLFlavor, header.ServerID, latestPos, gtidSet) + c.Assert(err, check.IsNil) + ev.Header.Flags |= artificialFlag + cases = append(cases, Case{ + event: ev, + result: &Result{ + Ignore: true, + LogPos: ev.Header.LogPos, + }, + }) + + for _, cs := range cases { + c.Assert(tran.Transform(cs.event), check.DeepEquals, cs.result) + } +} diff --git a/relay/transformer/util.go b/relay/transformer/util.go new file mode 100644 index 0000000000..e900baaf79 --- /dev/null +++ b/relay/transformer/util.go @@ -0,0 +1,42 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/parser" + "github.com/pingcap/parser/ast" + + parserpkg "github.com/pingcap/dm/pkg/parser" + "github.com/pingcap/dm/pkg/utils" +) + +// checkIsDDL checks input SQL whether is a valid DDL statement +func checkIsDDL(sql string, p *parser.Parser) bool { + sql = utils.TrimCtrlChars(sql) + + // if parse error, treat it as not a DDL + stmts, err := parserpkg.Parse(p, sql, "", "") + if err != nil || len(stmts) == 0 { + return false + } + + stmt := stmts[0] + switch stmt.(type) { + case ast.DDLNode: + return true + default: + // other thing this like `BEGIN` + return false + } +} diff --git a/relay/transformer/util_test.go b/relay/transformer/util_test.go new file mode 100644 index 0000000000..34dfe14665 --- /dev/null +++ b/relay/transformer/util_test.go @@ -0,0 +1,57 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package transformer + +import ( + "github.com/pingcap/check" + "github.com/pingcap/parser" +) + +var ( + _ = check.Suite(&testUtilSuite{}) +) + +type testUtilSuite struct { +} + +func (t *testUtilSuite) TestCheckIsDDL(c *check.C) { + var ( + cases = []struct { + sql string + isDDL bool + }{ + { + sql: "CREATE DATABASE test_is_ddl", + isDDL: true, + }, + { + sql: "BEGIN", + isDDL: false, + }, + { + sql: "INSERT INTO test_is_ddl.test_is_ddl_table VALUES (1)", + isDDL: false, + }, + { + sql: "INVAID SQL STATEMENT", + isDDL: false, + }, + } + parser2 = parser.New() + ) + + for _, cs := range cases { + c.Assert(checkIsDDL(cs.sql, parser2), check.Equals, cs.isDDL) + } +} From 0500128670d0c404a816da5a493022d5aa5f278d Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 11 Apr 2019 21:15:00 +0800 Subject: [PATCH 09/28] reader: add a FileReader --- pkg/binlog/reader/file.go | 150 ++++++++++++++++++++++++++++++++++++++ pkg/binlog/reader/tcp.go | 6 +- 2 files changed, 153 insertions(+), 3 deletions(-) create mode 100644 pkg/binlog/reader/file.go diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go new file mode 100644 index 0000000000..c6682a68bb --- /dev/null +++ b/pkg/binlog/reader/file.go @@ -0,0 +1,150 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// binlog events generator for MySQL used to generate some binlog events for tests. +// Readability takes precedence over performance. + +package reader + +import ( + "context" + "sync" + "time" + + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + "github.com/siddontang/go/sync2" + + "github.com/pingcap/dm/pkg/gtid" +) + +// FileReader is a binlog event reader which read binlog events from a file. +type FileReader struct { + mu sync.Mutex + + stage sync2.AtomicInt32 + parser *replication.BinlogParser + + ch chan *replication.BinlogEvent + ech chan error + + ctx context.Context + cancel context.CancelFunc +} + +// FileReaderConfig is the configuration used by a FileReader. +type FileReaderConfig struct { + RawMode bool + timezone *time.Location +} + +// FileReaderStatus represents the status of a FileReader. +type FileReaderStatus struct { + Stage string `json:"stage"` + ConnID uint32 `json:"connection"` +} + +// NewFileReader creates a FileReader instance. +func NewFileReader(cfg *FileReaderConfig) Reader { + parser := replication.NewBinlogParser() + parser.SetVerifyChecksum(true) + parser.SetUseDecimal(true) + parser.SetRawMode(cfg.RawMode) + if cfg.timezone != nil { + parser.SetTimestampStringLocation(cfg.timezone) + } + return &FileReader{ + parser: parser, + ch: make(chan *replication.BinlogEvent), // without buffer now + ech: make(chan error), + } +} + +// StartSyncByPos implements Reader.StartSyncByPos. +func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + } + + r.ctx, r.cancel = context.WithCancel(context.Background()) + go func() { + err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) + if err != nil { + select { + case r.ech <- err: + case <-r.ctx.Done(): + } + } + }() + + r.stage.Set(int32(stagePrepared)) + return nil +} + +// StartSyncByGTID implements Reader.StartSyncByGTID. +func (r *FileReader) StartSyncByGTID(gSet gtid.Set) error { + // NOTE: may be supported later. + return errors.NotSupportedf("read from file by GTID") +} + +// Close implements Reader.Close. +func (r *FileReader) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage.Get() == int32(stageClosed) { + return errors.New("already closed") + } + + r.parser.Stop() + r.cancel() + r.stage.Set(int32(stageClosed)) + return nil +} + +// GetEvent implements Reader.GetEvent. +func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { + if r.stage.Get() != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) + } + + select { + case ev := <-r.ch: + return ev, nil + case err := <-r.ech: + return nil, err + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Status implements Reader.Status. +func (r *FileReader) Status() interface{} { + stage := r.stage.Get() + return &TCPReaderStatus{ + Stage: readerStage(stage).String(), + } +} + +func (r *FileReader) onEvent(ev *replication.BinlogEvent) error { + select { + case r.ch <- ev: + return nil + case <-r.ctx.Done(): + return r.ctx.Err() + } +} diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index 0f554663a2..d59dc2d0f6 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -69,7 +69,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { defer r.mu.Unlock() if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -88,7 +88,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { defer r.mu.Unlock() if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) } if gSet == nil { @@ -136,7 +136,7 @@ func (r *TCPReader) Close() error { // GetEvent implements Reader.GetEvent. func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) } return r.streamer.GetEvent(ctx) From 2f9db5bf438634c978335af9b4e12728e6435470 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 12 Apr 2019 10:46:30 +0800 Subject: [PATCH 10/28] reader: add channel buffer size control for FileReader; add read/send offset status for FileReader --- pkg/binlog/reader/file.go | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index c6682a68bb..82310d6c59 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -33,8 +33,10 @@ import ( type FileReader struct { mu sync.Mutex - stage sync2.AtomicInt32 - parser *replication.BinlogParser + stage sync2.AtomicInt32 + readOffset sync2.AtomicUint32 + sendOffset sync2.AtomicUint32 + parser *replication.BinlogParser ch chan *replication.BinlogEvent ech chan error @@ -45,14 +47,17 @@ type FileReader struct { // FileReaderConfig is the configuration used by a FileReader. type FileReaderConfig struct { - RawMode bool - timezone *time.Location + EnableRawMode bool + Timezone *time.Location + ChBufferSize int // event channel's buffer size + EchBufferSize int // error channel's buffer size } // FileReaderStatus represents the status of a FileReader. type FileReaderStatus struct { - Stage string `json:"stage"` - ConnID uint32 `json:"connection"` + Stage string `json:"stage"` + ReadOffset uint32 `json:"read-offset"` // read event's offset in the file + SendOffset uint32 `json:"send-offset"` // sent event's offset in the file } // NewFileReader creates a FileReader instance. @@ -60,14 +65,14 @@ func NewFileReader(cfg *FileReaderConfig) Reader { parser := replication.NewBinlogParser() parser.SetVerifyChecksum(true) parser.SetUseDecimal(true) - parser.SetRawMode(cfg.RawMode) - if cfg.timezone != nil { - parser.SetTimestampStringLocation(cfg.timezone) + parser.SetRawMode(cfg.EnableRawMode) + if cfg.Timezone != nil { + parser.SetTimestampStringLocation(cfg.Timezone) } return &FileReader{ parser: parser, - ch: make(chan *replication.BinlogEvent), // without buffer now - ech: make(chan error), + ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize), + ech: make(chan error, cfg.EchBufferSize), } } @@ -124,6 +129,7 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er select { case ev := <-r.ch: + r.sendOffset.Set(ev.Header.LogPos) return ev, nil case err := <-r.ech: return nil, err @@ -134,15 +140,17 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er // Status implements Reader.Status. func (r *FileReader) Status() interface{} { - stage := r.stage.Get() - return &TCPReaderStatus{ - Stage: readerStage(stage).String(), + return &FileReaderStatus{ + Stage: readerStage(r.stage.Get()).String(), + ReadOffset: r.sendOffset.Get(), + SendOffset: r.readOffset.Get(), } } func (r *FileReader) onEvent(ev *replication.BinlogEvent) error { select { case r.ch <- ev: + r.readOffset.Set(ev.Header.LogPos) return nil case <-r.ctx.Done(): return r.ctx.Err() From 86a3967e053da9e63c4d8e4845fc55a619e327bb Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 12 Apr 2019 10:48:53 +0800 Subject: [PATCH 11/28] reader: rename struct --- pkg/binlog/reader/mock_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go index 4450c0929d..b2addb652a 100644 --- a/pkg/binlog/reader/mock_test.go +++ b/pkg/binlog/reader/mock_test.go @@ -30,7 +30,7 @@ var ( type testMockReaderSuite struct { } -type testCase struct { +type testMockCase struct { ev *replication.BinlogEvent err error } @@ -54,7 +54,7 @@ func (t *testMockReaderSuite) TestRead(c *C) { c.Assert(r.StartSyncByGTID(nil), Equals, errSpecial) c.Assert(r.Close(), Equals, errSpecial) - cases := []testCase{ + cases := []testMockCase{ { ev: &replication.BinlogEvent{ RawData: []byte{1}, @@ -96,13 +96,13 @@ func (t *testMockReaderSuite) TestRead(c *C) { } }() - obtained := make([]testCase, 0, len(cases)) + obtained := make([]testMockCase, 0, len(cases)) for { ev, err := r.GetEvent(ctx) if err != nil { - obtained = append(obtained, testCase{ev: nil, err: err}) + obtained = append(obtained, testMockCase{ev: nil, err: err}) } else { - obtained = append(obtained, testCase{ev: ev, err: nil}) + obtained = append(obtained, testMockCase{ev: ev, err: nil}) } if len(obtained) == len(cases) || ctx.Err() != nil { break From af31c57c92340f3741ed2f3ce47ace94058daee8 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 12 Apr 2019 19:35:46 +0800 Subject: [PATCH 12/28] reader: tiny modify FileReader; add test case for FileReader --- pkg/binlog/reader/file.go | 12 +- pkg/binlog/reader/file_test.go | 231 +++++++++++++++++++++++++++++++++ pkg/binlog/reader/tcp_test.go | 2 +- 3 files changed, 241 insertions(+), 4 deletions(-) create mode 100644 pkg/binlog/reader/file_test.go diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 82310d6c59..5eaac4f87a 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -32,14 +32,15 @@ import ( // FileReader is a binlog event reader which read binlog events from a file. type FileReader struct { mu sync.Mutex + wg sync.WaitGroup stage sync2.AtomicInt32 readOffset sync2.AtomicUint32 sendOffset sync2.AtomicUint32 - parser *replication.BinlogParser - ch chan *replication.BinlogEvent - ech chan error + parser *replication.BinlogParser + ch chan *replication.BinlogEvent + ech chan error ctx context.Context cancel context.CancelFunc @@ -86,7 +87,9 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { } r.ctx, r.cancel = context.WithCancel(context.Background()) + r.wg.Add(1) go func() { + defer r.wg.Done() err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) if err != nil { select { @@ -117,6 +120,7 @@ func (r *FileReader) Close() error { r.parser.Stop() r.cancel() + r.wg.Wait() r.stage.Set(int32(stageClosed)) return nil } @@ -135,6 +139,8 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er return nil, err case <-ctx.Done(): return nil, ctx.Err() + case <-r.ctx.Done(): // Reader closed + return nil, r.ctx.Err() } } diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go new file mode 100644 index 0000000000..74208e0f50 --- /dev/null +++ b/pkg/binlog/reader/file_test.go @@ -0,0 +1,231 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package reader + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + . "github.com/pingcap/check" + "github.com/pingcap/errors" + gmysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" + + "github.com/pingcap/dm/pkg/binlog/event" + "github.com/pingcap/dm/pkg/gtid" +) + +var ( + _ = Suite(&testFileReaderSuite{}) +) + +type testFileReaderSuite struct { +} + +func (t *testFileReaderSuite) TestInterfaceMethods(c *C) { + var ( + cfg = &FileReaderConfig{} + gSet gtid.Set // nil GTID set + timeoutCtx, timeoutCancel = context.WithTimeout(context.Background(), 10*time.Second) + ) + defer timeoutCancel() + + r := NewFileReader(cfg) + c.Assert(r, NotNil) + + // check status, stageNew + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageNew.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + + // not prepared + e, err := r.GetEvent(timeoutCtx) + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), stageNew.String()), IsTrue) + c.Assert(e, IsNil) + + // by GTID, not supported yet + err = r.StartSyncByGTID(gSet) + c.Assert(strings.Contains(err.Error(), "not supported"), IsTrue) + + // by pos + err = r.StartSyncByPos(gmysql.Position{}) + c.Assert(err, IsNil) + + // check status, stagePrepared + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + + // re-prepare is invalid + err = r.StartSyncByPos(gmysql.Position{}) + c.Assert(err, NotNil) + + // binlog file not exists + e, err = r.GetEvent(timeoutCtx) + c.Assert(os.IsNotExist(errors.Cause(err)), IsTrue) + c.Assert(e, IsNil) + + // close the reader + c.Assert(r.Close(), IsNil) + + // check status, stageClosed + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageClosed.String()) + c.Assert(frStatus.ReadOffset, Equals, uint32(0)) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) + + // re-close is invalid + c.Assert(r.Close(), NotNil) +} + +func (t *testFileReaderSuite) TestGetEvent(c *C) { + timeoutCtx, timeoutCancel := context.WithTimeout(context.Background(), 10*time.Second) + defer timeoutCancel() + + // create a empty file + dir := c.MkDir() + filename := filepath.Join(dir, "mysql-bin-test.000001") + f, err := os.Create(filename) + c.Assert(err, IsNil) + defer f.Close() + + // start from the beginning + startPos := gmysql.Position{Name: filename} + + // no data can be read, EOF + r := NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err := r.GetEvent(timeoutCtx) + c.Assert(errors.Cause(err), Equals, io.EOF) + c.Assert(e, IsNil) + c.Assert(r.Close(), IsNil) // close the reader + + // writer a binlog file header + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + + // no valid events can be read, but can cancel it by the context argument + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + e, err = r.GetEvent(ctx) + c.Assert(errors.Cause(err), Equals, context.DeadlineExceeded) + c.Assert(e, IsNil) + c.Assert(r.Close(), IsNil) // close the reader + + // no valid events can be read, but can cancel it by closing the reader + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + e2, err2 := r.GetEvent(timeoutCtx) + c.Assert(errors.Cause(err2), Equals, context.Canceled) + c.Assert(e2, IsNil) + }() + time.Sleep(time.Second) // wait 1 second + c.Assert(r.Close(), IsNil) // close the reader + wg.Wait() + + // writer a FormatDescriptionEvent + header := &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: uint32(101), + } + latestPos := uint32(len(replication.BinLogFileHeader)) + formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) + c.Assert(err, IsNil) + c.Assert(formatDescEv, NotNil) + _, err = f.Write(formatDescEv.RawData) + c.Assert(err, IsNil) + latestPos = formatDescEv.Header.LogPos + + // got a FormatDescriptionEvent + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e, DeepEquals, formatDescEv) + c.Assert(r.Close(), IsNil) // close the reader + + // check status, stageClosed + fStat, err := f.Stat() + c.Assert(err, IsNil) + fSize := uint32(fStat.Size()) + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stageClosed.String()) + c.Assert(frStatus.ReadOffset, Equals, fSize) + c.Assert(frStatus.SendOffset, Equals, fSize) + + // write two QueryEvent + var queryEv *replication.BinlogEvent + for i := 0; i < 2; i++ { + queryEv, err = event.GenQueryEvent( + header, latestPos, 0, 0, 0, nil, + []byte(fmt.Sprintf("schema-%d", i)), []byte(fmt.Sprintf("query-%d", i))) + c.Assert(err, IsNil) + c.Assert(queryEv, NotNil) + _, err = f.Write(queryEv.RawData) + c.Assert(err, IsNil) + latestPos = queryEv.Header.LogPos + } + + // read from the middle + startPos.Pos = latestPos - queryEv.Header.EventSize + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) // always got a FormatDescriptionEvent first + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, queryEv.RawData) // the last QueryEvent + c.Assert(r.Close(), IsNil) // close the reader + + // read from an invalid pos + startPos.Pos-- + r = NewFileReader(&FileReaderConfig{}) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(startPos), IsNil) + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) // always got a FormatDescriptionEvent first + e, err = r.GetEvent(timeoutCtx) + c.Assert(err, NotNil) + c.Assert(strings.Contains(err.Error(), "EOF"), IsTrue) +} diff --git a/pkg/binlog/reader/tcp_test.go b/pkg/binlog/reader/tcp_test.go index 8fd3467989..2a41a30eb2 100644 --- a/pkg/binlog/reader/tcp_test.go +++ b/pkg/binlog/reader/tcp_test.go @@ -204,7 +204,7 @@ func (t *testTCPReaderSuite) TestSyncGTID(c *C) { UseDecimal: true, VerifyChecksum: true, } - gSet gtid.Set // nit GTID set + gSet gtid.Set // nil GTID set ) // the first reader From 6bde450e3000a3ab4fc48516933903c31fcaeaa7 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 12 Apr 2019 20:12:25 +0800 Subject: [PATCH 13/28] reader: tiny fix; add more test cases for FileReader --- pkg/binlog/reader/file.go | 4 +- pkg/binlog/reader/file_test.go | 77 ++++++++++++++++++++++++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 5eaac4f87a..6f04a92c31 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -148,8 +148,8 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er func (r *FileReader) Status() interface{} { return &FileReaderStatus{ Stage: readerStage(r.stage.Get()).String(), - ReadOffset: r.sendOffset.Get(), - SendOffset: r.readOffset.Get(), + ReadOffset: r.readOffset.Get(), + SendOffset: r.sendOffset.Get(), } } diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index 74208e0f50..7f71030933 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -229,3 +229,80 @@ func (t *testFileReaderSuite) TestGetEvent(c *C) { c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), "EOF"), IsTrue) } + +func (t *testFileReaderSuite) TestWithChannelBuffer(c *C) { + var ( + cfg = &FileReaderConfig{ChBufferSize: 10} + timeoutCtx, timeoutCancel = context.WithTimeout(context.Background(), 10*time.Second) + ) + defer timeoutCancel() + + // create a empty file + dir := c.MkDir() + filename := filepath.Join(dir, "mysql-bin-test.000001") + f, err := os.Create(filename) + c.Assert(err, IsNil) + defer f.Close() + + // writer a binlog file header + _, err = f.Write(replication.BinLogFileHeader) + c.Assert(err, IsNil) + + // writer a FormatDescriptionEvent + header := &replication.EventHeader{ + Timestamp: uint32(time.Now().Unix()), + ServerID: uint32(101), + } + latestPos := uint32(len(replication.BinLogFileHeader)) + formatDescEv, err := event.GenFormatDescriptionEvent(header, latestPos) + c.Assert(err, IsNil) + c.Assert(formatDescEv, NotNil) + _, err = f.Write(formatDescEv.RawData) + c.Assert(err, IsNil) + latestPos = formatDescEv.Header.LogPos + + // write channelBufferSize QueryEvent + var queryEv *replication.BinlogEvent + for i := 0; i < cfg.ChBufferSize; i++ { + queryEv, err = event.GenQueryEvent( + header, latestPos, 0, 0, 0, nil, + []byte(fmt.Sprintf("schema-%d", i)), []byte(fmt.Sprintf("query-%d", i))) + c.Assert(err, IsNil) + c.Assert(queryEv, NotNil) + _, err = f.Write(queryEv.RawData) + c.Assert(err, IsNil) + latestPos = queryEv.Header.LogPos + } + + r := NewFileReader(cfg) + c.Assert(r, NotNil) + c.Assert(r.StartSyncByPos(gmysql.Position{Name: filename}), IsNil) + time.Sleep(time.Second) // wait events to be read + + // check status, stagePrepared + readOffset := latestPos - queryEv.Header.EventSize // an FormatDescriptionEvent in the channel buffer + status := r.Status() + frStatus, ok := status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, readOffset) + c.Assert(frStatus.SendOffset, Equals, uint32(0)) // no event sent yet + + // get one event + e, err := r.GetEvent(timeoutCtx) + c.Assert(err, IsNil) + c.Assert(e, NotNil) + c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) + time.Sleep(time.Second) // wait events to be read + + // check status, again + readOffset = latestPos // reach the end + status = r.Status() + frStatus, ok = status.(*FileReaderStatus) + c.Assert(ok, IsTrue) + c.Assert(frStatus.Stage, Equals, stagePrepared.String()) + c.Assert(frStatus.ReadOffset, Equals, readOffset) + c.Assert(frStatus.SendOffset, Equals, formatDescEv.Header.LogPos) // already get formatDescEv + + c.Assert(r.Close(), IsNil) +} From 93dd964be396e137a271e9ddb388f93d8cb0ae52 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 15 Apr 2019 18:24:52 +0800 Subject: [PATCH 14/28] reader: DeadlineExceeded should be retry in the outer caller --- relay/reader/error.go | 10 ---------- relay/reader/error_test.go | 13 ------------- relay/reader/reader.go | 5 ++--- relay/reader/reader_test.go | 6 ++++-- 4 files changed, 6 insertions(+), 28 deletions(-) diff --git a/relay/reader/error.go b/relay/reader/error.go index 466c8c2e6c..39c7bc2dcb 100644 --- a/relay/reader/error.go +++ b/relay/reader/error.go @@ -19,16 +19,6 @@ import ( "github.com/pingcap/errors" ) -// isRetryableError checks whether the error is retryable. -func isRetryableError(err error) bool { - err = errors.Cause(err) - switch err { - case context.DeadlineExceeded: - return true - } - return false -} - // isIgnorableError checks whether the error is ignorable. func isIgnorableError(err error) bool { err = errors.Cause(err) diff --git a/relay/reader/error_test.go b/relay/reader/error_test.go index 67f7f1962b..6fd61bf194 100644 --- a/relay/reader/error_test.go +++ b/relay/reader/error_test.go @@ -27,19 +27,6 @@ var ( type testErrorSuite struct { } -func (t *testErrorSuite) TestRetryable(c *C) { - err := errors.New("custom error") - c.Assert(isRetryableError(err), IsFalse) - - cases := []error{ - context.DeadlineExceeded, - errors.Annotate(context.DeadlineExceeded, "annotated"), - } - for _, cs := range cases { - c.Assert(isRetryableError(cs), IsTrue) - } -} - func (t *testErrorSuite) TestIgnorable(c *C) { err := errors.New("custom error") c.Assert(isIgnorableError(err), IsFalse) diff --git a/relay/reader/reader.go b/relay/reader/reader.go index eccd9e9419..b6df0ee3b7 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -32,6 +32,7 @@ import ( // The reader should support: // 1. handle expected errors // 2. do retry if possible +// NOTE: some errors still need to be handled in the outer caller. type Reader interface { // Start starts the reading process. Start() error @@ -119,13 +120,11 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) ev, err := r.in.GetEvent(ctx) if err == nil { return ev, nil - } else if isRetryableError(err) { - log.Warnf("[relay] get event with retryable error %s", err) - continue // retry without sleep } else if isIgnorableError(err) { log.Warnf("[relay] get event with ignorable error %s", err) return nil, nil // return without error and also without binlog event } + // NOTE: add retryable error support if needed later return nil, errors.Trace(err) } } diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index a81f14b5db..596640f057 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -159,11 +159,12 @@ func (t *testReaderSuite) TestGetEventWithError(c *C) { errOther := errors.New("other error") in := []error{ - context.DeadlineExceeded, // retryable + context.DeadlineExceeded, // should be handled in the outer context.Canceled, // ignorable errOther, } expected := []error{ + context.DeadlineExceeded, nil, // from ignorable errOther, } @@ -184,9 +185,10 @@ func (t *testReaderSuite) TestGetEventWithError(c *C) { obtained := make([]error, 0, len(expected)) for { ev, err2 := r.GetEvent(ctx) + err2 = errors.Cause(err2) c.Assert(ev, IsNil) obtained = append(obtained, err2) - if errors.Cause(err2) == errOther { + if err2 == errOther { break // all received } } From 1be78cb98a9f8e2cf8ce960e8f0f3d6f8fa09adb Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 15 Apr 2019 19:39:59 +0800 Subject: [PATCH 15/28] reader: address comments --- pkg/binlog/reader/file.go | 8 ++++---- pkg/binlog/reader/mock_test.go | 4 ++-- pkg/binlog/reader/tcp.go | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 6f04a92c31..b685f9f51a 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -83,7 +83,7 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { defer r.mu.Unlock() if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) } r.ctx, r.cancel = context.WithCancel(context.Background()) @@ -128,7 +128,7 @@ func (r *FileReader) Close() error { // GetEvent implements Reader.GetEvent. func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) + return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage.Get()), stagePrepared) } select { @@ -138,9 +138,9 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er case err := <-r.ech: return nil, err case <-ctx.Done(): - return nil, ctx.Err() + return nil, errors.Annotatef(ctx.Err(), "canceled from the caller") case <-r.ctx.Done(): // Reader closed - return nil, r.ctx.Err() + return nil, errors.Annotatef(r.ctx.Err(), "the reader closed") } } diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go index b2addb652a..e08b032785 100644 --- a/pkg/binlog/reader/mock_test.go +++ b/pkg/binlog/reader/mock_test.go @@ -104,12 +104,12 @@ func (t *testMockReaderSuite) TestRead(c *C) { } else { obtained = append(obtained, testMockCase{ev: ev, err: nil}) } - if len(obtained) == len(cases) || ctx.Err() != nil { + c.Assert(ctx.Err(), IsNil) + if len(obtained) == len(cases) { break } } - c.Assert(ctx.Err(), IsNil) c.Assert(obtained, DeepEquals, cases) cancel() // cancel manually diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index d59dc2d0f6..ffaed081ca 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -69,7 +69,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { defer r.mu.Unlock() if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -88,7 +88,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { defer r.mu.Unlock() if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) } if gSet == nil { @@ -136,7 +136,7 @@ func (r *TCPReader) Close() error { // GetEvent implements Reader.GetEvent. func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) + return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage.Get()), stagePrepared) } return r.streamer.GetEvent(ctx) From 83fc175efcf5cb86d8f9ba10c32880abd5c39996 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 15 Apr 2019 19:43:05 +0800 Subject: [PATCH 16/28] reader: address comments --- pkg/binlog/reader/file.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index b685f9f51a..7a5c0c4f32 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -27,6 +27,7 @@ import ( "github.com/siddontang/go/sync2" "github.com/pingcap/dm/pkg/gtid" + "github.com/pingcap/dm/pkg/log" ) // FileReader is a binlog event reader which read binlog events from a file. @@ -92,6 +93,7 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { defer r.wg.Done() err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) if err != nil { + log.Errorf("[file reader] parse binlog file with error %s", errors.ErrorStack(err)) select { case r.ech <- err: case <-r.ctx.Done(): From 43a7808fb984f35a0f137dd3504a42b3343569ad Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 15 Apr 2019 19:59:49 +0800 Subject: [PATCH 17/28] reader: address comments --- relay/reader/reader.go | 37 +++++++++++++++++++------------------ relay/reader/reader_test.go | 22 ---------------------- 2 files changed, 19 insertions(+), 40 deletions(-) diff --git a/relay/reader/reader.go b/relay/reader/reader.go index b6df0ee3b7..24b62b6df2 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -20,7 +20,6 @@ import ( "github.com/pingcap/errors" "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" - "github.com/siddontang/go/sync2" br "github.com/pingcap/dm/pkg/binlog/reader" "github.com/pingcap/dm/pkg/gtid" @@ -56,9 +55,10 @@ type Config struct { // reader implements Reader interface. type reader struct { - cfg *Config - mu sync.Mutex - stage sync2.AtomicInt32 + cfg *Config + + mu sync.RWMutex + stage int32 in br.Reader // the underlying reader used to read binlog events. out chan *replication.BinlogEvent @@ -78,8 +78,8 @@ func (r *reader) Start() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stageNew) + if r.stage != int32(stageNew) { + return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) } defer func() { @@ -94,26 +94,33 @@ func (r *reader) Start() error { err = r.setUpReaderByPos() } - r.stage.Set(int32(stagePrepared)) + r.stage = int32(stagePrepared) return err } // Close implements Reader.Close. func (r *reader) Close() error { - if r.stage.Get() == int32(stageClosed) { + r.mu.Lock() + defer r.mu.Unlock() + + if r.stage == int32(stageClosed) { return errors.New("already closed") } err := r.in.Close() - r.stage.Set(int32(stageClosed)) + r.stage = int32(stageClosed) return errors.Trace(err) } // GetEvent implements Reader.GetEvent. // If some ignorable error occurred, the returned event and error both are nil. +// NOTE: can only close the reader after this returned. func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) } for { @@ -132,13 +139,7 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) func (r *reader) setUpReaderByGTID() error { gs := r.cfg.GTIDs log.Infof("[relay] start sync for master %s from GTID set %s", r.cfg.MasterID, gs) - err := r.in.StartSyncByGTID(gs) - if err != nil { - log.Errorf("[relay] start sync for master %s from GTID set %s error %v", - r.cfg.MasterID, gs, errors.ErrorStack(err)) - return r.setUpReaderByPos() - } - return nil + return r.in.StartSyncByGTID(gs) } func (r *reader) setUpReaderByPos() error { diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index 596640f057..56eed7aa99 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -105,28 +105,6 @@ func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*repli c.Assert(ev, IsNil) } -func (t *testReaderSuite) TestBackOffGTID(c *C) { - cfg := &Config{ - SyncConfig: replication.BinlogSyncerConfig{ - ServerID: 101, - }, - MasterID: "test-master", - } - errByPos := errors.New("start sync by pos error") - errByGTID := errors.New("start sync by GTID error") - - // test with position - r := NewReader(cfg) - err := t.testBackOffWithReader(c, r, errByPos, errByGTID) - c.Assert(err, Equals, errByPos) - - // test with GTID - cfg.EnableGTID = true - r = NewReader(cfg) - err = t.testBackOffWithReader(c, r, errByPos, errByGTID) - c.Assert(err, Equals, errByPos) // also returned errByPos because backoff to position mode -} - func (t *testReaderSuite) testBackOffWithReader(c *C, r Reader, errByPos error, errByGTID error) error { // replace underlying reader with a mock reader for testing concreteR := r.(*reader) From 59a6a7b32a23bb23ede18af5850bcba8f1a2b5a2 Mon Sep 17 00:00:00 2001 From: Ian Date: Mon, 15 Apr 2019 20:08:40 +0800 Subject: [PATCH 18/28] Update relay/transformer/transformer.go Co-Authored-By: csuzhangxc --- relay/transformer/transformer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay/transformer/transformer.go b/relay/transformer/transformer.go index fbc21e5fbb..10b700725f 100644 --- a/relay/transformer/transformer.go +++ b/relay/transformer/transformer.go @@ -38,7 +38,7 @@ type Result struct { // 1. extract binlog position, GTID info from the event. // 2. decide the event whether needed by a downstream writer. // - the downstream writer may also drop some events according to its strategy. -// NOTE: more works maybe moved from outer into Transformer later. +// NOTE: more features maybe moved from outer into Transformer later. type Transformer interface { // Transform transforms a binlog event. Transform(e *replication.BinlogEvent) *Result From d0b6697a77741df4e5dc957cd45c07c636d8fbaf Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 15 Apr 2019 20:31:17 +0800 Subject: [PATCH 19/28] reader: change Mutex and Atomic to RWMutex --- pkg/binlog/reader/file.go | 31 +++++++++++++++------------ pkg/binlog/reader/file_test.go | 17 --------------- pkg/binlog/reader/reader.go | 1 + pkg/binlog/reader/tcp.go | 39 +++++++++++++++++++--------------- 4 files changed, 41 insertions(+), 47 deletions(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 7a5c0c4f32..05b808d670 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -32,10 +32,10 @@ import ( // FileReader is a binlog event reader which read binlog events from a file. type FileReader struct { - mu sync.Mutex + mu sync.RWMutex wg sync.WaitGroup - stage sync2.AtomicInt32 + stage int32 readOffset sync2.AtomicUint32 sendOffset sync2.AtomicUint32 @@ -83,8 +83,8 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) + if r.stage != int32(stageNew) { + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) } r.ctx, r.cancel = context.WithCancel(context.Background()) @@ -101,7 +101,7 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { } }() - r.stage.Set(int32(stagePrepared)) + r.stage = int32(stagePrepared) return nil } @@ -116,21 +116,24 @@ func (r *FileReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() == int32(stageClosed) { + if r.stage == int32(stageClosed) { return errors.New("already closed") } r.parser.Stop() r.cancel() r.wg.Wait() - r.stage.Set(int32(stageClosed)) + r.stage = int32(stageClosed) return nil } // GetEvent implements Reader.GetEvent. func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage.Get()), stagePrepared) + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage), stagePrepared) } select { @@ -140,16 +143,18 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er case err := <-r.ech: return nil, err case <-ctx.Done(): - return nil, errors.Annotatef(ctx.Err(), "canceled from the caller") - case <-r.ctx.Done(): // Reader closed - return nil, errors.Annotatef(r.ctx.Err(), "the reader closed") + return nil, ctx.Err() } } // Status implements Reader.Status. func (r *FileReader) Status() interface{} { + r.mu.RLock() + stage := r.stage + r.mu.RUnlock() + return &FileReaderStatus{ - Stage: readerStage(r.stage.Get()).String(), + Stage: readerStage(stage).String(), ReadOffset: r.readOffset.Get(), SendOffset: r.sendOffset.Get(), } diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index 7f71030933..d73d545f9b 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -20,7 +20,6 @@ import ( "os" "path/filepath" "strings" - "sync" "time" . "github.com/pingcap/check" @@ -142,22 +141,6 @@ func (t *testFileReaderSuite) TestGetEvent(c *C) { c.Assert(e, IsNil) c.Assert(r.Close(), IsNil) // close the reader - // no valid events can be read, but can cancel it by closing the reader - r = NewFileReader(&FileReaderConfig{}) - c.Assert(r, NotNil) - c.Assert(r.StartSyncByPos(startPos), IsNil) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - e2, err2 := r.GetEvent(timeoutCtx) - c.Assert(errors.Cause(err2), Equals, context.Canceled) - c.Assert(e2, IsNil) - }() - time.Sleep(time.Second) // wait 1 second - c.Assert(r.Close(), IsNil) // close the reader - wg.Wait() - // writer a FormatDescriptionEvent header := &replication.EventHeader{ Timestamp: uint32(time.Now().Unix()), diff --git a/pkg/binlog/reader/reader.go b/pkg/binlog/reader/reader.go index ddf91b8680..eba9de6b82 100644 --- a/pkg/binlog/reader/reader.go +++ b/pkg/binlog/reader/reader.go @@ -54,6 +54,7 @@ type Reader interface { StartSyncByGTID(gSet gtid.Set) error // Close closes the reader and release the resource. + // Close will be blocked if `GetEvent` has not returned. Close() error // GetEvent gets the binlog event one by one, it will block if no event can be read. diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index ffaed081ca..35a2cccffd 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" gmysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" - "github.com/siddontang/go/sync2" "github.com/pingcap/dm/pkg/gtid" "github.com/pingcap/dm/pkg/log" @@ -32,12 +31,13 @@ import ( // TCPReader is a binlog event reader which read binlog events from a TCP stream. type TCPReader struct { - mu sync.Mutex - - stage sync2.AtomicInt32 syncerCfg replication.BinlogSyncerConfig - syncer *replication.BinlogSyncer - streamer *replication.BinlogStreamer + + mu sync.RWMutex + stage int32 + + syncer *replication.BinlogSyncer + streamer *replication.BinlogStreamer } // TCPReaderStatus represents the status of a TCPReader. @@ -68,8 +68,8 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) + if r.stage != int32(stageNew) { + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -78,7 +78,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { } r.streamer = streamer - r.stage.Set(int32(stagePrepared)) + r.stage = int32(stagePrepared) return nil } @@ -87,8 +87,8 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage.Get()), stageNew) + if r.stage != int32(stageNew) { + return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) } if gSet == nil { @@ -101,7 +101,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { } r.streamer = streamer - r.stage.Set(int32(stagePrepared)) + r.stage = int32(stagePrepared) return nil } @@ -110,7 +110,7 @@ func (r *TCPReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage.Get() == int32(stageClosed) { + if r.stage == int32(stageClosed) { return errors.New("already closed") } @@ -129,14 +129,17 @@ func (r *TCPReader) Close() error { } } - r.stage.Set(int32(stageClosed)) + r.stage = int32(stageClosed) return nil } // GetEvent implements Reader.GetEvent. func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { - if r.stage.Get() != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage.Get()), stagePrepared) + r.mu.RLock() + defer r.mu.RUnlock() + + if r.stage != int32(stagePrepared) { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage), stagePrepared) } return r.streamer.GetEvent(ctx) @@ -144,7 +147,9 @@ func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, err // Status implements Reader.Status. func (r *TCPReader) Status() interface{} { - stage := r.stage.Get() + r.mu.RLock() + stage := r.stage + r.mu.RUnlock() var connID uint32 if stage != int32(stageNew) { From 208aa36ed8c7828b138cddeefaf06d82d3447188 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 16 Apr 2019 16:36:57 +0800 Subject: [PATCH 20/28] *: refine code --- pkg/binlog/reader/file.go | 18 +++++++++--------- pkg/binlog/reader/tcp.go | 26 +++++++++++++------------- relay/reader/reader.go | 16 ++++++++-------- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 05b808d670..fe45c30191 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -35,7 +35,7 @@ type FileReader struct { mu sync.RWMutex wg sync.WaitGroup - stage int32 + stage readerStage readOffset sync2.AtomicUint32 sendOffset sync2.AtomicUint32 @@ -83,8 +83,8 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } r.ctx, r.cancel = context.WithCancel(context.Background()) @@ -101,7 +101,7 @@ func (r *FileReader) StartSyncByPos(pos gmysql.Position) error { } }() - r.stage = int32(stagePrepared) + r.stage = stagePrepared return nil } @@ -116,14 +116,14 @@ func (r *FileReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage == int32(stageClosed) { + if r.stage == stageClosed { return errors.New("already closed") } r.parser.Stop() r.cancel() r.wg.Wait() - r.stage = int32(stageClosed) + r.stage = stageClosed return nil } @@ -132,8 +132,8 @@ func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, er r.mu.RLock() defer r.mu.RUnlock() - if r.stage != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage), stagePrepared) + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared) } select { @@ -154,7 +154,7 @@ func (r *FileReader) Status() interface{} { r.mu.RUnlock() return &FileReaderStatus{ - Stage: readerStage(stage).String(), + Stage: stage.String(), ReadOffset: r.readOffset.Get(), SendOffset: r.sendOffset.Get(), } diff --git a/pkg/binlog/reader/tcp.go b/pkg/binlog/reader/tcp.go index 35a2cccffd..da20113a08 100644 --- a/pkg/binlog/reader/tcp.go +++ b/pkg/binlog/reader/tcp.go @@ -34,7 +34,7 @@ type TCPReader struct { syncerCfg replication.BinlogSyncerConfig mu sync.RWMutex - stage int32 + stage readerStage syncer *replication.BinlogSyncer streamer *replication.BinlogStreamer @@ -68,8 +68,8 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } streamer, err := r.syncer.StartSync(pos) @@ -78,7 +78,7 @@ func (r *TCPReader) StartSyncByPos(pos gmysql.Position) error { } r.streamer = streamer - r.stage = int32(stagePrepared) + r.stage = stagePrepared return nil } @@ -87,8 +87,8 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != int32(stageNew) { - return errors.Errorf("stage %s, expect %s, already started", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } if gSet == nil { @@ -101,7 +101,7 @@ func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { } r.streamer = streamer - r.stage = int32(stagePrepared) + r.stage = stagePrepared return nil } @@ -110,7 +110,7 @@ func (r *TCPReader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage == int32(stageClosed) { + if r.stage == stageClosed { return errors.New("already closed") } @@ -129,7 +129,7 @@ func (r *TCPReader) Close() error { } } - r.stage = int32(stageClosed) + r.stage = stageClosed return nil } @@ -138,8 +138,8 @@ func (r *TCPReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, err r.mu.RLock() defer r.mu.RUnlock() - if r.stage != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s, please start sync first", readerStage(r.stage), stagePrepared) + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared) } return r.streamer.GetEvent(ctx) @@ -152,11 +152,11 @@ func (r *TCPReader) Status() interface{} { r.mu.RUnlock() var connID uint32 - if stage != int32(stageNew) { + if stage != stageNew { connID = r.syncer.LastConnectionID() } return &TCPReaderStatus{ - Stage: readerStage(stage).String(), + Stage: stage.String(), ConnID: connID, } } diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 24b62b6df2..9d893c6e59 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -58,7 +58,7 @@ type reader struct { cfg *Config mu sync.RWMutex - stage int32 + stage readerStage in br.Reader // the underlying reader used to read binlog events. out chan *replication.BinlogEvent @@ -78,8 +78,8 @@ func (r *reader) Start() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage != int32(stageNew) { - return errors.Errorf("stage %s, expect %s", readerStage(r.stage), stageNew) + if r.stage != stageNew { + return errors.Errorf("stage %s, expect %s", r.stage, stageNew) } defer func() { @@ -94,7 +94,7 @@ func (r *reader) Start() error { err = r.setUpReaderByPos() } - r.stage = int32(stagePrepared) + r.stage = stagePrepared return err } @@ -103,12 +103,12 @@ func (r *reader) Close() error { r.mu.Lock() defer r.mu.Unlock() - if r.stage == int32(stageClosed) { + if r.stage == stageClosed { return errors.New("already closed") } err := r.in.Close() - r.stage = int32(stageClosed) + r.stage = stageClosed return errors.Trace(err) } @@ -119,8 +119,8 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) r.mu.RLock() defer r.mu.RUnlock() - if r.stage != int32(stagePrepared) { - return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage), stagePrepared) + if r.stage != stagePrepared { + return nil, errors.Errorf("stage %s, expect %s", r.stage, stagePrepared) } for { From 1184ffa9588e1ee7c1bbfb8f3c7b4b8ce26a50c7 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 18 Apr 2019 11:01:32 +0800 Subject: [PATCH 21/28] Update pkg/binlog/reader/file.go Co-Authored-By: csuzhangxc --- pkg/binlog/reader/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index fe45c30191..9199ad1926 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -30,7 +30,7 @@ import ( "github.com/pingcap/dm/pkg/log" ) -// FileReader is a binlog event reader which read binlog events from a file. +// FileReader is a binlog event reader which reads binlog events from a file. type FileReader struct { mu sync.RWMutex wg sync.WaitGroup From ab3418ca43029b1330080058ae6193d61e9cf980 Mon Sep 17 00:00:00 2001 From: amyangfei Date: Thu, 18 Apr 2019 11:02:45 +0800 Subject: [PATCH 22/28] Update relay/reader/reader.go Co-Authored-By: csuzhangxc --- relay/reader/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 9d893c6e59..9f3a6e5c53 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -95,7 +95,7 @@ func (r *reader) Start() error { } r.stage = stagePrepared - return err + return errors.Trace(err) } // Close implements Reader.Close. From 5edf31be6379499d1684515f0af1ffb9051d2320 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 18 Apr 2019 11:08:54 +0800 Subject: [PATCH 23/28] *: address comments --- pkg/binlog/reader/mock_test.go | 16 +++++++++------- relay/reader/reader.go | 2 +- relay/reader/reader_test.go | 15 --------------- 3 files changed, 10 insertions(+), 23 deletions(-) diff --git a/pkg/binlog/reader/mock_test.go b/pkg/binlog/reader/mock_test.go index e08b032785..8a4e8387ac 100644 --- a/pkg/binlog/reader/mock_test.go +++ b/pkg/binlog/reader/mock_test.go @@ -46,13 +46,15 @@ func (t *testMockReaderSuite) TestRead(c *C) { // replace with special error mockR := r.(*MockReader) - errSpecial := errors.New("special error for methods") - mockR.ErrStartByPos = errSpecial - mockR.ErrStartByGTID = errSpecial - mockR.ErrClose = errSpecial - c.Assert(r.StartSyncByPos(mysql.Position{}), Equals, errSpecial) - c.Assert(r.StartSyncByGTID(nil), Equals, errSpecial) - c.Assert(r.Close(), Equals, errSpecial) + errStartByPos := errors.New("special error for start by pos") + errStartByGTID := errors.New("special error for start by GTID") + errClose := errors.New("special error for close") + mockR.ErrStartByPos = errStartByPos + mockR.ErrStartByGTID = errStartByGTID + mockR.ErrClose = errClose + c.Assert(r.StartSyncByPos(mysql.Position{}), Equals, errStartByPos) + c.Assert(r.StartSyncByGTID(nil), Equals, errStartByGTID) + c.Assert(r.Close(), Equals, errClose) cases := []testMockCase{ { diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 9f3a6e5c53..bcac74a846 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -125,13 +125,13 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) for { ev, err := r.in.GetEvent(ctx) + // NOTE: add retryable error support if needed later if err == nil { return ev, nil } else if isIgnorableError(err) { log.Warnf("[relay] get event with ignorable error %s", err) return nil, nil // return without error and also without binlog event } - // NOTE: add retryable error support if needed later return nil, errors.Trace(err) } } diff --git a/relay/reader/reader_test.go b/relay/reader/reader_test.go index 56eed7aa99..5135faace9 100644 --- a/relay/reader/reader_test.go +++ b/relay/reader/reader_test.go @@ -105,21 +105,6 @@ func (t *testReaderSuite) testInterfaceWithReader(c *C, r Reader, cases []*repli c.Assert(ev, IsNil) } -func (t *testReaderSuite) testBackOffWithReader(c *C, r Reader, errByPos error, errByGTID error) error { - // replace underlying reader with a mock reader for testing - concreteR := r.(*reader) - c.Assert(concreteR, NotNil) - mockR := br.NewMockReader() - concreteR.in = mockR - - // specify an error for StartSyncByPos/StartSyncByGTID - concreteMR := mockR.(*br.MockReader) - concreteMR.ErrStartByPos = errByPos - concreteMR.ErrStartByGTID = errByGTID - - return r.Start() -} - func (t *testReaderSuite) TestGetEventWithError(c *C) { cfg := &Config{ SyncConfig: replication.BinlogSyncerConfig{ From b8c006a5a49521b7403e864c45083edcf4e46601 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Thu, 18 Apr 2019 16:01:18 +0800 Subject: [PATCH 24/28] reader: refine error msg --- relay/reader/reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relay/reader/reader.go b/relay/reader/reader.go index bcac74a846..3f1e9f3be3 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -79,7 +79,7 @@ func (r *reader) Start() error { defer r.mu.Unlock() if r.stage != stageNew { - return errors.Errorf("stage %s, expect %s", r.stage, stageNew) + return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } defer func() { @@ -120,7 +120,7 @@ func (r *reader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) defer r.mu.RUnlock() if r.stage != stagePrepared { - return nil, errors.Errorf("stage %s, expect %s", r.stage, stagePrepared) + return nil, errors.Errorf("stage %s, expect %s, please start the reader first", r.stage, stagePrepared) } for { From a5e98b77915bfa4c07feaae9a6bb592b62961de0 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 19 Apr 2019 10:35:59 +0800 Subject: [PATCH 25/28] *: refine code --- pkg/binlog/reader/file_test.go | 9 +++------ relay/reader/reader.go | 4 ++-- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index d73d545f9b..161cce28be 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -19,7 +19,6 @@ import ( "io" "os" "path/filepath" - "strings" "time" . "github.com/pingcap/check" @@ -59,13 +58,12 @@ func (t *testFileReaderSuite) TestInterfaceMethods(c *C) { // not prepared e, err := r.GetEvent(timeoutCtx) - c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), stageNew.String()), IsTrue) + c.Assert(err, ErrorMatches, fmt.Sprintf(".*%s.*", stageNew)) c.Assert(e, IsNil) // by GTID, not supported yet err = r.StartSyncByGTID(gSet) - c.Assert(strings.Contains(err.Error(), "not supported"), IsTrue) + c.Assert(err, ErrorMatches, ".*not supported.*") // by pos err = r.StartSyncByPos(gmysql.Position{}) @@ -209,8 +207,7 @@ func (t *testFileReaderSuite) TestGetEvent(c *C) { c.Assert(err, IsNil) c.Assert(e.RawData, DeepEquals, formatDescEv.RawData) // always got a FormatDescriptionEvent first e, err = r.GetEvent(timeoutCtx) - c.Assert(err, NotNil) - c.Assert(strings.Contains(err.Error(), "EOF"), IsTrue) + c.Assert(err, ErrorMatches, ".*EOF.*") } func (t *testFileReaderSuite) TestWithChannelBuffer(c *C) { diff --git a/relay/reader/reader.go b/relay/reader/reader.go index 3f1e9f3be3..68036f026d 100644 --- a/relay/reader/reader.go +++ b/relay/reader/reader.go @@ -81,6 +81,7 @@ func (r *reader) Start() error { if r.stage != stageNew { return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew) } + r.stage = stagePrepared defer func() { status := r.in.Status() @@ -94,7 +95,6 @@ func (r *reader) Start() error { err = r.setUpReaderByPos() } - r.stage = stagePrepared return errors.Trace(err) } @@ -106,9 +106,9 @@ func (r *reader) Close() error { if r.stage == stageClosed { return errors.New("already closed") } + r.stage = stageClosed err := r.in.Close() - r.stage = stageClosed return errors.Trace(err) } From 521ad4001cbfc3ccd8946129a15d2a5f8770e995 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 19 Apr 2019 15:35:39 +0800 Subject: [PATCH 26/28] reader: add FileReaderStatus string representation --- pkg/binlog/reader/file.go | 10 ++++++++++ pkg/binlog/reader/file_test.go | 3 +++ 2 files changed, 13 insertions(+) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 9199ad1926..78f59ae162 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -18,6 +18,7 @@ package reader import ( "context" + "encoding/json" "sync" "time" @@ -62,6 +63,15 @@ type FileReaderStatus struct { SendOffset uint32 `json:"send-offset"` // sent event's offset in the file } +// String implements Stringer.String. +func (s *FileReaderStatus) String() string { + data, err := json.Marshal(s) + if err != nil { + log.Errorf("[FileReaderStatus] marshal status to json error %v", err) + } + return string(data) +} + // NewFileReader creates a FileReader instance. func NewFileReader(cfg *FileReaderConfig) Reader { parser := replication.NewBinlogParser() diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index 161cce28be..fcf7b4e01e 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -19,6 +19,7 @@ import ( "io" "os" "path/filepath" + "strings" "time" . "github.com/pingcap/check" @@ -55,6 +56,8 @@ func (t *testFileReaderSuite) TestInterfaceMethods(c *C) { c.Assert(frStatus.Stage, Equals, stageNew.String()) c.Assert(frStatus.ReadOffset, Equals, uint32(0)) c.Assert(frStatus.SendOffset, Equals, uint32(0)) + frStatusStr := frStatus.String() + c.Assert(strings.Contains(frStatusStr, stageNew.String()), IsTrue) // not prepared e, err := r.GetEvent(timeoutCtx) From 0e318d11a85a29feab9c66618c1636a742a84ed4 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 22 Apr 2019 10:11:20 +0800 Subject: [PATCH 27/28] reader: address comment --- pkg/binlog/reader/file_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/binlog/reader/file_test.go b/pkg/binlog/reader/file_test.go index fcf7b4e01e..5e21bfea30 100644 --- a/pkg/binlog/reader/file_test.go +++ b/pkg/binlog/reader/file_test.go @@ -19,7 +19,6 @@ import ( "io" "os" "path/filepath" - "strings" "time" . "github.com/pingcap/check" @@ -57,7 +56,7 @@ func (t *testFileReaderSuite) TestInterfaceMethods(c *C) { c.Assert(frStatus.ReadOffset, Equals, uint32(0)) c.Assert(frStatus.SendOffset, Equals, uint32(0)) frStatusStr := frStatus.String() - c.Assert(strings.Contains(frStatusStr, stageNew.String()), IsTrue) + c.Assert(frStatusStr, Matches, fmt.Sprintf(`.*"stage":"%s".*`, stageNew)) // not prepared e, err := r.GetEvent(timeoutCtx) From 3f146d0356d32da7a3f132fd23846015fba8c873 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 22 Apr 2019 13:24:35 +0800 Subject: [PATCH 28/28] reader: address comment --- pkg/binlog/reader/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/binlog/reader/file.go b/pkg/binlog/reader/file.go index 78f59ae162..30b3c1fb91 100644 --- a/pkg/binlog/reader/file.go +++ b/pkg/binlog/reader/file.go @@ -130,9 +130,9 @@ func (r *FileReader) Close() error { return errors.New("already closed") } - r.parser.Stop() r.cancel() r.wg.Wait() + r.parser.Stop() r.stage = stageClosed return nil }