-
Notifications
You must be signed in to change notification settings - Fork 188
*: add Reader
, Transformer
for relay log
#108
Conversation
… offset status for FileReader
@GregoryIan @amyangfei PTAL |
Reader
, Transformer
for relay log
@@ -159,6 +159,33 @@ func GenFormatDescriptionEvent(header *replication.EventHeader, latestPos uint32 | |||
return ev, errors.Trace(err) | |||
} | |||
|
|||
// GenRotateEvent generates a RotateEvent. | |||
// ref: https://dev.mysql.com/doc/internals/en/rotate-event.html | |||
func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogName []byte, position uint64) (*replication.BinlogEvent, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's difference between latestPos
and position
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestPos
is the previous end position of the latest events or the start position of this event.
position
is the rotate target position for this event or the start position of the next event.
err := r.parser.ParseFile(pos.Name, int64(pos.Pos), r.onEvent) | ||
if err != nil { | ||
select { | ||
case r.ech <- err: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
at least output error information
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed. log the error before the select
.
pkg/binlog/reader/file.go
Outdated
// GetEvent implements Reader.GetEvent. | ||
func (r *FileReader) GetEvent(ctx context.Context) (*replication.BinlogEvent, error) { | ||
if r.stage.Get() != int32(stagePrepared) { | ||
return nil, errors.Errorf("stage %s, expect %s", readerStage(r.stage.Get()), stagePrepared) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
user/debuger need to know these stage very well, it's better to add a little annotations, like "please start sync"
pkg/binlog/reader/file.go
Outdated
case <-ctx.Done(): | ||
return nil, ctx.Err() | ||
case <-r.ctx.Done(): // Reader closed | ||
return nil, r.ctx.Err() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should add more annotations to distinguish errorr.ctx.Err()
with errorctx.Err()
at L141
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added removed r.ctx.Done()
. changed the definition. Close
will be blocked if GetEvent
has not returned (use RWMutex
rather than Mutex
and AtomicInt32
)
pkg/binlog/reader/mock_test.go
Outdated
} | ||
} | ||
|
||
c.Assert(ctx.Err(), IsNil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's contradict with || ctx.Err() != nil
at L107, can we allow it's timeout?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|| ctx.Err() != nil
is used to break the for
loop if there are bugs.
if timemout, then the test should be failed in this c.Assert(ctx.Err(), IsNil)
.
we can put c.Assert(ctx.Err(), IsNil)
into the for
loop.
} | ||
|
||
// Close implements Reader.Close. | ||
func (r *reader) Close() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lack of lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
❤️ added. and change Mutex
to RWMutex
, and remove the atomic variable check
relay/reader/reader.go
Outdated
ev, err := r.in.GetEvent(ctx) | ||
if err == nil { | ||
return ev, nil | ||
} else if isRetryableError(err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if ctx
is timeout, it would be infinite loop
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed timeout error handing in this reader, that should be handled in the outer caller.
relay/reader/reader.go
Outdated
if err != nil { | ||
log.Errorf("[relay] start sync for master %s from GTID set %s error %v", | ||
r.cfg.MasterID, gs, errors.ErrorStack(err)) | ||
return r.setUpReaderByPos() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can't try setUpReaderByPos
on some errors, like stage is not right. And if we need to try setUpReaderByPos
, we should check position to avoid it read binlogs from oldest position 💀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, let's put this error to be handled by the outer or even the user.
Ignore bool // whether the event should be ignored | ||
LogPos uint32 // binlog event's End_log_pos or Position in RotateEvent | ||
NextLogName string // next binlog filename, only valid for RotateEvent | ||
GTIDSet mysql.GTIDSet // GTIDSet got from QueryEvent and XIDEvent when RawModeEnabled not true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's gtid or gtidset?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 It's gtid set. It's GTID in one binlog event, but go-mysql returns a GTID set (including the previous/passed in GTID set)
) | ||
|
||
// Result represents a transform result. | ||
type Result struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand this design very well, like
- if some binlog is ignored, should writer write it?
- what's its transform rules?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
binlog event may be ignored in multiple steps in the processing chain, at least:
- some fake/artificial events, which will be marked as ignored in this transformer
- some obsolete events (like FormatDescriptionEvent), which will be ignored by the writer
maybe the name of transformer
is not accurate, and in fact, more features may be added into this later.
Co-Authored-By: csuzhangxc <csuzhangxc@gmail.com>
/run-all-tests |
@GregoryIan PTAL again |
Co-Authored-By: csuzhangxc <csuzhangxc@gmail.com>
Co-Authored-By: csuzhangxc <csuzhangxc@gmail.com>
@GregoryIan @amyangfei PTAL again |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pkg/binlog/reader/file.go
Outdated
return errors.New("already closed") | ||
} | ||
|
||
r.parser.Stop() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's better to put it after L135, even I know it's ok after I review its implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed
LGTM |
What problem does this PR solve?
this PR is one part of #91
What is changed and how it works?
MockReader
used for testingFileReader
used for:GenRotateEvent
to generateRoateEvent
for testingReader
(wrapTCPReader
) used to read binlog events from masterTransformer
used to transform binlog eventsCheck List
Tests
for relay log, at least following things needed to do:
Writer
used to write binlog events to relay log fileReader
,Transformer
andWriter
together to pull binlog events from a master server and write them to filesI'll do them in some PRs later.
the flow of relay log (without master-slave switch) will be:
TCPReader
read binlog events from the masterReader
handle possible errorsTransfomer
get events fromReader
and transform themWriter
get events fromTransformer
and write them to disk (with aFileWriter
)