Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: add Reader, Transformer for relay log #108

Merged
merged 30 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
81d0b46
reader: simple wrapper for TCPReader to reader binlog events from master
csuzhangxc Apr 9, 2019
0e7747e
reader: add a MockReader
csuzhangxc Apr 10, 2019
9a6553a
reader: test reader with MockReader
csuzhangxc Apr 10, 2019
ef76773
*: test GTID backoff to position
csuzhangxc Apr 10, 2019
9fa15ad
reader: get event handle retryable and ignorable error
csuzhangxc Apr 10, 2019
33e22e0
reader: add test cases for error
csuzhangxc Apr 10, 2019
1f5bc1e
event: add GenRotateEvent
csuzhangxc Apr 11, 2019
76ba02c
*: add a transformer
csuzhangxc Apr 11, 2019
0500128
reader: add a FileReader
csuzhangxc Apr 11, 2019
2f9db5b
reader: add channel buffer size control for FileReader; add read/send…
csuzhangxc Apr 12, 2019
86a3967
reader: rename struct
csuzhangxc Apr 12, 2019
af31c57
reader: tiny modify FileReader; add test case for FileReader
csuzhangxc Apr 12, 2019
de5dbf6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer
csuzhangxc Apr 12, 2019
6bde450
reader: tiny fix; add more test cases for FileReader
csuzhangxc Apr 12, 2019
93dd964
reader: DeadlineExceeded should be retry in the outer caller
csuzhangxc Apr 15, 2019
1be78cb
reader: address comments
csuzhangxc Apr 15, 2019
83fc175
reader: address comments
csuzhangxc Apr 15, 2019
43a7808
reader: address comments
csuzhangxc Apr 15, 2019
59a6a7b
Update relay/transformer/transformer.go
IANTHEREAL Apr 15, 2019
d0b6697
reader: change Mutex and Atomic to RWMutex
csuzhangxc Apr 15, 2019
ef1f976
Merge remote-tracking branch 'origin/relay-writer' into relay-writer
csuzhangxc Apr 15, 2019
208aa36
*: refine code
csuzhangxc Apr 16, 2019
1184ffa
Update pkg/binlog/reader/file.go
amyangfei Apr 18, 2019
ab3418c
Update relay/reader/reader.go
amyangfei Apr 18, 2019
5edf31b
*: address comments
csuzhangxc Apr 18, 2019
b8c006a
reader: refine error msg
csuzhangxc Apr 18, 2019
a5e98b7
*: refine code
csuzhangxc Apr 19, 2019
521ad40
reader: add FileReaderStatus string representation
csuzhangxc Apr 19, 2019
0e318d1
reader: address comment
csuzhangxc Apr 22, 2019
3f146d0
reader: address comment
csuzhangxc Apr 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,33 @@ func GenFormatDescriptionEvent(header *replication.EventHeader, latestPos uint32
return ev, errors.Trace(err)
}

// GenRotateEvent generates a RotateEvent.
// ref: https://dev.mysql.com/doc/internals/en/rotate-event.html
func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogName []byte, position uint64) (*replication.BinlogEvent, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's difference between latestPos and position?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

latestPos is the previous end position of the latest events or the start position of this event.
position is the rotate target position for this event or the start position of the next event.

if len(nextLogName) == 0 {
return nil, errors.NotValidf("empty next binlog name")
}

// Post-header
postHeader := new(bytes.Buffer)
err := binary.Write(postHeader, binary.LittleEndian, position)
if err != nil {
return nil, errors.Annotatef(err, "write position %d", position)
}

// Payload
payload := new(bytes.Buffer)
err = binary.Write(payload, binary.LittleEndian, nextLogName)
if err != nil {
return nil, errors.Annotatef(err, "write next binlog name % X", nextLogName)
}

buf := new(bytes.Buffer)
event := &replication.RotateEvent{}
ev, err := assembleEvent(buf, event, false, *header, replication.ROTATE_EVENT, latestPos, postHeader.Bytes(), payload.Bytes())
return ev, errors.Trace(err)
}

// GenPreviousGTIDsEvent generates a PreviousGTIDsEvent.
// go-mysql has no PreviousGTIDsEvent struct defined, so return the event's raw data instead.
// MySQL has no internal doc for PREVIOUS_GTIDS_EVENT.
Expand Down
34 changes: 34 additions & 0 deletions pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,40 @@ func (t *testEventSuite) TestGenFormatDescriptionEvent(c *C) {
c.Assert(err, IsNil)
}

func (t *testEventSuite) TestGenRotateEvent(c *C) {
var (
header = &replication.EventHeader{
Timestamp: uint32(time.Now().Unix()),
ServerID: 11,
Flags: 0x01,
}
latestPos uint32 = 4
nextLogName []byte // nil
position uint64 = 123
)

// empty nextLogName, invalid
rotateEv, err := GenRotateEvent(header, latestPos, nextLogName, position)
c.Assert(err, NotNil)
c.Assert(rotateEv, IsNil)

// valid nextLogName
nextLogName = []byte("mysql-bin.000010")
rotateEv, err = GenRotateEvent(header, latestPos, nextLogName, position)
c.Assert(err, IsNil)
c.Assert(rotateEv, NotNil)

// verify the header
verifyHeader(c, rotateEv.Header, header, replication.ROTATE_EVENT, latestPos, uint32(len(rotateEv.RawData)))

// verify the body
rotateEvBody, ok := rotateEv.Event.(*replication.RotateEvent)
c.Assert(ok, IsTrue)
c.Assert(rotateEvBody, NotNil)
c.Assert(rotateEvBody.NextLogName, DeepEquals, nextLogName)
c.Assert(rotateEvBody.Position, Equals, position)
}

