Skip to content

Commit

Permalink
debug-tools: Binlog event blackhole (pingcap#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Nov 19, 2019
1 parent a64b26d commit e37b039
Show file tree
Hide file tree
Showing 6 changed files with 514 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ else
endif

.PHONY: build test unit_test dm_integration_test_build integration_test \
coverage check dm-worker dm-master dm-tracer dmctl
coverage check dm-worker dm-master dm-tracer dmctl debug-tools

build: check dm-worker dm-master dm-tracer dmctl

Expand All @@ -54,6 +54,9 @@ dm-tracer:
dmctl:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dmctl ./cmd/dm-ctl

debug-tools:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole

test: unit_test integration_test

unit_test:
Expand Down
46 changes: 46 additions & 0 deletions debug-tools/binlog-event-blackhole/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

## Running

```bash
./binlog-event-blackhole -addr=127.0.0.1:3306 -u root -server-id=101 -binlog-name=mysql-bin.000003 -mode=1
```

```
-L string
log level: debug, info, warn, error, fatal (default "info")
-addr string
master's address
-binlog-name string
startup binlog filename
-binlog-pos int
startup binlog position (default 4)
-log-file string
log file path
-mode int
event read mode.
1: read packet with go-mysql;
2: read packet without go-mysql;
3: read binary data but do nothing
-p username
password for username
-server-id int
slave's server-id
-u string
master's username
```

## Result

When exiting, the result will be output to the log as following

```log
[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["throughput (byte/s)"=3344.541733541794]
```

| Item | Description |
|:------|:---- |
| event-count | The total events have received from the upstream master |
| byte-count | The total bytes have received from the upstream master |
| duration | The duration has be taken to fetch binlog events |
| tps | The events have received per second |
| throughput | The throughput of fetching binlog event data (bytes/second) |
65 changes: 65 additions & 0 deletions debug-tools/binlog-event-blackhole/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"

"github.com/pingcap/errors"
)

// config is the configuration used by this binlog-event-blackhole.
type config struct {
*flag.FlagSet

logLevel string
logFile string

mode int

addr string
username string
password string
serverID int
binlogName string
binlogPos int
}

// newConfig creates a new config instance.
func newConfig() *config {
cfg := &config{
FlagSet: flag.NewFlagSet("binlog-event-blackhole", flag.ContinueOnError),
}
fs := cfg.FlagSet

fs.StringVar(&cfg.logLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.logFile, "log-file", "", "log file path")

fs.IntVar(&cfg.mode, "mode", 0, "event read mode.\n1: read packet with go-mysql;\n2: read packet without go-mysql;\n3: read binary data but do nothing")

fs.StringVar(&cfg.addr, "addr", "", "master's address")
fs.StringVar(&cfg.username, "u", "", "master's username")
fs.StringVar(&cfg.password, "p", "", "password for `username`")
fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id")
fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename")
fs.IntVar(&cfg.binlogPos, "binlog-pos", 4, "startup binlog position")

return cfg
}

// parse parses flag definitions from the argument list.
func (c *config) parse(args []string) error {
err := c.FlagSet.Parse(args)
return errors.Trace(err)
}
171 changes: 171 additions & 0 deletions debug-tools/binlog-event-blackhole/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
)

// registerSlave register a slave connection on the master.
func registerSlave(addr, username, password string, serverID uint32) (*client.Conn, error) {
conn, err := client.Connect(addr, username, password, "", func(c *client.Conn) {
})
if err != nil {
return nil, errors.Annotate(err, "connect to the master")
}

// takes as an indication that the client is checksum-aware.
// ref https://dev.mysql.com/doc/refman/8.0/en/c-api-binary-log-functions.html.
_, err = conn.Execute(`SET @master_binlog_checksum='NONE'`)
if err != nil {
return nil, errors.Annotate(err, `SET @master_binlog_checksum='NONE'`)
}

conn.ResetSequence()
packet := registerSlaveCommand(username, password, serverID)
err = conn.WritePacket(packet)
if err != nil {
return nil, errors.Annotatef(err, "write COM_REGISTER_SLAVE packet %v", packet)
}

_, err = conn.ReadOKPacket()
if err != nil {
return nil, errors.Annotate(err, "read OK packet")
}

return conn, nil
}

// startSync starts to sync binlog event from <name, pos>.
func startSync(conn *client.Conn, serverID uint32, name string, pos uint32) error {
conn.ResetSequence()
packet := dumpCommand(serverID, name, pos)
err := conn.WritePacket(packet)
return errors.Annotatef(err, "write COM_BINLOG_DUMP %v", packet)
}

// closeConn closes the connection to the master server.
func closeConn(conn *client.Conn) error {
deadline := time.Now().Add(time.Millisecond)
err := conn.SetReadDeadline(deadline)
if err != nil {
return errors.Annotatef(err, "set connection read deadline to %v", deadline)
}

return errors.Trace(conn.Close())
}

// readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg.
func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
var (
eventCount sync2.AtomicUint64
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil
default:
}

data, err := conn.ReadPacket()
if err != nil {
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet")
}

switch data[0] {
case 0x00: // OK_HEADER
eventCount.Add(1) // got one more event
byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header
continue
case 0xff: // ERR_HEADER
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header")
case 0xfe: // EOF_HEADER
// Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
log.L().Warn("receive EOF packet, retrying")
continue
default:
log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0])))
continue
}
}
}

// readEventsWithoutGoMySQL reads binlog events from master server without `go-mysql` pkg.
func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
var (
eventCount sync2.AtomicUint64
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil
default:
}

_, data, err := readPacket(conn)
if err != nil {
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet")
}

switch data[0] {
case 0x00: // OK_HEADER
eventCount.Add(1) // got one more event
byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header
continue
case 0xff: // ERR_HEADER
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header")
case 0xfe: // EOF_HEADER
// Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
log.L().Warn("receive EOF packet, retrying")
continue
default:
log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0])))
continue
}
}
}

// readDataOnly reads the binary data only and does not parse packet or binlog event.
func readDataOnly(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
var (
buf = make([]byte, 10240)
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return 0, byteCount.Get(), time.Since(startTime), nil
default:
}

n, err := conn.Conn.Conn.Read(buf)
if err != nil {
return 0, byteCount.Get(), time.Since(startTime), errors.Annotatef(err, "read binary data")
}
byteCount.Add(uint64(n))
}
}
Loading

0 comments on commit e37b039

Please sign in to comment.