Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

drainer: remove pb compress config && change 'pb' to 'file' #559

Merged
merged 10 commits into from
Apr 26, 2019
2 changes: 1 addition & 1 deletion cmd/drainer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Usage of drainer:
-data-dir string
drainer data directory path (default data.drainer) (default "data.drainer")
-dest-db-type string
target db type: mysql or pb; see syncer section in conf/drainer.toml (default "mysql")
target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml (default "mysql")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about file? @suzaku @july2993

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think file's OK, better than pb for an end user.

-detect-interval int
the interval time (in seconds) of detect pumps' status (default 10)
-disable-detect
Expand Down
9 changes: 3 additions & 6 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ disable-dispatch = false
safe-mode = false

# downstream storage, equal to --dest-db-type
# valid values are "mysql", "pb", "tidb", "flash", "kafka"
# valid values are "mysql", "file", "tidb", "flash", "kafka"
db-type = "mysql"

# disable sync these schema
Expand Down Expand Up @@ -81,13 +81,10 @@ port = 3306
# you can uncomment this to change the database to save checkpoint when the downstream is mysql or tidb
#schema = "tidb_binlog"

# Uncomment this if you want to use pb or sql as db-type.
# Compress compresses output file, like pb and sql file. Now it supports "gzip" algorithm only.
# Values can be "gzip". Leave it empty to disable compression.
# Uncomment this if you want to use file as db-type.
#[syncer.to]
# directory to save pb file, default same as data-dir(save checkpoint file) if this is not configured.
# directory to save binlog file, default same as data-dir(save checkpoint file) if this is not configured.
# dir = "data.drainer"
# compression = "gzip"


