Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

debug-tools: Binlog event blackhole #235

Merged
merged 15 commits into from
Nov 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ else
endif

.PHONY: build test unit_test dm_integration_test_build integration_test \
coverage check dm-worker dm-master dm-tracer dmctl
coverage check dm-worker dm-master dm-tracer dmctl debug-tools

build: check dm-worker dm-master dm-tracer dmctl

Expand All @@ -54,6 +54,9 @@ dm-tracer:
dmctl:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/dmctl ./cmd/dm-ctl

debug-tools:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/binlog-event-blackhole ./debug-tools/binlog-event-blackhole

test: unit_test integration_test

unit_test:
Expand Down
46 changes: 46 additions & 0 deletions debug-tools/binlog-event-blackhole/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@

## Running

```bash
./binlog-event-blackhole -addr=127.0.0.1:3306 -u root -server-id=101 -binlog-name=mysql-bin.000003 -mode=1
```

```
-L string
log level: debug, info, warn, error, fatal (default "info")
-addr string
master's address
-binlog-name string
startup binlog filename
-binlog-pos int
startup binlog position (default 4)
-log-file string
log file path
-mode int
event read mode.
1: read packet with go-mysql;
2: read packet without go-mysql;
3: read binary data but do nothing
-p username
password for username
-server-id int
slave's server-id
-u string
master's username
```

## Result

When exiting, the result will be output to the log as following

```log
[2019/08/12 15:44:19.269 +08:00] [INFO] [main.go:95] ["binlog-event-blackhole exit"] [event-count=35] [byte-count=2360] [duration=705.627314ms] [tps=49.601254522865595] ["throughput (byte/s)"=3344.541733541794]
```

| Item | Description |
|:------|:---- |
| event-count | The total events have received from the upstream master |
| byte-count | The total bytes have received from the upstream master |
| duration | The duration has be taken to fetch binlog events |
| tps | The events have received per second |
| throughput | The throughput of fetching binlog event data (bytes/second) |
65 changes: 65 additions & 0 deletions debug-tools/binlog-event-blackhole/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"

"github.com/pingcap/errors"
)

// config is the configuration used by this binlog-event-blackhole.
type config struct {
*flag.FlagSet

logLevel string
logFile string

mode int

addr string
username string
password string
serverID int
binlogName string
binlogPos int
}

// newConfig creates a new config instance.
func newConfig() *config {
cfg := &config{
FlagSet: flag.NewFlagSet("binlog-event-blackhole", flag.ContinueOnError),
}
fs := cfg.FlagSet

fs.StringVar(&cfg.logLevel, "L", "info", "log level: debug, info, warn, error, fatal")
fs.StringVar(&cfg.logFile, "log-file", "", "log file path")

fs.IntVar(&cfg.mode, "mode", 0, "event read mode.\n1: read packet with go-mysql;\n2: read packet without go-mysql;\n3: read binary data but do nothing")

fs.StringVar(&cfg.addr, "addr", "", "master's address")
fs.StringVar(&cfg.username, "u", "", "master's username")
fs.StringVar(&cfg.password, "p", "", "password for `username`")
fs.IntVar(&cfg.serverID, "server-id", 0, "slave's server-id")
fs.StringVar(&cfg.binlogName, "binlog-name", "", "startup binlog filename")
fs.IntVar(&cfg.binlogPos, "binlog-pos", 4, "startup binlog position")

return cfg
}

// parse parses flag definitions from the argument list.
func (c *config) parse(args []string) error {
err := c.FlagSet.Parse(args)
return errors.Trace(err)
}
171 changes: 171 additions & 0 deletions debug-tools/binlog-event-blackhole/fetcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go/sync2"
"go.uber.org/zap"

"github.com/pingcap/dm/pkg/log"
)

