From fa8892be4c22f53f50fc109b2a1429a3e7028614 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 9 Aug 2019 20:10:36 +0800 Subject: [PATCH 1/8] debug-tools: add a simple binlog event black hole tool --- debug-tools/binlog-event-blackhole/config.go | 42 +++++++ debug-tools/binlog-event-blackhole/fetcher.go | 84 ++++++++++++++ debug-tools/binlog-event-blackhole/main.go | 55 +++++++++ debug-tools/binlog-event-blackhole/packet.go | 104 ++++++++++++++++++ 4 files changed, 285 insertions(+) create mode 100644 debug-tools/binlog-event-blackhole/config.go create mode 100644 debug-tools/binlog-event-blackhole/fetcher.go create mode 100644 debug-tools/binlog-event-blackhole/main.go create mode 100644 debug-tools/binlog-event-blackhole/packet.go diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go new file mode 100644 index 0000000000..077a5eaca3 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/config.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + + "github.com/pingcap/errors" +) + +// config is the configuration used by this binlog-event-blackhole. +type config struct { + *flag.FlagSet + + addr string + username string + password string + serverID int + binlogName string + binlogPos int +} + +// newConfig creates a new config instance. +func newConfig() *config { + cfg := &config{ + FlagSet: flag.NewFlagSet("binlog-event-blackhole", flag.ContinueOnError), + } + fs := cfg.FlagSet + + fs.StringVar(&cfg.addr, "addr", "", "master's address") + fs.StringVar(&cfg.username, "username", "", "master's username") + fs.StringVar(&cfg.password, "password", "", "password for `username`") + fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id") + fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename") + fs.IntVar(&cfg.binlogPos, "binlog-pos", 0, "startup binlog position") + + return cfg +} + +// parse parses flag definitions from the argument list. +func (c *config) parse(args []string) error { + err := c.FlagSet.Parse(args) + return errors.Trace(err) +} diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go new file mode 100644 index 0000000000..efcc9235aa --- /dev/null +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "fmt" + + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/client" +) + +// registerSlave register a fake slave on the master. +func registerSlave(addr, username, password string, serverID uint32) (*client.Conn, error) { + conn, err := client.Connect(addr, username, password, "", func(c *client.Conn) { + }) + if err != nil { + return nil, errors.Trace(err) + } + + // takes as an indication that the client is checksum-aware. + // ref https://dev.mysql.com/doc/refman/8.0/en/c-api-binary-log-functions.html. + _, err = conn.Execute(`SET @master_binlog_checksum='NONE'`) + if err != nil { + return nil, errors.Annotate(err, `SET @master_binlog_checksum='NONE'`) + } + + conn.ResetSequence() + packet := registerSlaveCommand(username, password, serverID) + err = conn.WritePacket(packet) + if err != nil { + return nil, errors.Annotatef(err, "write COM_REGISTER_SLAVE packet %v", packet) + } + + _, err = conn.ReadOKPacket() + if err != nil { + return nil, errors.Annotate(err, "read OK packet") + } + + return conn, nil +} + +// startSync starts to sync binlog event from . +func startSync(conn *client.Conn, serverID uint32, name string, pos uint32) error { + conn.ResetSequence() + packet := dumpCommand(serverID, name, pos) + err := conn.WritePacket(packet) + return errors.Trace(err) +} + +// killConn kills the connection to the master server. +func killConn(conn *client.Conn) error { + query := fmt.Sprintf(`KILL %d`, conn.GetConnectionID()) + _, err := conn.Execute(query) + return errors.Annotate(err, query) +} + +// readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg. +func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) error { + for { + select { + case <-ctx.Done(): + return nil + default: + } + + data, err := conn.ReadPacket() + if err != nil { + return errors.Trace(err) + } + + switch data[0] { + case 0x00: + continue // count event + case 0xff: + return errors.New("read event fail") + case 0xfe: + // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html + fmt.Println("receive EOF packet, retry ReadPacket") + continue + default: + fmt.Printf("invalid stream header %c\n", data[0]) + continue + } + } +} diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go new file mode 100644 index 0000000000..5cfead896e --- /dev/null +++ b/debug-tools/binlog-event-blackhole/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" +) + +func main() { + cfg := newConfig() + err := cfg.parse(os.Args[1:]) + if err != nil { + panic(err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + go func() { + <-sc + cancel() + }() + + conn, err := registerSlave(cfg.addr, cfg.username, cfg.password, uint32(cfg.serverID)) + if err != nil { + panic(err) + } + defer func() { + killConn(conn) + conn.Close() + }() + + fmt.Printf("registered slave as %d\n", conn.GetConnectionID()) + + err = startSync(conn, uint32(cfg.serverID), cfg.binlogName, uint32(cfg.binlogPos)) + if err != nil { + panic(err) + } + + fmt.Println("okok") + + err = readEventsWithGoMySQL(ctx, conn) + if err != nil { + panic(err) + } + + fmt.Println("haha") +} diff --git a/debug-tools/binlog-event-blackhole/packet.go b/debug-tools/binlog-event-blackhole/packet.go new file mode 100644 index 0000000000..7d773488c1 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/packet.go @@ -0,0 +1,104 @@ +package main + +import ( + "encoding/binary" + "io" + "os" +) + +// registerSlaveCommand generates a COM_REGISTER_SLAVE command (with uninitialized header). +// ref https://dev.mysql.com/doc/internals/en/com-register-slave.html. +func registerSlaveCommand(username, password string, serverID uint32) []byte { + hn, _ := os.Hostname() + + packet := make([]byte, 4+1+4+1+len(hn)+1+len(username)+1+len(password)+2+4+4) + offset := 4 // 4 bytes header + + packet[offset] = 0x15 // COM_REGISTER_SLAVE + offset++ + + binary.LittleEndian.PutUint32(packet[offset:], serverID) + offset += 4 + + packet[offset] = uint8(len(hn)) + offset++ + n := copy(packet[offset:], hn) + offset += n + + packet[offset] = uint8(len(username)) + offset++ + n = copy(packet[offset:], username) + offset += n + + packet[offset] = uint8(len(password)) + offset++ + n = copy(packet[offset:], password) + offset += n + + binary.LittleEndian.PutUint16(packet[offset:], 0) + offset += 2 + + binary.LittleEndian.PutUint32(packet[offset:], 0) + offset += 4 + + binary.LittleEndian.PutUint32(packet[offset:], 0) + + setCommandHeader(packet, 0) + + return packet +} + +// dumpCommand generates a COM_BINLOG_DUMP command. +// ref https://dev.mysql.com/doc/internals/en/com-binlog-dump.html. +func dumpCommand(serverID uint32, name string, pos uint32) []byte { + packet := make([]byte, 4+1+4+2+4+len(name)) + offset := 4 // 4 bytes header + + packet[offset] = 0x12 // COM_BINLOG_DUMP + offset++ + + binary.LittleEndian.PutUint32(packet[offset:], pos) + offset += 4 + + binary.LittleEndian.PutUint16(packet[offset:], 0x00) // BINLOG_DUMP_NEVER_STOP + offset += 2 + + binary.LittleEndian.PutUint32(packet[offset:], serverID) + offset += 4 + + copy(packet[offset:], name) + + setCommandHeader(packet, 0) + + return packet +} + +// setCommandHeader set the header of the command inplace. +// ref https://dev.mysql.com/doc/internals/en/mysql-packet.html. +func setCommandHeader(packet []byte, seqID uint8) { + // do not handle a payload which is larger than or equal to 2^24−1 bytes (16 MB) now. + length := len(packet) - 4 + packet[0] = byte(length) + packet[1] = byte(length >> 8) + packet[2] = byte(length >> 16) + + packet[3] = seqID +} + +// readPacket tries to read a MySQL packet (header & payload) from the Reader. +func readPacket(r io.Reader) ([]byte, []byte, error) { + header := []byte{0, 0, 0, 0} + _, err := io.ReadFull(r, header) + if err != nil { + return nil, nil, err + } + + length := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) + payload := make([]byte, length) + _, err = io.ReadFull(r, payload) + if err != nil { + return nil, nil, err + } + + return header, payload, nil +} From b4b942ce42e4208b2f40b1d6ef96f50dec57a3b1 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 12 Aug 2019 14:30:31 +0800 Subject: [PATCH 2/8] debug-tools: refine code; count binlog event --- debug-tools/binlog-event-blackhole/config.go | 10 ++- debug-tools/binlog-event-blackhole/fetcher.go | 51 +++++++++------ debug-tools/binlog-event-blackhole/main.go | 64 +++++++++++++------ 3 files changed, 85 insertions(+), 40 deletions(-) diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go index 077a5eaca3..d5157270da 100644 --- a/debug-tools/binlog-event-blackhole/config.go +++ b/debug-tools/binlog-event-blackhole/config.go @@ -10,6 +10,9 @@ import ( type config struct { *flag.FlagSet + logLevel string + logFile string + addr string username string password string @@ -25,9 +28,12 @@ func newConfig() *config { } fs := cfg.FlagSet + fs.StringVar(&cfg.logLevel, "L", "info", "log level: debug, info, warn, error, fatal") + fs.StringVar(&cfg.logFile, "log-file", "", "log file path") + fs.StringVar(&cfg.addr, "addr", "", "master's address") - fs.StringVar(&cfg.username, "username", "", "master's username") - fs.StringVar(&cfg.password, "password", "", "password for `username`") + fs.StringVar(&cfg.username, "u", "", "master's username") + fs.StringVar(&cfg.password, "p", "", "password for `username`") fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id") fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename") fs.IntVar(&cfg.binlogPos, "binlog-pos", 0, "startup binlog position") diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go index efcc9235aa..a8211ab909 100644 --- a/debug-tools/binlog-event-blackhole/fetcher.go +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -2,18 +2,22 @@ package main import ( "context" - "fmt" + "time" "github.com/pingcap/errors" "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go/sync2" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" ) -// registerSlave register a fake slave on the master. +// registerSlave register a slave connection on the master. func registerSlave(addr, username, password string, serverID uint32) (*client.Conn, error) { conn, err := client.Connect(addr, username, password, "", func(c *client.Conn) { }) if err != nil { - return nil, errors.Trace(err) + return nil, errors.Annotate(err, "connect to the master") } // takes as an indication that the client is checksum-aware. @@ -43,41 +47,50 @@ func startSync(conn *client.Conn, serverID uint32, name string, pos uint32) erro conn.ResetSequence() packet := dumpCommand(serverID, name, pos) err := conn.WritePacket(packet) - return errors.Trace(err) + return errors.Annotatef(err, "write COM_BINLOG_DUMP %v", packet) } -// killConn kills the connection to the master server. -func killConn(conn *client.Conn) error { - query := fmt.Sprintf(`KILL %d`, conn.GetConnectionID()) - _, err := conn.Execute(query) - return errors.Annotate(err, query) +// closeConn closes the connection to the master server. +func closeConn(conn *client.Conn) error { + deadline := time.Now().Add(time.Millisecond) + err := conn.SetReadDeadline(deadline) + if err != nil { + return errors.Annotatef(err, "set connection read deadline to %v", deadline) + } + + return errors.Trace(conn.Close()) } // readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg. -func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) error { +func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time.Duration, error) { + var ( + count sync2.AtomicUint64 + startTime = time.Now() + ) for { select { case <-ctx.Done(): - return nil + return count.Get(), time.Since(startTime), nil default: } data, err := conn.ReadPacket() if err != nil { - return errors.Trace(err) + return count.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") } switch data[0] { - case 0x00: - continue // count event - case 0xff: - return errors.New("read event fail") - case 0xfe: + case 0x00: // OK_HEADER + count.Add(1) // got one more event + continue + case 0xff: // ERR_HEADER + return count.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + case 0xfe: // EOF_HEADER // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html - fmt.Println("receive EOF packet, retry ReadPacket") + log.L().Warn("receive EOF packet, retrying") continue default: - fmt.Printf("invalid stream header %c\n", data[0]) + log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0]))) continue } } diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index 5cfead896e..6889626272 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -2,18 +2,46 @@ package main import ( "context" + "flag" "fmt" "os" "os/signal" + "strings" "syscall" + + "github.com/pingcap/errors" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" ) func main() { cfg := newConfig() err := cfg.parse(os.Args[1:]) + switch errors.Cause(err) { + case nil: + case flag.ErrHelp: + os.Exit(0) + default: + fmt.Printf("parse cmd flags err %s", err) + os.Exit(2) + } + + err = log.InitLogger(&log.Config{ + File: cfg.logFile, + Level: strings.ToLower(cfg.logLevel), + }) + if err != nil { + fmt.Printf("init logger error %v", errors.ErrorStack(err)) + os.Exit(2) + } + + conn, err := registerSlave(cfg.addr, cfg.username, cfg.password, uint32(cfg.serverID)) if err != nil { - panic(err) + log.L().Error("register slave", zap.Error(err)) + os.Exit(2) } + log.L().Info("registered slave", zap.Uint32("connection ID", conn.GetConnectionID())) ctx, cancel := context.WithCancel(context.Background()) @@ -24,32 +52,30 @@ func main() { syscall.SIGTERM, syscall.SIGQUIT) go func() { - <-sc + sig := <-sc cancel() + log.L().Info("got signal to exit", zap.Stringer("signal", sig)) + err2 := closeConn(conn) + if err2 != nil { + log.L().Error("close connection", zap.Error(err2)) + } }() - conn, err := registerSlave(cfg.addr, cfg.username, cfg.password, uint32(cfg.serverID)) - if err != nil { - panic(err) - } - defer func() { - killConn(conn) - conn.Close() - }() - - fmt.Printf("registered slave as %d\n", conn.GetConnectionID()) - err = startSync(conn, uint32(cfg.serverID), cfg.binlogName, uint32(cfg.binlogPos)) if err != nil { - panic(err) + log.L().Error("start sync", zap.Error(err)) + os.Exit(2) } + log.L().Info("start sync", + zap.Int("server-id", cfg.serverID), zap.String("binlog-name", cfg.binlogName), + zap.Int("binlog-pos", cfg.binlogPos)) - fmt.Println("okok") - - err = readEventsWithGoMySQL(ctx, conn) + count, duration, err := readEventsWithGoMySQL(ctx, conn) if err != nil { - panic(err) + log.L().Error("read events", zap.Error(err)) } - fmt.Println("haha") + tps := count / uint64(duration.Seconds()) + log.L().Info("binlog-event-blackhole exit", zap.Uint64("count", count), + zap.Duration("duration", duration), zap.Uint64("tps", tps)) } From a5b52bc1f66ae13340490124e9ad271cc0b26c8f Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 12 Aug 2019 14:35:57 +0800 Subject: [PATCH 3/8] debug-tools: fix default binlog-pos; fix tps --- debug-tools/binlog-event-blackhole/config.go | 2 +- debug-tools/binlog-event-blackhole/main.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go index d5157270da..1924914dbe 100644 --- a/debug-tools/binlog-event-blackhole/config.go +++ b/debug-tools/binlog-event-blackhole/config.go @@ -36,7 +36,7 @@ func newConfig() *config { fs.StringVar(&cfg.password, "p", "", "password for `username`") fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id") fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename") - fs.IntVar(&cfg.binlogPos, "binlog-pos", 0, "startup binlog position") + fs.IntVar(&cfg.binlogPos, "binlog-pos", 4, "startup binlog position") return cfg } diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index 6889626272..0eb5a9bd18 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -75,7 +75,7 @@ func main() { log.L().Error("read events", zap.Error(err)) } - tps := count / uint64(duration.Seconds()) + tps := float64(count) / duration.Seconds() log.L().Info("binlog-event-blackhole exit", zap.Uint64("count", count), - zap.Duration("duration", duration), zap.Uint64("tps", tps)) + zap.Duration("duration", duration), zap.Float64("tps", tps)) } From 929fd76c644d1c793562ff11ee7380f5e6517255 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 12 Aug 2019 15:12:47 +0800 Subject: [PATCH 4/8] debug-tools: read event without go-mysql --- debug-tools/binlog-event-blackhole/config.go | 4 +++ debug-tools/binlog-event-blackhole/fetcher.go | 35 +++++++++++++++++++ debug-tools/binlog-event-blackhole/main.go | 14 +++++++- 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go index 1924914dbe..226bfee2ec 100644 --- a/debug-tools/binlog-event-blackhole/config.go +++ b/debug-tools/binlog-event-blackhole/config.go @@ -13,6 +13,8 @@ type config struct { logLevel string logFile string + mode int + addr string username string password string @@ -31,6 +33,8 @@ func newConfig() *config { fs.StringVar(&cfg.logLevel, "L", "info", "log level: debug, info, warn, error, fatal") fs.StringVar(&cfg.logFile, "log-file", "", "log file path") + fs.IntVar(&cfg.mode, "mode", 0, "event read mode.\n1: read packet with go-mysql;\n2: read packet without go-mysql;\n3: read binary data but do nothing") + fs.StringVar(&cfg.addr, "addr", "", "master's address") fs.StringVar(&cfg.username, "u", "", "master's username") fs.StringVar(&cfg.password, "p", "", "password for `username`") diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go index a8211ab909..c60e72879b 100644 --- a/debug-tools/binlog-event-blackhole/fetcher.go +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -95,3 +95,38 @@ func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time } } } + +// readEventsWithoutGoMySQL reads binlog events from master server without `go-mysql` pkg. +func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time.Duration, error) { + var ( + count sync2.AtomicUint64 + startTime = time.Now() + ) + for { + select { + case <-ctx.Done(): + return count.Get(), time.Since(startTime), nil + default: + } + + _, data, err := readPacket(conn) + if err != nil { + return count.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") + } + + switch data[0] { + case 0x00: // OK_HEADER + count.Add(1) // got one more event + continue + case 0xff: // ERR_HEADER + return count.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + case 0xfe: // EOF_HEADER + // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html + log.L().Warn("receive EOF packet, retrying") + continue + default: + log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0]))) + continue + } + } +} diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index 0eb5a9bd18..34bb648464 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -8,6 +8,7 @@ import ( "os/signal" "strings" "syscall" + "time" "github.com/pingcap/errors" "go.uber.org/zap" @@ -70,7 +71,18 @@ func main() { zap.Int("server-id", cfg.serverID), zap.String("binlog-name", cfg.binlogName), zap.Int("binlog-pos", cfg.binlogPos)) - count, duration, err := readEventsWithGoMySQL(ctx, conn) + var ( + count uint64 + duration time.Duration + ) + switch cfg.mode { + case 1: + count, duration, err = readEventsWithGoMySQL(ctx, conn) + case 2: + count, duration, err = readEventsWithoutGoMySQL(ctx, conn) + default: + log.L().Error("invalid mode specified`", zap.Int("mode", cfg.mode)) + } if err != nil { log.L().Error("read events", zap.Error(err)) } From c0db84dc37602dcf21c3dbfab528a970edd6bc30 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 12 Aug 2019 15:31:30 +0800 Subject: [PATCH 5/8] debug-tools: read data only --- debug-tools/binlog-event-blackhole/fetcher.go | 54 ++++++++++++++----- debug-tools/binlog-event-blackhole/main.go | 19 ++++--- 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go index c60e72879b..4851e5b3e1 100644 --- a/debug-tools/binlog-event-blackhole/fetcher.go +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -62,29 +62,31 @@ func closeConn(conn *client.Conn) error { } // readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg. -func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time.Duration, error) { +func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { var ( - count sync2.AtomicUint64 - startTime = time.Now() + eventCount sync2.AtomicUint64 + byteCount sync2.AtomicUint64 + startTime = time.Now() ) for { select { case <-ctx.Done(): - return count.Get(), time.Since(startTime), nil + return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil default: } data, err := conn.ReadPacket() if err != nil { - return count.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") } switch data[0] { case 0x00: // OK_HEADER - count.Add(1) // got one more event + eventCount.Add(1) // got one more event + byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header continue case 0xff: // ERR_HEADER - return count.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") case 0xfe: // EOF_HEADER // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html log.L().Warn("receive EOF packet, retrying") @@ -97,29 +99,31 @@ func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time } // readEventsWithoutGoMySQL reads binlog events from master server without `go-mysql` pkg. -func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, time.Duration, error) { +func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { var ( - count sync2.AtomicUint64 - startTime = time.Now() + eventCount sync2.AtomicUint64 + byteCount sync2.AtomicUint64 + startTime = time.Now() ) for { select { case <-ctx.Done(): - return count.Get(), time.Since(startTime), nil + return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil default: } _, data, err := readPacket(conn) if err != nil { - return count.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") } switch data[0] { case 0x00: // OK_HEADER - count.Add(1) // got one more event + eventCount.Add(1) // got one more event + byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header continue case 0xff: // ERR_HEADER - return count.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") case 0xfe: // EOF_HEADER // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html log.L().Warn("receive EOF packet, retrying") @@ -130,3 +134,25 @@ func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, t } } } + +// readDataOnly reads the binary data only and does not parse packet or binlog event. +func readDataOnly(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { + var ( + buf = make([]byte, 10240) + byteCount sync2.AtomicUint64 + startTime = time.Now() + ) + for { + select { + case <-ctx.Done(): + return 0, byteCount.Get(), time.Since(startTime), nil + default: + } + + n, err := conn.Conn.Conn.Read(buf) + if err != nil { + return 0, byteCount.Get(), time.Since(startTime), errors.Annotatef(err, "read binary data") + } + byteCount.Add(uint64(n)) + } +} diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index 34bb648464..ad5091fdc2 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -72,14 +72,17 @@ func main() { zap.Int("binlog-pos", cfg.binlogPos)) var ( - count uint64 - duration time.Duration + eventCount uint64 + byteCount uint64 + duration time.Duration ) switch cfg.mode { case 1: - count, duration, err = readEventsWithGoMySQL(ctx, conn) + eventCount, byteCount, duration, err = readEventsWithGoMySQL(ctx, conn) case 2: - count, duration, err = readEventsWithoutGoMySQL(ctx, conn) + eventCount, byteCount, duration, err = readEventsWithoutGoMySQL(ctx, conn) + case 3: + eventCount, byteCount, duration, err = readDataOnly(ctx, conn) default: log.L().Error("invalid mode specified`", zap.Int("mode", cfg.mode)) } @@ -87,7 +90,9 @@ func main() { log.L().Error("read events", zap.Error(err)) } - tps := float64(count) / duration.Seconds() - log.L().Info("binlog-event-blackhole exit", zap.Uint64("count", count), - zap.Duration("duration", duration), zap.Float64("tps", tps)) + tps := float64(eventCount) / duration.Seconds() + speed := float64(byteCount) / duration.Seconds() + log.L().Info("binlog-event-blackhole exit", + zap.Uint64("event-count", eventCount), zap.Uint64("byte-count", byteCount), + zap.Duration("duration", duration), zap.Float64("tps", tps), zap.Float64("speed (byte/s)", speed)) } From 7336c41913d76955083bfeccf65af65ac6eb2842 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Mon, 12 Aug 2019 15:59:18 +0800 Subject: [PATCH 6/8] debug-tools: add README for binlog-event-blackhole --- debug-tools/binlog-event-blackhole/README.md | 46 ++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 debug-tools/binlog-event-blackhole/README.md diff --git a/debug-tools/binlog-event-blackhole/README.md b/debug-tools/binlog-event-blackhole/README.md new file mode 100644 index 0000000000..ff96831179 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/README.md @@ -0,0 +1,46 @@ + +## Running + +```bash +./binlog-event-blackhole -addr=127.0.0.1:3306 -u root -server-id=101 -binlog-name=mysql-bin.000003 -mode=1 +``` + +``` + -L string + log level: debug, info, warn, error, fatal (default "info") + -addr string + master's address + -binlog-name string + startup binlog filename + -binlog-pos int + startup binlog position (default 4) + -log-file string + log file path + -mode int + event read mode. + 1: read packet with go-mysql; + 2: read packet without go-mysql; + 3: read binary data but do nothing + -p username + password for username + -server-id int + slave's server-id + -u string + master's username +``` + +## Result + +When exiting, the result will be output to the log as following + +```log +[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["speed (byte/s)"=3344.541733541794] +``` + +| Item | Description | +|:------|:---- | +| event-count | The total events have received from the upstream master | +| byte-count | The total bytes have received from the upstream master | +| duration | The duration has be taken to fetch binlog events | +| tps | The events have received per second | +| speed | The speed of fetching binlog event data (bytes/second) | From 9224e5e6934b6e4e1e3eceb29a2a5dd8e4e671c8 Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Tue, 13 Aug 2019 10:15:47 +0800 Subject: [PATCH 7/8] *: add license header; add build target in Makefile --- Makefile | 5 ++++- debug-tools/binlog-event-blackhole/config.go | 13 +++++++++++++ debug-tools/binlog-event-blackhole/fetcher.go | 13 +++++++++++++ debug-tools/binlog-event-blackhole/main.go | 13 +++++++++++++ debug-tools/binlog-event-blackhole/packet.go | 13 +++++++++++++ 5 files changed, 56 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 86cf01bc00..36486cf1f9 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ else endif .PHONY: build test unit_test dm_integration_test_build integration_test \ - coverage check dm-worker dm-master dm-tracer dmctl + coverage check dm-worker dm-master dm-tracer dmctl debug-tools build: check dm-worker dm-master dm-tracer dmctl @@ -54,6 +54,9 @@ dm-tracer: dmctl: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dmctl ./cmd/dm-ctl +debug-tools: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole + test: unit_test integration_test unit_test: diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go index 226bfee2ec..e196233310 100644 --- a/debug-tools/binlog-event-blackhole/config.go +++ b/debug-tools/binlog-event-blackhole/config.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go index 4851e5b3e1..56a8a3ec5b 100644 --- a/debug-tools/binlog-event-blackhole/fetcher.go +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index ad5091fdc2..bfa5db245e 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( diff --git a/debug-tools/binlog-event-blackhole/packet.go b/debug-tools/binlog-event-blackhole/packet.go index 7d773488c1..6913e21dc0 100644 --- a/debug-tools/binlog-event-blackhole/packet.go +++ b/debug-tools/binlog-event-blackhole/packet.go @@ -1,3 +1,16 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + package main import ( From 50c2b563b27b719c3458d5ddbafd95d27595bbed Mon Sep 17 00:00:00 2001 From: csuzhangxc Date: Fri, 15 Nov 2019 19:38:36 +0800 Subject: [PATCH 8/8] debug-tools: address comments --- debug-tools/binlog-event-blackhole/README.md | 4 ++-- debug-tools/binlog-event-blackhole/main.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/debug-tools/binlog-event-blackhole/README.md b/debug-tools/binlog-event-blackhole/README.md index ff96831179..ec379f5554 100644 --- a/debug-tools/binlog-event-blackhole/README.md +++ b/debug-tools/binlog-event-blackhole/README.md @@ -34,7 +34,7 @@ When exiting, the result will be output to the log as following ```log -[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["speed (byte/s)"=3344.541733541794] +[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["throughput (byte/s)"=3344.541733541794] ``` | Item | Description | @@ -43,4 +43,4 @@ When exiting, the result will be output to the log as following | byte-count | The total bytes have received from the upstream master | | duration | The duration has be taken to fetch binlog events | | tps | The events have received per second | -| speed | The speed of fetching binlog event data (bytes/second) | +| throughput | The throughput of fetching binlog event data (bytes/second) | diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go index bfa5db245e..17b726193b 100644 --- a/debug-tools/binlog-event-blackhole/main.go +++ b/debug-tools/binlog-event-blackhole/main.go @@ -107,5 +107,5 @@ func main() { speed := float64(byteCount) / duration.Seconds() log.L().Info("binlog-event-blackhole exit", zap.Uint64("event-count", eventCount), zap.Uint64("byte-count", byteCount), - zap.Duration("duration", duration), zap.Float64("tps", tps), zap.Float64("speed (byte/s)", speed)) + zap.Duration("duration", duration), zap.Float64("tps", tps), zap.Float64("throughput (byte/s)", speed)) }