# when db-type is kafka, you can uncomment this to config the down stream kafka, it will be the globle config kafka default
Expand Down
2 changes: 1 addition & 1 deletion drainer/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewCheckPoint(name string, cfg *Config) (CheckPoint, error) {
switch name {
case "mysql", "tidb":
cp, err = newMysql(name, cfg)
case "pb":
case "file":
cp, err = NewPb(cfg)
case "kafka":
cp, err = newKafka(cfg)
Expand Down
15 changes: 9 additions & 6 deletions drainer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func NewConfig() *Config {
fs.IntVar(&cfg.SyncerCfg.TxnBatch, "txn-batch", 20, "number of binlog events in a transaction batch")
fs.StringVar(&cfg.SyncerCfg.IgnoreSchemas, "ignore-schemas", "INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql", "disable sync those schemas")
fs.IntVar(&cfg.SyncerCfg.WorkerCount, "c", 16, "parallel worker count")
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or pb or flash or kafka; see syncer section in conf/drainer.toml")
fs.StringVar(&cfg.SyncerCfg.DestDBType, "dest-db-type", "mysql", "target db type: mysql or tidb or file or kafka; see syncer section in conf/drainer.toml")
fs.BoolVar(&cfg.SyncerCfg.DisableDispatch, "disable-dispatch", false, "disable dispatching sqls that in one same binlog; if set true, work-count and txn-batch would be useless")
fs.BoolVar(&cfg.SyncerCfg.SafeMode, "safe-mode", false, "enable safe mode to make syncer reentrant")
fs.BoolVar(&cfg.SyncerCfg.DisableCausality, "disable-detect", false, "disbale detect causality")
Expand Down Expand Up @@ -197,7 +197,7 @@ func (cfg *Config) Parse(args []string) error {
}

func (c *SyncerConfig) adjustWorkCount() {
if c.DestDBType == "pb" || c.DestDBType == "kafka" {
if c.DestDBType == "file" || c.DestDBType == "kafka" {
c.DisableDispatch = true
c.WorkerCount = 1
} else if c.DisableDispatch {
Expand Down Expand Up @@ -277,8 +277,6 @@ func (cfg *Config) adjustConfig() error {
cfg.ListenAddr = "http://" + cfg.ListenAddr // add 'http:' scheme to facilitate parsing
adjustString(&cfg.DataDir, defaultDataDir)
adjustInt(&cfg.DetectInterval, defaultDetectInterval)
cfg.SyncerCfg.adjustWorkCount()
cfg.SyncerCfg.adjustDoDBAndTable()

// add default syncer.to configuration if need
if cfg.SyncerCfg.To == nil {
Expand Down Expand Up @@ -318,11 +316,13 @@ func (cfg *Config) adjustConfig() error {
if cfg.SyncerCfg.To.KafkaMaxMessages <= 0 {
cfg.SyncerCfg.To.KafkaMaxMessages = 1024
}
} else if cfg.SyncerCfg.DestDBType == "pb" {
} else if cfg.SyncerCfg.DestDBType == "pb" || cfg.SyncerCfg.DestDBType == "file" {
suzaku marked this conversation as resolved.
Show resolved Hide resolved
if len(cfg.SyncerCfg.To.BinlogFileDir) == 0 {
cfg.SyncerCfg.To.BinlogFileDir = cfg.DataDir
log.Infof("use default downstream pb directory: %s", cfg.DataDir)
log.Infof("use default downstream file directory: %s", cfg.DataDir)
}
// pb is an alias of file, use file instead
cfg.SyncerCfg.DestDBType = "file"
july2993 marked this conversation as resolved.
Show resolved Hide resolved
} else if cfg.SyncerCfg.DestDBType == "mysql" || cfg.SyncerCfg.DestDBType == "tidb" {
if len(cfg.SyncerCfg.To.Host) == 0 {
host := os.Getenv("MYSQL_HOST")
Expand Down Expand Up @@ -350,5 +350,8 @@ func (cfg *Config) adjustConfig() error {
}
}

cfg.SyncerCfg.adjustWorkCount()
cfg.SyncerCfg.adjustDoDBAndTable()

return nil
}
13 changes: 13 additions & 0 deletions drainer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,16 @@ func (t *testDrainerSuite) TestValidate(c *C) {
err = cfg.validate()
c.Assert(err, IsNil)
}

func (t *testDrainerSuite) TestAdjustConfig(c *C) {
cfg := NewConfig()
cfg.SyncerCfg.DestDBType = "pb"
cfg.SyncerCfg.WorkerCount = 10
cfg.SyncerCfg.DisableDispatch = true

err := cfg.adjustConfig()
c.Assert(err, IsNil)
c.Assert(cfg.SyncerCfg.DestDBType, Equals, "file")
c.Assert(cfg.SyncerCfg.WorkerCount, Equals, 1)
c.Assert(cfg.SyncerCfg.DisableDispatch, IsTrue)
}
46 changes: 23 additions & 23 deletions drainer/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(
jobs,
&model.Job{
ID: 6,
State: model.JobStateSynced,
SchemaID: 1,
Type: model.ActionDropSchema,
ID: 6,
State: model.JobStateSynced,
SchemaID: 1,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 3, FinishedTS: 123},
Query: "drop database test",
Query: "drop database test",
},
)
schema, err = NewSchema(jobs, false)
Expand All @@ -96,12 +96,12 @@ func (t *schemaSuite) TestSchema(c *C) {
jobs = append(
jobs,
&model.Job{
ID: 9,
State: model.JobStateSynced,
SchemaID: 1,
Type: model.ActionDropSchema,
ID: 9,
State: model.JobStateSynced,
SchemaID: 1,
Type: model.ActionDropSchema,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 1, FinishedTS: 123},
Query: "drop database test",
Query: "drop database test",
},
)
schema, err = NewSchema(jobs, false)
Expand Down Expand Up @@ -170,7 +170,7 @@ func (*schemaSuite) TestTable(c *C) {
SchemaID: 3,
TableID: 2,
Type: model.ActionCreateTable,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 123},
BinlogInfo: &model.HistoryInfo{SchemaVersion: 2, TableInfo: tblInfo, FinishedTS: 123},
Query: "create table " + tbName.O,
}
jobs = append(jobs, job)
Expand Down Expand Up @@ -224,13 +224,13 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(
jobs,
&model.Job{
ID: 9,
State: model.JobStateSynced,
SchemaID: 3,
TableID: 2,
Type: model.ActionTruncateTable,
ID: 9,
State: model.JobStateSynced,
SchemaID: 3,
TableID: 2,
Type: model.ActionTruncateTable,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 5, TableInfo: tblInfo1, FinishedTS: 123},
Query: "truncate table " + tbName.O,
Query: "truncate table " + tbName.O,
},
)
schema1, err := NewSchema(jobs, false)
Expand All @@ -246,13 +246,13 @@ func (*schemaSuite) TestTable(c *C) {
jobs = append(
jobs,
&model.Job{
ID: 9,
State: model.JobStateSynced,
SchemaID: 3,
TableID: 9,
Type: model.ActionDropTable,
ID: 9,
State: model.JobStateSynced,
SchemaID: 3,
TableID: 9,
Type: model.ActionDropTable,
BinlogInfo: &model.HistoryInfo{SchemaVersion: 6, FinishedTS: 123},
Query: "drop table " + tbName.O,
Query: "drop table " + tbName.O,
},
)
schema2, err := NewSchema(jobs, false)
Expand Down
6 changes: 2 additions & 4 deletions drainer/sync/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/pingcap/tidb-binlog/pkg/compress"
pb "github.com/pingcap/tidb-binlog/proto/binlog"
tb "github.com/pingcap/tipb/go-binlog"
)
Expand All @@ -29,9 +28,8 @@ type pbSyncer struct {
*baseSyncer
}

func NewPBSyncer(dir string, compression string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
codec := compress.ToCompressionCodec(compression)
binlogger, err := binlogfile.OpenBinlogger(dir, codec)
func NewPBSyncer(dir string, tableInfoGetter translator.TableInfoGetter) (*pbSyncer, error) {
binlogger, err := binlogfile.OpenBinlogger(dir)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *syncerSuite) SetUpTest(c *check.C) {
}

// create pb syncer
pb, err := NewPBSyncer(c.MkDir(), "", infoGetter)
pb, err := NewPBSyncer(c.MkDir(), infoGetter)
c.Assert(err, check.IsNil)

s.syncers = append(s.syncers, pb)
Expand Down
1 change: 0 additions & 1 deletion drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type DBConfig struct {
Port int `toml:"port" json:"port"`
Checkpoint CheckpointConfig `toml:"checkpoint" json:"checkpoint"`
BinlogFileDir string `toml:"dir" json:"dir"`
Compression string `toml:"compression" json:"compression"`
TimeLimit string `toml:"time-limit" json:"time-limit"`
SizeLimit string `toml:"size-limit" json:"size-limit"`

Expand Down
4 changes: 2 additions & 2 deletions drainer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema) (dsyncer dsync.Syncer, err
if err != nil {
return nil, errors.Annotate(err, "fail to create kafka dsyncer")
}
case "pb":
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, cfg.To.Compression, schema)
case "file":
dsyncer, err = dsync.NewPBSyncer(cfg.To.BinlogFileDir, schema)
if err != nil {
return nil, errors.Annotate(err, "fail to create pb dsyncer")
}
Expand Down
10 changes: 3 additions & 7 deletions pkg/binlogfile/binlogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/ngaut/log"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/compress"
"github.com/pingcap/tidb-binlog/pkg/file"
"github.com/pingcap/tipb/go-binlog"
"golang.org/x/net/context"
Expand Down Expand Up @@ -71,8 +70,6 @@ type binlogger struct {
// encoder encodes binlog payload into bytes, and write to file
encoder Encoder

codec compress.CompressionCodec

lastSuffix uint64
lastOffset int64

Expand All @@ -83,7 +80,7 @@ type binlogger struct {
}

// OpenBinlogger returns a binlogger for write, then it can be appended
func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger, error) {
func OpenBinlogger(dirpath string) (Binlogger, error) {
log.Infof("open binlog directory %s", dirpath)
var (
err error
Expand Down Expand Up @@ -145,8 +142,7 @@ func OpenBinlogger(dirpath string, codec compress.CompressionCodec) (Binlogger,
binlog := &binlogger{
dir: dirpath,
file: fileLock,
encoder: NewEncoder(fileLock, offset, codec),
codec: codec,
encoder: NewEncoder(fileLock, offset),
dirLock: dirLock,
lastSuffix: lastFileSuffix,
lastOffset: offset,
Expand Down Expand Up @@ -414,7 +410,7 @@ func (b *binlogger) rotate() error {
}
b.file = newTail

b.encoder = NewEncoder(b.file, 0, b.codec)
b.encoder = NewEncoder(b.file, 0)
log.Infof("segmented binlog file %v is created", fpath)
return nil
}
Expand Down
21 changes: 10 additions & 11 deletions pkg/binlogfile/binlogger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/tidb-binlog/pkg/compress"
"github.com/pingcap/tipb/go-binlog"
)

Expand All @@ -42,7 +41,7 @@ func (s *testBinloggerSuite) TestCreate(c *C) {
}

func checkTest(c *C, dir string) {
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer CloseBinlogger(bl)

Expand All @@ -54,7 +53,7 @@ func checkTest(c *C, dir string) {

func (s *testBinloggerSuite) TestOpenForWrite(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)

b, ok := bl.(*binlogger)
Expand All @@ -65,7 +64,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) {
c.Assert(err, IsNil)
bl.Close()

bl, err = OpenBinlogger(dir, compress.CompressionNone)
bl, err = OpenBinlogger(dir)
c.Assert(err, IsNil)

b, ok = bl.(*binlogger)
Expand All @@ -90,7 +89,7 @@ func (s *testBinloggerSuite) TestOpenForWrite(c *C) {

func (s *testBinloggerSuite) TestRotateFile(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)

payload := []byte("binlogtest")
Expand All @@ -110,7 +109,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) {

bl.Close()

bl, err = OpenBinlogger(dir, compress.CompressionNone)
bl, err = OpenBinlogger(dir)
c.Assert(err, IsNil)

binlogs, err := bl.ReadFrom(binlog.Pos{}, 1)
Expand All @@ -129,7 +128,7 @@ func (s *testBinloggerSuite) TestRotateFile(c *C) {

func (s *testBinloggerSuite) TestRead(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()

Expand Down Expand Up @@ -168,7 +167,7 @@ func (s *testBinloggerSuite) TestRead(c *C) {

func (s *testBinloggerSuite) TestCourruption(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()

Expand Down Expand Up @@ -201,7 +200,7 @@ func (s *testBinloggerSuite) TestCourruption(c *C) {

func (s *testBinloggerSuite) TestGC(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer CloseBinlogger(bl)

Expand All @@ -227,7 +226,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) {
os.Remove(f.Name())
}()

encoder := NewEncoder(f, 0, compress.CompressionNone)
encoder := NewEncoder(f, 0)
_, err = encoder.Encode([]byte("binlogtest"))
c.Assert(err, IsNil)

Expand Down Expand Up @@ -270,7 +269,7 @@ func (s *testBinloggerSuite) TestSeekBinlog(c *C) {

func (s *testBinloggerSuite) TestSkipCRCRead(c *C) {
dir := c.MkDir()
bl, err := OpenBinlogger(dir, compress.CompressionNone)
bl, err := OpenBinlogger(dir)
c.Assert(err, IsNil)
defer bl.Close()

Expand Down
Loading