func (t *testEventSuite) TestGenPreviousGTIDsEvent(c *C) {
var (
header = &replication.EventHeader{
Expand Down
181 changes: 181 additions & 0 deletions pkg/binlog/reader/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// binlog events generator for MySQL used to generate some binlog events for tests.
// Readability takes precedence over performance.

package reader

import (
"context"
"encoding/json"
"sync"
"time"

"github.com/pingcap/errors"
gmysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go/sync2"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"
)

// FileReader is a binlog event reader which reads binlog events from a file.
type FileReader struct {
mu sync.RWMutex
wg sync.WaitGroup

stage readerStage
readOffset sync2.AtomicUint32
sendOffset sync2.AtomicUint32

parser *replication.BinlogParser
ch chan *replication.BinlogEvent
ech chan error

ctx context.Context
cancel context.CancelFunc
}

// FileReaderConfig is the configuration used by a FileReader.
type FileReaderConfig struct {
EnableRawMode bool
Timezone *time.Location
ChBufferSize int // event channel's buffer size
EchBufferSize int // error channel's buffer size
}

// FileReaderStatus represents the status of a FileReader.
type FileReaderStatus struct {
Stage string `json:"stage"`
ReadOffset uint32 `json:"read-offset"` // read event's offset in the file
SendOffset uint32 `json:"send-offset"` // sent event's offset in the file
}

// String implements Stringer.String.
func (s *FileReaderStatus) String() string {
data, err := json.Marshal(s)
if err != nil {
log.Errorf("[FileReaderStatus] marshal status to json error %v", err)
}
return string(data)
}

// NewFileReader creates a FileReader instance.
func NewFileReader(cfg *FileReaderConfig) Reader {
parser := replication.NewBinlogParser()
parser.SetVerifyChecksum(true)
parser.SetUseDecimal(true)
parser.SetRawMode(cfg.EnableRawMode)
if cfg.Timezone != nil {
parser.SetTimestampStringLocation(cfg.Timezone)
}
return &FileReader{
parser: parser,
ch: make(chan *replication.BinlogEvent, cfg.ChBufferSize),
ech: make(chan error, cfg.EchBufferSize),
}
}

// StartSyncByPos implements Reader.StartSyncByPos.
func (r *FileReader) StartSyncByPos(pos gmysql.Position) error {
r.mu.Lock()
defer r.mu.Unlock()

if r.stage != stageNew {
return errors.Errorf("stage %s, expect %s, already started", r.stage, stageNew)
}

r.ctx, r.cancel = context.WithCancel(context.Background())
r.wg.Add(1)
go func() {
defer r.wg.Done()
err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent)
if err != nil {
log.Errorf("[file reader] parse binlog file with error %s", errors.ErrorStack(err))
select {
case r.ech <- err:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at least output error information

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed. log the error before the select.

case <-r.ctx.Done():
}
}
}()

r.stage = stagePrepared
return nil
}

// StartSyncByGTID implements Reader.StartSyncByGTID.
func (r *FileReader) StartSyncByGTID(gSet gtid.Set) error {
// NOTE: may be supported later.
return errors.NotSupportedf("read from file by GTID")
}

// Close implements Reader.Close.
func (r *FileReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()

if r.stage == stageClosed {
return errors.New("already closed")
}

r.parser.Stop()
Copy link
Collaborator

@IANTHEREAL IANTHEREAL Apr 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's better to put it after L135, even I know it's ok after I review its implementation

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed

r.cancel()
r.wg.Wait()
r.stage = stageClosed
return nil
}

// GetEvent implements Reader.GetEvent.
func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) {
r.mu.RLock()
defer r.mu.RUnlock()

if r.stage != stagePrepared {
return nil, errors.Errorf("stage %s, expect %s, please start sync first", r.stage, stagePrepared)
}

select {
case ev := <-r.ch:
r.sendOffset.Set(ev.Header.LogPos)
return ev, nil
case err := <-r.ech:
return nil, err
case <-ctx.Done():
return nil, ctx.Err()
}
}

// Status implements Reader.Status.
func (r *FileReader) Status() interface{} {
r.mu.RLock()
stage := r.stage
r.mu.RUnlock()

return &FileReaderStatus{
Stage: stage.String(),
ReadOffset: r.readOffset.Get(),
SendOffset: r.sendOffset.Get(),
}
}

func (r *FileReader) onEvent(ev *replication.BinlogEvent) error {
select {
case r.ch <- ev:
r.readOffset.Set(ev.Header.LogPos)
return nil
case <-r.ctx.Done():
return r.ctx.Err()
}
}
Loading