diff --git a/Makefile b/Makefile index dbe7d016d0..e15a80aaac 100644 --- a/Makefile +++ b/Makefile @@ -38,7 +38,7 @@ else endif .PHONY: build test unit_test dm_integration_test_build integration_test \ - coverage check dm-worker dm-master dm-tracer dmctl + coverage check dm-worker dm-master dm-tracer dmctl debug-tools build: check dm-worker dm-master dm-tracer dmctl @@ -54,6 +54,9 @@ dm-tracer: dmctl: $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dmctl ./cmd/dm-ctl +debug-tools: + $(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole + test: unit_test integration_test unit_test: diff --git a/debug-tools/binlog-event-blackhole/README.md b/debug-tools/binlog-event-blackhole/README.md new file mode 100644 index 0000000000..ec379f5554 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/README.md @@ -0,0 +1,46 @@ + +## Running + +```bash +./binlog-event-blackhole -addr=127.0.0.1:3306 -u root -server-id=101 -binlog-name=mysql-bin.000003 -mode=1 +``` + +``` + -L string + log level: debug, info, warn, error, fatal (default "info") + -addr string + master's address + -binlog-name string + startup binlog filename + -binlog-pos int + startup binlog position (default 4) + -log-file string + log file path + -mode int + event read mode. + 1: read packet with go-mysql; + 2: read packet without go-mysql; + 3: read binary data but do nothing + -p username + password for username + -server-id int + slave's server-id + -u string + master's username +``` + +## Result + +When exiting, the result will be output to the log as following + +```log +[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["throughput (byte/s)"=3344.541733541794] +``` + +| Item | Description | +|:------|:---- | +| event-count | The total events have received from the upstream master | +| byte-count | The total bytes have received from the upstream master | +| duration | The duration has be taken to fetch binlog events | +| tps | The events have received per second | +| throughput | The throughput of fetching binlog event data (bytes/second) | diff --git a/debug-tools/binlog-event-blackhole/config.go b/debug-tools/binlog-event-blackhole/config.go new file mode 100644 index 0000000000..e196233310 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/config.go @@ -0,0 +1,65 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + + "github.com/pingcap/errors" +) + +// config is the configuration used by this binlog-event-blackhole. +type config struct { + *flag.FlagSet + + logLevel string + logFile string + + mode int + + addr string + username string + password string + serverID int + binlogName string + binlogPos int +} + +// newConfig creates a new config instance. +func newConfig() *config { + cfg := &config{ + FlagSet: flag.NewFlagSet("binlog-event-blackhole", flag.ContinueOnError), + } + fs := cfg.FlagSet + + fs.StringVar(&cfg.logLevel, "L", "info", "log level: debug, info, warn, error, fatal") + fs.StringVar(&cfg.logFile, "log-file", "", "log file path") + + fs.IntVar(&cfg.mode, "mode", 0, "event read mode.\n1: read packet with go-mysql;\n2: read packet without go-mysql;\n3: read binary data but do nothing") + + fs.StringVar(&cfg.addr, "addr", "", "master's address") + fs.StringVar(&cfg.username, "u", "", "master's username") + fs.StringVar(&cfg.password, "p", "", "password for `username`") + fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id") + fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename") + fs.IntVar(&cfg.binlogPos, "binlog-pos", 4, "startup binlog position") + + return cfg +} + +// parse parses flag definitions from the argument list. +func (c *config) parse(args []string) error { + err := c.FlagSet.Parse(args) + return errors.Trace(err) +} diff --git a/debug-tools/binlog-event-blackhole/fetcher.go b/debug-tools/binlog-event-blackhole/fetcher.go new file mode 100644 index 0000000000..56a8a3ec5b --- /dev/null +++ b/debug-tools/binlog-event-blackhole/fetcher.go @@ -0,0 +1,171 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "time" + + "github.com/pingcap/errors" + "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go/sync2" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" +) + +// registerSlave register a slave connection on the master. +func registerSlave(addr, username, password string, serverID uint32) (*client.Conn, error) { + conn, err := client.Connect(addr, username, password, "", func(c *client.Conn) { + }) + if err != nil { + return nil, errors.Annotate(err, "connect to the master") + } + + // takes as an indication that the client is checksum-aware. + // ref https://dev.mysql.com/doc/refman/8.0/en/c-api-binary-log-functions.html. + _, err = conn.Execute(`SET @master_binlog_checksum='NONE'`) + if err != nil { + return nil, errors.Annotate(err, `SET @master_binlog_checksum='NONE'`) + } + + conn.ResetSequence() + packet := registerSlaveCommand(username, password, serverID) + err = conn.WritePacket(packet) + if err != nil { + return nil, errors.Annotatef(err, "write COM_REGISTER_SLAVE packet %v", packet) + } + + _, err = conn.ReadOKPacket() + if err != nil { + return nil, errors.Annotate(err, "read OK packet") + } + + return conn, nil +} + +// startSync starts to sync binlog event from . +func startSync(conn *client.Conn, serverID uint32, name string, pos uint32) error { + conn.ResetSequence() + packet := dumpCommand(serverID, name, pos) + err := conn.WritePacket(packet) + return errors.Annotatef(err, "write COM_BINLOG_DUMP %v", packet) +} + +// closeConn closes the connection to the master server. +func closeConn(conn *client.Conn) error { + deadline := time.Now().Add(time.Millisecond) + err := conn.SetReadDeadline(deadline) + if err != nil { + return errors.Annotatef(err, "set connection read deadline to %v", deadline) + } + + return errors.Trace(conn.Close()) +} + +// readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg. +func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { + var ( + eventCount sync2.AtomicUint64 + byteCount sync2.AtomicUint64 + startTime = time.Now() + ) + for { + select { + case <-ctx.Done(): + return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil + default: + } + + data, err := conn.ReadPacket() + if err != nil { + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") + } + + switch data[0] { + case 0x00: // OK_HEADER + eventCount.Add(1) // got one more event + byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header + continue + case 0xff: // ERR_HEADER + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + case 0xfe: // EOF_HEADER + // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html + log.L().Warn("receive EOF packet, retrying") + continue + default: + log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0]))) + continue + } + } +} + +// readEventsWithoutGoMySQL reads binlog events from master server without `go-mysql` pkg. +func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { + var ( + eventCount sync2.AtomicUint64 + byteCount sync2.AtomicUint64 + startTime = time.Now() + ) + for { + select { + case <-ctx.Done(): + return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil + default: + } + + _, data, err := readPacket(conn) + if err != nil { + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet") + } + + switch data[0] { + case 0x00: // OK_HEADER + eventCount.Add(1) // got one more event + byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header + continue + case 0xff: // ERR_HEADER + return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header") + case 0xfe: // EOF_HEADER + // Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html + log.L().Warn("receive EOF packet, retrying") + continue + default: + log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0]))) + continue + } + } +} + +// readDataOnly reads the binary data only and does not parse packet or binlog event. +func readDataOnly(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) { + var ( + buf = make([]byte, 10240) + byteCount sync2.AtomicUint64 + startTime = time.Now() + ) + for { + select { + case <-ctx.Done(): + return 0, byteCount.Get(), time.Since(startTime), nil + default: + } + + n, err := conn.Conn.Conn.Read(buf) + if err != nil { + return 0, byteCount.Get(), time.Since(startTime), errors.Annotatef(err, "read binary data") + } + byteCount.Add(uint64(n)) + } +} diff --git a/debug-tools/binlog-event-blackhole/main.go b/debug-tools/binlog-event-blackhole/main.go new file mode 100644 index 0000000000..17b726193b --- /dev/null +++ b/debug-tools/binlog-event-blackhole/main.go @@ -0,0 +1,111 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "strings" + "syscall" + "time" + + "github.com/pingcap/errors" + "go.uber.org/zap" + + "github.com/pingcap/dm/pkg/log" +) + +func main() { + cfg := newConfig() + err := cfg.parse(os.Args[1:]) + switch errors.Cause(err) { + case nil: + case flag.ErrHelp: + os.Exit(0) + default: + fmt.Printf("parse cmd flags err %s", err) + os.Exit(2) + } + + err = log.InitLogger(&log.Config{ + File: cfg.logFile, + Level: strings.ToLower(cfg.logLevel), + }) + if err != nil { + fmt.Printf("init logger error %v", errors.ErrorStack(err)) + os.Exit(2) + } + + conn, err := registerSlave(cfg.addr, cfg.username, cfg.password, uint32(cfg.serverID)) + if err != nil { + log.L().Error("register slave", zap.Error(err)) + os.Exit(2) + } + log.L().Info("registered slave", zap.Uint32("connection ID", conn.GetConnectionID())) + + ctx, cancel := context.WithCancel(context.Background()) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + go func() { + sig := <-sc + cancel() + log.L().Info("got signal to exit", zap.Stringer("signal", sig)) + err2 := closeConn(conn) + if err2 != nil { + log.L().Error("close connection", zap.Error(err2)) + } + }() + + err = startSync(conn, uint32(cfg.serverID), cfg.binlogName, uint32(cfg.binlogPos)) + if err != nil { + log.L().Error("start sync", zap.Error(err)) + os.Exit(2) + } + log.L().Info("start sync", + zap.Int("server-id", cfg.serverID), zap.String("binlog-name", cfg.binlogName), + zap.Int("binlog-pos", cfg.binlogPos)) + + var ( + eventCount uint64 + byteCount uint64 + duration time.Duration + ) + switch cfg.mode { + case 1: + eventCount, byteCount, duration, err = readEventsWithGoMySQL(ctx, conn) + case 2: + eventCount, byteCount, duration, err = readEventsWithoutGoMySQL(ctx, conn) + case 3: + eventCount, byteCount, duration, err = readDataOnly(ctx, conn) + default: + log.L().Error("invalid mode specified`", zap.Int("mode", cfg.mode)) + } + if err != nil { + log.L().Error("read events", zap.Error(err)) + } + + tps := float64(eventCount) / duration.Seconds() + speed := float64(byteCount) / duration.Seconds() + log.L().Info("binlog-event-blackhole exit", + zap.Uint64("event-count", eventCount), zap.Uint64("byte-count", byteCount), + zap.Duration("duration", duration), zap.Float64("tps", tps), zap.Float64("throughput (byte/s)", speed)) +} diff --git a/debug-tools/binlog-event-blackhole/packet.go b/debug-tools/binlog-event-blackhole/packet.go new file mode 100644 index 0000000000..6913e21dc0 --- /dev/null +++ b/debug-tools/binlog-event-blackhole/packet.go @@ -0,0 +1,117 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "encoding/binary" + "io" + "os" +) + +// registerSlaveCommand generates a COM_REGISTER_SLAVE command (with uninitialized header). +// ref https://dev.mysql.com/doc/internals/en/com-register-slave.html. +func registerSlaveCommand(username, password string, serverID uint32) []byte { + hn, _ := os.Hostname() + + packet := make([]byte, 4+1+4+1+len(hn)+1+len(username)+1+len(password)+2+4+4) + offset := 4 // 4 bytes header + + packet[offset] = 0x15 // COM_REGISTER_SLAVE + offset++ + + binary.LittleEndian.PutUint32(packet[offset:], serverID) + offset += 4 + + packet[offset] = uint8(len(hn)) + offset++ + n := copy(packet[offset:], hn) + offset += n + + packet[offset] = uint8(len(username)) + offset++ + n = copy(packet[offset:], username) + offset += n + + packet[offset] = uint8(len(password)) + offset++ + n = copy(packet[offset:], password) + offset += n + + binary.LittleEndian.PutUint16(packet[offset:], 0) + offset += 2 + + binary.LittleEndian.PutUint32(packet[offset:], 0) + offset += 4 + + binary.LittleEndian.PutUint32(packet[offset:], 0) + + setCommandHeader(packet, 0) + + return packet +} + +// dumpCommand generates a COM_BINLOG_DUMP command. +// ref https://dev.mysql.com/doc/internals/en/com-binlog-dump.html. +func dumpCommand(serverID uint32, name string, pos uint32) []byte { + packet := make([]byte, 4+1+4+2+4+len(name)) + offset := 4 // 4 bytes header + + packet[offset] = 0x12 // COM_BINLOG_DUMP + offset++ + + binary.LittleEndian.PutUint32(packet[offset:], pos) + offset += 4 + + binary.LittleEndian.PutUint16(packet[offset:], 0x00) // BINLOG_DUMP_NEVER_STOP + offset += 2 + + binary.LittleEndian.PutUint32(packet[offset:], serverID) + offset += 4 + + copy(packet[offset:], name) + + setCommandHeader(packet, 0) + + return packet +} + +// setCommandHeader set the header of the command inplace. +// ref https://dev.mysql.com/doc/internals/en/mysql-packet.html. +func setCommandHeader(packet []byte, seqID uint8) { + // do not handle a payload which is larger than or equal to 2^24−1 bytes (16 MB) now. + length := len(packet) - 4 + packet[0] = byte(length) + packet[1] = byte(length >> 8) + packet[2] = byte(length >> 16) + + packet[3] = seqID +} + +// readPacket tries to read a MySQL packet (header & payload) from the Reader. +func readPacket(r io.Reader) ([]byte, []byte, error) { + header := []byte{0, 0, 0, 0} + _, err := io.ReadFull(r, header) + if err != nil { + return nil, nil, err + } + + length := int(uint32(header[0]) | uint32(header[1])<<8 | uint32(header[2])<<16) + payload := make([]byte, length) + _, err = io.ReadFull(r, payload) + if err != nil { + return nil, nil, err + } + + return header, payload, nil +}