Merge branch 'master' into update-go-mysql
lance6716 authored Dec 17, 2021
2 parents 0cc9853 + 730e627 commit 757db12
Showing 9 changed files with 351 additions and 282 deletions.
23 changes: 13 additions & 10 deletions cdc/scheduler/schedule_dispatcher.go
@@ -161,6 +161,19 @@ func (s *BaseScheduleDispatcher) Tick(
// (from Etcd in the current implementation).
s.captures = captures

// We trigger an automatic rebalance if the capture count has changed.
// This logic is the same as in the older implementation of the scheduler.
// TODO: a better criterion is needed.
// NOTE: we need to check whether the capture count has changed on every tick,
// and set needRebalance to true if it has. If we miss a capture count change,
// the workload may never be balanced until the user manually triggers a rebalance.
if s.lastTickCaptureCount != captureCountUninitialized &&
s.lastTickCaptureCount != len(captures) {

s.needRebalance = true
}
s.lastTickCaptureCount = len(captures)

// Checks for checkpoint regression as a safety measure.
if s.checkpointTs > checkpointTs {
s.logger.Panic("checkpointTs regressed",
@@ -246,16 +259,6 @@ func (s *BaseScheduleDispatcher) Tick(
return CheckpointCannotProceed, CheckpointCannotProceed, nil
}

// We trigger an automatic rebalance if the capture count has changed.
// This logic is the same as in the older implementation of scheduler.
// TODO a better criterion is needed.
if s.lastTickCaptureCount != captureCountUninitialized &&
s.lastTickCaptureCount != len(captures) {

s.needRebalance = true
}
s.lastTickCaptureCount = len(captures)

if s.needRebalance {
ok, err := s.rebalance(ctx)
if err != nil {
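The moved block is the substance of this change: the capture-count comparison now runs at the top of `Tick`, before any path that can return early with `CheckpointCannotProceed`. Under the old placement (second hunk), a membership change arriving while dispatches were pending could go unnoticed. A minimal sketch of why the ordering matters, using a hypothetical stripped-down dispatcher type (not the actual `BaseScheduleDispatcher`):

```go
package main

import "fmt"

const captureCountUninitialized = -1

// dispatcher is a hypothetical, stripped-down stand-in for
// BaseScheduleDispatcher, kept only to show the ordering change.
type dispatcher struct {
	lastTickCaptureCount int
	needRebalance        bool
}

// tick mirrors the new placement: the capture count is compared on
// every tick, before any path that can bail out early.
func (d *dispatcher) tick(captures map[string]struct{}, pendingDispatch bool) {
	if d.lastTickCaptureCount != captureCountUninitialized &&
		d.lastTickCaptureCount != len(captures) {
		d.needRebalance = true
	}
	d.lastTickCaptureCount = len(captures)

	if pendingDispatch {
		// Early return, analogous to the CheckpointCannotProceed paths.
		// With the old placement the count check sat below this point
		// and would have been skipped on this tick.
		return
	}
	// ... the rebalance itself would run here when needRebalance is set ...
}

func main() {
	d := &dispatcher{lastTickCaptureCount: captureCountUninitialized}
	two := map[string]struct{}{"capture-1": {}, "capture-2": {}}
	d.tick(two, true) // first tick initializes the count

	three := map[string]struct{}{"capture-1": {}, "capture-2": {}, "capture-3": {}}
	d.tick(three, true) // capture-3 joins while dispatches are still pending

	fmt.Println(d.needRebalance) // true: the membership change was not missed
}
```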
119 changes: 119 additions & 0 deletions cdc/scheduler/schedule_dispatcher_test.go
@@ -777,3 +777,122 @@ func TestManualMoveTableWhileAddingTable(t *testing.T) {
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)
}

func TestAutoRebalanceOnCaptureOnline(t *testing.T) {
// This test case tests the following scenario:
// 1. Capture-1 and Capture-2 are online.
// 2. Owner dispatches three tables to these two captures.
// 3. While the pending dispatches are in progress, Capture-3 goes online.
// 4. Capture-1 and Capture-2 finish the dispatches.
//
// We expect that the workload is eventually balanced by migrating
// a table to Capture-3.

t.Parallel()

ctx := cdcContext.NewBackendContext4Test(false)
communicator := NewMockScheduleDispatcherCommunicator()
dispatcher := NewBaseScheduleDispatcher("cf-1", communicator, 1000)

captureList := map[model.CaptureID]*model.CaptureInfo{
"capture-1": {
ID: "capture-1",
AdvertiseAddr: "fakeip:1",
},
"capture-2": {
ID: "capture-2",
AdvertiseAddr: "fakeip:2",
},
}

communicator.On("Announce", mock.Anything, "cf-1", "capture-1").Return(true, nil)
communicator.On("Announce", mock.Anything, "cf-1", "capture-2").Return(true, nil)
checkpointTs, resolvedTs, err := dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)

dispatcher.OnAgentSyncTaskStatuses("capture-1", []model.TableID{}, []model.TableID{}, []model.TableID{})
dispatcher.OnAgentSyncTaskStatuses("capture-2", []model.TableID{}, []model.TableID{}, []model.TableID{})

communicator.Reset()
communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(1), mock.Anything, false).
Return(true, nil)
communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(2), mock.Anything, false).
Return(true, nil)
communicator.On("DispatchTable", mock.Anything, "cf-1", model.TableID(3), mock.Anything, false).
Return(true, nil)
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)
require.NotEqual(t, 0, len(communicator.addTableRecords["capture-1"]))
require.NotEqual(t, 0, len(communicator.addTableRecords["capture-2"]))
require.Equal(t, 0, len(communicator.removeTableRecords["capture-1"]))
require.Equal(t, 0, len(communicator.removeTableRecords["capture-2"]))

dispatcher.OnAgentCheckpoint("capture-1", 2000, 2000)
dispatcher.OnAgentCheckpoint("capture-1", 2001, 2001)

communicator.ExpectedCalls = nil
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)

communicator.AssertExpectations(t)

captureList["capture-3"] = &model.CaptureInfo{
ID: "capture-3",
AdvertiseAddr: "fakeip:3",
}
communicator.ExpectedCalls = nil
communicator.On("Announce", mock.Anything, "cf-1", "capture-3").Return(true, nil)
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)

communicator.ExpectedCalls = nil
dispatcher.OnAgentSyncTaskStatuses("capture-3", []model.TableID{}, []model.TableID{}, []model.TableID{})
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)

for captureID, tables := range communicator.addTableRecords {
for _, tableID := range tables {
dispatcher.OnAgentFinishedTableOperation(captureID, tableID)
}
}

communicator.Reset()
var removeTableFromCapture model.CaptureID
communicator.On("DispatchTable", mock.Anything, "cf-1", mock.Anything, mock.Anything, true).
Return(true, nil).Run(func(args mock.Arguments) {
removeTableFromCapture = args.Get(3).(model.CaptureID)
})
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)

removedTableID := communicator.removeTableRecords[removeTableFromCapture][0]

dispatcher.OnAgentFinishedTableOperation(removeTableFromCapture, removedTableID)
dispatcher.OnAgentCheckpoint("capture-1", 1100, 1400)
dispatcher.OnAgentCheckpoint("capture-2", 1200, 1300)
communicator.ExpectedCalls = nil
communicator.On("DispatchTable", mock.Anything, "cf-1", removedTableID, "capture-3", false).
Return(true, nil)
checkpointTs, resolvedTs, err = dispatcher.Tick(ctx, 1000, []model.TableID{1, 2, 3}, captureList)
require.NoError(t, err)
require.Equal(t, CheckpointCannotProceed, checkpointTs)
require.Equal(t, CheckpointCannotProceed, resolvedTs)
communicator.AssertExpectations(t)
}
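The removal target above is discovered with testify's argument-capture idiom: a `.Run` callback chained after `.Return` records the `captureID` argument at match time. Capturing via `.Run` avoids asserting a specific capture up front, which matters here because the rebalancer is free to evict the table from either capture-1 or capture-2. A compact, self-contained illustration of the same pattern, using a hypothetical `fakeCommunicator` with a simplified signature:

```go
package scheduler_test

import (
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
)

// fakeCommunicator is a hypothetical mock, a stand-in for the
// MockScheduleDispatcherCommunicator used in the test above.
type fakeCommunicator struct {
	mock.Mock
}

func (c *fakeCommunicator) DispatchTable(changefeed string, tableID int64, captureID string, isDelete bool) (bool, error) {
	args := c.Called(changefeed, tableID, captureID, isDelete)
	return args.Bool(0), args.Error(1)
}

func TestCaptureArgumentViaRun(t *testing.T) {
	c := &fakeCommunicator{}
	var target string
	// Run fires after the arguments match; Get(2) picks the third
	// parameter (captureID here), just as args.Get(3) picks the fourth
	// parameter of the real DispatchTable above.
	c.On("DispatchTable", "cf-1", mock.Anything, mock.Anything, true).
		Return(true, nil).
		Run(func(args mock.Arguments) {
			target = args.Get(2).(string)
		})

	ok, err := c.DispatchTable("cf-1", 7, "capture-2", true)
	require.True(t, ok)
	require.NoError(t, err)
	require.Equal(t, "capture-2", target)
	c.AssertExpectations(t)
}
```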
2 changes: 1 addition & 1 deletion dm/dumpling/dumpling.go
@@ -280,7 +280,7 @@ func (m *Dumpling) constructArgs(ctx context.Context) (*export.Config, error) {

extraArgs := strings.Fields(cfg.ExtraArgs)
if len(extraArgs) > 0 {
err := parseExtraArgs(&m.logger, dumpConfig, ParseArgLikeBash(extraArgs))
err := dutils.ParseExtraArgs(&m.logger, dumpConfig, dutils.ParseArgLikeBash(extraArgs))
if err != nil {
m.logger.Warn("parsed some unsupported arguments", zap.Error(err))
}
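The fix itself is one line: the argument helpers are now taken from the shared `dutils` package. As intuition for what `ParseArgLikeBash` appears to do, judging from the test below where `--where "t>10"` arrives as `t>10`, here is a hypothetical sketch; `trimBashQuotes` is invented for this example and is not the real helper:

```go
package main

import (
	"fmt"
	"strings"
)

// trimBashQuotes is an illustrative re-implementation (an assumption,
// not the actual dm/pkg/utils code): strip one layer of surrounding
// quotes from each whitespace-split token, the way a shell would.
func trimBashQuotes(args []string) []string {
	out := make([]string, 0, len(args))
	for _, arg := range args {
		if len(arg) >= 2 {
			if (strings.HasPrefix(arg, `"`) && strings.HasSuffix(arg, `"`)) ||
				(strings.HasPrefix(arg, `'`) && strings.HasSuffix(arg, `'`)) {
				arg = arg[1 : len(arg)-1]
			}
		}
		out = append(out, arg)
	}
	return out
}

func main() {
	extraArgs := `--statement-size=100 --where "t>10" --threads 8`
	fields := strings.Fields(extraArgs) // same split as constructArgs uses
	fmt.Println(trimBashQuotes(fields))
	// [--statement-size=100 --where t>10 --threads 8]
}
```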
62 changes: 56 additions & 6 deletions dm/dumpling/dumpling_test.go
@@ -18,12 +18,16 @@ import (
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/docker/go-units"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/filter"
tfilter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/pingcap/tidb/dumpling/export"

"github.com/pingcap/ticdc/dm/dm/config"
"github.com/pingcap/ticdc/dm/dm/pb"
"github.com/pingcap/ticdc/dm/pkg/conn"
"github.com/pingcap/ticdc/dm/pkg/log"

. "github.com/pingcap/check"
@@ -44,9 +48,9 @@ type testDumplingSuite struct {
cfg *config.SubTaskConfig
}

func (d *testDumplingSuite) SetUpSuite(c *C) {
func (t *testDumplingSuite) SetUpSuite(c *C) {
dir := c.MkDir()
d.cfg = &config.SubTaskConfig{
t.cfg = &config.SubTaskConfig{
Name: "dumpling_ut",
Timezone: "UTC",
From: config.GetDBConfigForTest(),
@@ -64,8 +68,8 @@ func (d *testDumplingSuite) SetUpSuite(c *C) {
c.Assert(log.InitLogger(&log.Config{}), IsNil)
}

func (d *testDumplingSuite) TestDumpling(c *C) {
dumpling := NewDumpling(d.cfg)
func (t *testDumplingSuite) TestDumpling(c *C) {
dumpling := NewDumpling(t.cfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

@@ -117,10 +121,56 @@ func (d *testDumplingSuite) TestDumpling(c *C) {
c.Assert(result.Errors[0].String(), Matches, ".*context deadline exceeded.*")
}

func (d *testDumplingSuite) TestDefaultConfig(c *C) {
dumpling := NewDumpling(d.cfg)
func (t *testDumplingSuite) TestDefaultConfig(c *C) {
dumpling := NewDumpling(t.cfg)
ctx := context.Background()
c.Assert(dumpling.Init(ctx), IsNil)
c.Assert(dumpling.dumpConfig.StatementSize, Not(Equals), export.UnspecifiedSize)
c.Assert(dumpling.dumpConfig.Rows, Not(Equals), export.UnspecifiedSize)
}

func (t *testDumplingSuite) TestParseArgsWontOverwrite(c *C) {
cfg := &config.SubTaskConfig{
Timezone: "UTC",
}
cfg.ChunkFilesize = "1"
rules := &filter.Rules{
DoDBs: []string{"unit_test"},
}
cfg.BAList = rules
// make sure we enter `parseExtraArgs`
cfg.ExtraArgs = "-s=4000 --consistency lock"

d := NewDumpling(cfg)
exportCfg, err := d.constructArgs(context.Background())
c.Assert(err, IsNil)

c.Assert(exportCfg.StatementSize, Equals, uint64(4000))
c.Assert(exportCfg.FileSize, Equals, uint64(1*units.MiB))

f, err2 := tfilter.ParseMySQLReplicationRules(rules)
c.Assert(err2, IsNil)
c.Assert(exportCfg.TableFilter, DeepEquals, tfilter.CaseInsensitive(f))

c.Assert(exportCfg.Consistency, Equals, "lock")
}

func (t *testDumplingSuite) TestConstructArgs(c *C) {
ctx := context.Background()

mock := conn.InitMockDB(c)
mock.ExpectQuery("SELECT cast\\(TIMEDIFF\\(NOW\\(6\\), UTC_TIMESTAMP\\(6\\)\\) as time\\);").
WillReturnRows(sqlmock.NewRows([]string{""}).AddRow("01:00:00"))

cfg := &config.SubTaskConfig{}
cfg.ExtraArgs = `--statement-size=100 --where "t>10" --threads 8 -F 50B`
d := NewDumpling(cfg)
exportCfg, err := d.constructArgs(ctx)
c.Assert(err, IsNil)
c.Assert(exportCfg.StatementSize, Equals, uint64(100))
c.Assert(exportCfg.Where, Equals, "t>10")
c.Assert(exportCfg.Threads, Equals, 8)
c.Assert(exportCfg.FileSize, Equals, uint64(50))
c.Assert(exportCfg.SessionParams, NotNil)
c.Assert(exportCfg.SessionParams["time_zone"], Equals, "+01:00")
}
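TestConstructArgs pins the timezone handling: the mocked `TIMEDIFF(NOW(6), UTC_TIMESTAMP(6))` result "01:00:00" must surface as the `time_zone` session parameter "+01:00". A sketch of one plausible conversion (an illustrative assumption, not the actual DM implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// timediffToTimeZone shows (as an assumption about the underlying
// logic) how a TIMEDIFF result such as "01:00:00" could become the
// "+01:00" session time_zone the test expects: keep the sign, drop
// the seconds field.
func timediffToTimeZone(d string) string {
	sign := "+"
	if strings.HasPrefix(d, "-") {
		sign = "-"
		d = strings.TrimPrefix(d, "-")
	}
	parts := strings.Split(d, ":") // "01:00:00" -> ["01", "00", "00"]
	return sign + parts[0] + ":" + parts[1]
}

func main() {
	fmt.Println(timediffToTimeZone("01:00:00"))  // +01:00
	fmt.Println(timediffToTimeZone("-05:30:00")) // -05:30
}
```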