// registerSlave register a slave connection on the master.
func registerSlave(addr, username, password string, serverID uint32) (*client.Conn, error) {
conn, err := client.Connect(addr, username, password, "", func(c *client.Conn) {
})
if err != nil {
return nil, errors.Annotate(err, "connect to the master")
}

// takes as an indication that the client is checksum-aware.
// ref https://dev.mysql.com/doc/refman/8.0/en/c-api-binary-log-functions.html.
_, err = conn.Execute(`SET @master_binlog_checksum='NONE'`)
if err != nil {
return nil, errors.Annotate(err, `SET @master_binlog_checksum='NONE'`)
}

conn.ResetSequence()
packet := registerSlaveCommand(username, password, serverID)
err = conn.WritePacket(packet)
if err != nil {
return nil, errors.Annotatef(err, "write COM_REGISTER_SLAVE packet %v", packet)
}

_, err = conn.ReadOKPacket()
if err != nil {
return nil, errors.Annotate(err, "read OK packet")
}

return conn, nil
}

// startSync starts to sync binlog event from <name, pos>.
func startSync(conn *client.Conn, serverID uint32, name string, pos uint32) error {
conn.ResetSequence()
packet := dumpCommand(serverID, name, pos)
err := conn.WritePacket(packet)
return errors.Annotatef(err, "write COM_BINLOG_DUMP %v", packet)
}

// closeConn closes the connection to the master server.
func closeConn(conn *client.Conn) error {
deadline := time.Now().Add(time.Millisecond)
err := conn.SetReadDeadline(deadline)
if err != nil {
return errors.Annotatef(err, "set connection read deadline to %v", deadline)
}

return errors.Trace(conn.Close())
}

// readEventsWithGoMySQL reads binlog events from the master server with `go-mysql` pkg.
func readEventsWithGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the differences with readEventsWithoutGoMySQL is the "github.com/siddontang/go-mysql/client".Conn use a buffer, I originally thought that it also contains more logic, such as decoding/parsing.

Let's do this now, you can add the complete go-mysql logic later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh. after doing some benchmark later, I think more (fine-grained) things we can do.

var (
eventCount sync2.AtomicUint64
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil
default:
}

data, err := conn.ReadPacket()
if err != nil {
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet")
}

switch data[0] {
case 0x00: // OK_HEADER
eventCount.Add(1) // got one more event
byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header
continue
case 0xff: // ERR_HEADER
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header")
case 0xfe: // EOF_HEADER
// Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
log.L().Warn("receive EOF packet, retrying")
continue
default:
log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0])))
continue
}
}
}

// readEventsWithoutGoMySQL reads binlog events from master server without `go-mysql` pkg.
func readEventsWithoutGoMySQL(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
var (
eventCount sync2.AtomicUint64
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return eventCount.Get(), byteCount.Get(), time.Since(startTime), nil
default:
}

_, data, err := readPacket(conn)
if err != nil {
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.Annotate(err, "read event packet")
}

switch data[0] {
case 0x00: // OK_HEADER
eventCount.Add(1) // got one more event
byteCount.Add(4 + uint64(len(data))) // with 4 bytes packet header
continue
case 0xff: // ERR_HEADER
return eventCount.Get(), byteCount.Get(), time.Since(startTime), errors.New("read event fail with 0xFF header")
case 0xfe: // EOF_HEADER
// Refer http://dev.mysql.com/doc/internals/en/packet-EOF_Packet.html
log.L().Warn("receive EOF packet, retrying")
continue
default:
log.L().Warn("invalid stream header, retrying", zap.Uint8("header", uint8(data[0])))
continue
}
}
}

// readDataOnly reads the binary data only and does not parse packet or binlog event.
func readDataOnly(ctx context.Context, conn *client.Conn) (uint64, uint64, time.Duration, error) {
var (
buf = make([]byte, 10240)
byteCount sync2.AtomicUint64
startTime = time.Now()
)
for {
select {
case <-ctx.Done():
return 0, byteCount.Get(), time.Since(startTime), nil
default:
}

n, err := conn.Conn.Conn.Read(buf)
if err != nil {
return 0, byteCount.Get(), time.Since(startTime), errors.Annotatef(err, "read binary data")
}
byteCount.Add(uint64(n))
}
}
Loading