Merge branch 'pingcap:master' into fix_2980_nil_panic
CharlesCheung96 authored Dec 22, 2021
2 parents 36698c4 + d84f15b commit ce6cb53
Showing 23 changed files with 357 additions and 142 deletions.
62 changes: 41 additions & 21 deletions cdc/owner/scheduler.go
@@ -127,26 +127,20 @@ func (s *schedulerV2) DispatchTable(
captureID model.CaptureID,
isDelete bool,
) (done bool, err error) {
client, ok := s.GetClient(ctx, captureID)
if !ok {
return false, nil
}

topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
}

_, err = client.TrySendMessage(ctx, topic, message)
ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
log.Warn("scheduler: send message failed, retry later", zap.Error(err))
return false, nil
}
return false, errors.Trace(err)
}
if !ok {
return false, nil
}

s.stats.RecordDispatch()
log.Debug("send message successfully",
@@ -161,25 +155,19 @@ func (s *schedulerV2) Announce(
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
client, ok := s.GetClient(ctx, captureID)
if !ok {
return false, nil
}

topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

_, err := client.TrySendMessage(ctx, topic, message)
ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
log.Warn("scheduler: send message failed, retry later", zap.Error(err))
return false, nil
}
return false, errors.Trace(err)
}
if !ok {
return false, nil
}

s.stats.RecordAnnounce()
log.Debug("send message successfully",
@@ -189,7 +177,7 @@ func (s *schedulerV2) Announce(
return true, nil
}

func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p2p.MessageClient, bool) {
func (s *schedulerV2) getClient(target model.CaptureID) (*p2p.MessageClient, bool) {
client := s.messageRouter.GetClient(target)
if client == nil {
log.Warn("scheduler: no message client found, retry later",
@@ -199,6 +187,38 @@ func (s *schedulerV2) GetClient(ctx context.Context, target model.CaptureID) (*p
return client, true
}

func (s *schedulerV2) trySendMessage(
ctx context.Context,
target model.CaptureID,
topic p2p.Topic,
value interface{},
) (bool, error) {
// TODO (zixiong): abstract this function out together with the similar method in cdc/processor/agent.go
// We probably need more advanced logic to handle and mitigate complex failure situations.

client, ok := s.getClient(target)
if !ok {
return false, nil
}

_, err := client.TrySendMessage(ctx, topic, value)
if err != nil {
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
return false, nil
}
if cerror.ErrPeerMessageClientClosed.Equal(err) {
log.Warn("peer messaging client is closed while trying to send a message through it. "+
"Report a bug if this warning repeats",
zap.String("changefeed-id", s.changeFeedID),
zap.String("target", target))
return false, nil
}
return false, errors.Trace(err)
}

return true, nil
}

func (s *schedulerV2) Close(ctx context.Context) {
log.Debug("scheduler closed", zap.String("changefeed-id", s.changeFeedID))
s.deregisterPeerMessageHandlers(ctx)
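The refactor above folds every transient failure — no message client registered yet, ErrPeerMessageSendTryAgain, and the newly tolerated ErrPeerMessageClientClosed — into a single (false, nil) result from trySendMessage, so DispatchTable and Announce simply report "not done" and the owner retries on its next tick; only unexpected errors are propagated. A minimal sketch of that calling convention, using stand-in error values rather than the real cdc error and p2p types:

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for cerror.ErrPeerMessageSendTryAgain / ErrPeerMessageClientClosed.
var (
	errTryAgain     = errors.New("peer message send try again")
	errClientClosed = errors.New("peer message client closed")
)

// trySend mirrors the shape of schedulerV2.trySendMessage: transient failures
// become (false, nil) so the caller just retries on a later tick.
func trySend(send func() error) (done bool, err error) {
	switch sendErr := send(); {
	case sendErr == nil:
		return true, nil
	case errors.Is(sendErr, errTryAgain), errors.Is(sendErr, errClientClosed):
		return false, nil // transient: not done yet, but not an error either
	default:
		return false, sendErr // unexpected: surface to the caller
	}
}

func main() {
	done, err := trySend(func() error { return errClientClosed })
	fmt.Println(done, err) // false <nil> -> the owner will try again later
}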
5 changes: 5 additions & 0 deletions cdc/owner/scheduler_test.go
@@ -40,6 +40,11 @@ func TestSchedulerBasics(t *testing.T) {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectSendMessageTryAgain")
}()

_ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
}()

stdCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

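Both the scheduler test above and the agent test further down exercise the new tolerance through pingcap/failpoint. A term like "5*return(true)" activates the ClientInjectClosed failpoint for its first five evaluations and leaves it inert afterwards, which is why TestAgentTolerateClientClosed below can tick six times and still expect the final sync message to arrive. A rough sketch of that enable/evaluate/disable cycle with a made-up failpoint name; real tiflow code typically wraps the evaluation in failpoint.Inject markers rather than calling Eval directly:

package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Hypothetical failpoint name; the tests use
	// github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed.
	const fp = "example.com/demo/ClientInjectClosed"

	// "5*return(true)" -> fire on the first five evaluations, then stay off.
	if err := failpoint.Enable(fp, "5*return(true)"); err != nil {
		panic(err)
	}
	defer func() { _ = failpoint.Disable(fp) }()

	for tick := 1; tick <= 6; tick++ {
		if val, err := failpoint.Eval(fp); err == nil {
			fmt.Printf("tick %d: failpoint fired (value=%v)\n", tick, val) // ticks 1-5
		} else {
			fmt.Printf("tick %d: failpoint inert, send succeeds\n", tick) // tick 6
		}
	}
}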
10 changes: 10 additions & 0 deletions cdc/processor/agent.go
@@ -288,6 +288,9 @@ func (a *agentImpl) trySendMessage(
topic p2p.Topic,
value interface{},
) (bool, error) {
// TODO (zixiong): abstract this function out together with the similar method in cdc/owner/scheduler.go
// We probably need more advanced logic to handle and mitigate complex failure situations.

client := a.messageRouter.GetClient(target)
if client == nil {
a.printNoClientWarning(target)
@@ -299,6 +302,13 @@ func (a *agentImpl) trySendMessage(
if cerror.ErrPeerMessageSendTryAgain.Equal(err) {
return false, nil
}
if cerror.ErrPeerMessageClientClosed.Equal(err) {
log.Warn("peer messaging client is closed while trying to send a message through it. "+
"Report a bug if this warning repeats",
zap.String("changefeed-id", a.changeFeed),
zap.String("target", target))
return false, nil
}
return false, errors.Trace(err)
}

43 changes: 43 additions & 0 deletions cdc/processor/agent_test.go
@@ -18,6 +18,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tiflow/cdc/model"
pscheduler "github.com/pingcap/tiflow/cdc/scheduler"
cdcContext "github.com/pingcap/tiflow/pkg/context"
@@ -334,3 +335,45 @@ func TestAgentNoOwnerAtStartUp(t *testing.T) {
err = agent.Close()
require.NoError(t, err)
}

func TestAgentTolerateClientClosed(t *testing.T) {
suite := newAgentTestSuite(t)
defer suite.Close()

suite.etcdKVClient.On("Get", mock.Anything, etcd.CaptureOwnerKey, mock.Anything).Return(&clientv3.GetResponse{
Kvs: []*mvccpb.KeyValue{
{
Key: []byte(etcd.CaptureOwnerKey),
Value: []byte(ownerCaptureID),
ModRevision: 1,
},
},
}, nil).Once()

// Test Point 1: Create an agent.
agent, err := suite.CreateAgent(t)
require.NoError(t, err)

_ = failpoint.Enable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed", "5*return(true)")
defer func() {
_ = failpoint.Disable("github.com/pingcap/tiflow/pkg/p2p/ClientInjectClosed")
}()

// Test Point 2: We should tolerate the error ErrPeerMessageClientClosed
for i := 0; i < 6; i++ {
err = agent.Tick(suite.cdcCtx)
require.NoError(t, err)
}

select {
case <-suite.ctx.Done():
require.Fail(t, "context should not be canceled")
case syncMsg := <-suite.syncCh:
require.Equal(t, &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Running: nil,
Adding: nil,
Removing: nil,
}, syncMsg)
}
}
2 changes: 2 additions & 0 deletions dm/dm/master/server.go
@@ -1534,6 +1534,8 @@ func (s *Server) removeMetaData(ctx context.Context, taskName, metaSchema string
// clear loader and syncer checkpoints
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LoaderCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.LightningCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dbutil.TableName(metaSchema, cputil.SyncerCheckpoint(taskName))))
sqls = append(sqls, fmt.Sprintf("DROP TABLE IF EXISTS %s",
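removeMetaData now drops the Lightning checkpoint table alongside the loader and syncer ones. Each statement is built with dbutil.TableName, which produces a backtick-quoted `schema`.`table` identifier. A sketch of the pattern with a local stand-in for that helper; the table-name suffixes are assumptions for illustration, since the exact names come from cputil:

package main

import (
	"fmt"
	"strings"
)

// quoteTable is a stand-in for dbutil.TableName: backtick-quote and escape.
func quoteTable(schema, table string) string {
	esc := func(s string) string { return strings.ReplaceAll(s, "`", "``") }
	return fmt.Sprintf("`%s`.`%s`", esc(schema), esc(table))
}

func main() {
	metaSchema, task := "dm_meta", "mytask"
	checkpoints := []string{
		task + "_loader_checkpoint",    // assumed shape of cputil.LoaderCheckpoint(task)
		task + "_lightning_checkpoint", // assumed shape of cputil.LightningCheckpoint(task)
		task + "_syncer_checkpoint",    // assumed shape of cputil.SyncerCheckpoint(task)
	}
	for _, table := range checkpoints {
		fmt.Printf("DROP TABLE IF EXISTS %s\n", quoteTable(metaSchema, table))
	}
}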
2 changes: 2 additions & 0 deletions dm/dm/master/server_test.go
@@ -985,6 +985,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock := conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
@@ -1077,6 +1078,7 @@ func (t *testMaster) TestStartTaskWithRemoveMeta(c *check.C) {
mock = conn.InitMockDB(c)
mock.ExpectBegin()
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LoaderCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.LightningCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerCheckpoint(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerShardMeta(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec(fmt.Sprintf("DROP TABLE IF EXISTS `%s`.`%s`", cfg.MetaSchema, cputil.SyncerOnlineDDL(cfg.Name))).WillReturnResult(sqlmock.NewResult(1, 1))
2 changes: 2 additions & 0 deletions dm/dm/worker/config.go
@@ -99,6 +99,8 @@ type Config struct {
KeepAliveTTL int64 `toml:"keepalive-ttl" json:"keepalive-ttl"`
RelayKeepAliveTTL int64 `toml:"relay-keepalive-ttl" json:"relay-keepalive-ttl"`

RelayDir string `toml:"relay-dir" json:"relay-dir"`

// tls config
config.Security

2 changes: 2 additions & 0 deletions dm/dm/worker/dm-worker.toml
@@ -8,3 +8,5 @@ log-file = "dm-worker.log"
worker-addr = ":8262"
advertise-addr = "127.0.0.1:8262"
join = "127.0.0.1:8261"

relay-dir = "/tmp/relay"
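The new worker-level relay-dir key maps onto Config.RelayDir through its toml tag. A minimal decoding sketch, assuming a BurntSushi/toml-style decoder and trimming the struct down to just the new field (DM's real config loading carries many more fields plus flag handling):

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// Config is trimmed down to the field added in this change.
type Config struct {
	RelayDir string `toml:"relay-dir" json:"relay-dir"`
}

func main() {
	const data = `relay-dir = "/tmp/relay"`

	var cfg Config
	if _, err := toml.Decode(data, &cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.RelayDir) // /tmp/relay
}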
2 changes: 1 addition & 1 deletion dm/dm/worker/server.go
@@ -822,7 +822,7 @@ func (s *Server) getOrStartWorker(cfg *config.SourceConfig, needLock bool) (*Sou
}

log.L().Info("will start a new worker", zap.String("sourceID", cfg.SourceID))
w, err := NewSourceWorker(cfg, s.etcdClient, s.cfg.Name)
w, err := NewSourceWorker(cfg, s.etcdClient, s.cfg.Name, s.cfg.RelayDir)
if err != nil {
return nil, err
}
17 changes: 16 additions & 1 deletion dm/dm/worker/source_worker.go
@@ -16,6 +16,7 @@ package worker
import (
"context"
"fmt"
"path/filepath"
"sync"
"time"

@@ -77,6 +78,7 @@ type SourceWorker struct {
relayWg sync.WaitGroup
relayHolder RelayHolder
relayPurger relay.Purger
relayDir string

startedRelayBySourceCfg bool

@@ -89,13 +91,19 @@

// NewSourceWorker creates a new SourceWorker. The functionality of relay and subtask is disabled by default, need call EnableRelay
// and EnableSubtask later.
func NewSourceWorker(cfg *config.SourceConfig, etcdClient *clientv3.Client, name string) (w *SourceWorker, err error) {
func NewSourceWorker(
cfg *config.SourceConfig,
etcdClient *clientv3.Client,
name string,
relayDir string,
) (w *SourceWorker, err error) {
w = &SourceWorker{
cfg: cfg,
subTaskHolder: newSubTaskHolder(),
l: log.With(zap.String("component", "worker controller")),
etcdClient: etcdClient,
name: name,
relayDir: relayDir,
}
// keep running until canceled in `Close`.
w.ctx, w.cancel = context.WithCancel(context.Background())
@@ -334,6 +342,13 @@ func (w *SourceWorker) EnableRelay(startBySourceCfg bool) (err error) {
}

// 2. initial relay holder, the cfg's password need decrypt
// worker's relay-dir has higher priority than source's relay-dir
if w.relayDir != "" {
workerRelayDir := filepath.Join(w.relayDir, w.name)
log.L().Info("use worker's relay-dir", zap.String("RelayDir", workerRelayDir))
w.cfg.RelayDir = workerRelayDir
}

w.relayHolder = NewRelayHolder(w.cfg)
relayPurger, err := w.relayHolder.Init(w.relayCtx, []relay.PurgeInterceptor{
w,
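With the extra constructor argument, a worker-level relay-dir now takes precedence over the relay-dir in the source config: EnableRelay joins it with the worker's name, presumably so that workers sharing a host keep their relay logs in separate subdirectories. A small sketch of that precedence rule, mirroring the EnableRelay snippet above:

package main

import (
	"fmt"
	"path/filepath"
)

// effectiveRelayDir mirrors the precedence shown in EnableRelay: a non-empty
// worker-level relay-dir (suffixed with the worker name) wins over the
// relay-dir carried by the source config.
func effectiveRelayDir(workerRelayDir, workerName, sourceRelayDir string) string {
	if workerRelayDir != "" {
		return filepath.Join(workerRelayDir, workerName)
	}
	return sourceRelayDir
}

func main() {
	fmt.Println(effectiveRelayDir("/tmp/relay", "worker-1", "./relay_log")) // /tmp/relay/worker-1
	fmt.Println(effectiveRelayDir("", "worker-1", "./relay_log"))           // ./relay_log
}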
10 changes: 5 additions & 5 deletions dm/dm/worker/source_worker_test.go
@@ -79,12 +79,12 @@ func (t *testServer) testWorker(c *C) {
defer func() {
NewRelayHolder = NewRealRelayHolder
}()
w, err := NewSourceWorker(cfg, etcdCli, "")
w, err := NewSourceWorker(cfg, etcdCli, "", "")
c.Assert(err, IsNil)
c.Assert(w.EnableRelay(false), ErrorMatches, "init error")

NewRelayHolder = NewDummyRelayHolder
w, err = NewSourceWorker(cfg, etcdCli, "")
w, err = NewSourceWorker(cfg, etcdCli, "", "")
c.Assert(err, IsNil)
c.Assert(w.GetUnitAndSourceStatusJSON("", nil), HasLen, emptyWorkerStatusInfoJSONLength)

@@ -292,7 +292,7 @@ func (t *testWorkerFunctionalities) TestWorkerFunctionalities(c *C) {
c.Assert(err, IsNil)

// start worker
w, err := NewSourceWorker(sourceCfg, etcdCli, "")
w, err := NewSourceWorker(sourceCfg, etcdCli, "", "")
c.Assert(err, IsNil)
defer w.Close()
go func() {
@@ -463,7 +463,7 @@ func (t *testWorkerEtcdCompact) TestWatchSubtaskStageEtcdCompact(c *C) {
sourceCfg.EnableRelay = false

// step 1: start worker
w, err := NewSourceWorker(sourceCfg, etcdCli, "")
w, err := NewSourceWorker(sourceCfg, etcdCli, "", "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -582,7 +582,7 @@ func (t *testWorkerEtcdCompact) TestWatchRelayStageEtcdCompact(c *C) {
sourceCfg.MetaDir = c.MkDir()

// step 1: start worker
w, err := NewSourceWorker(sourceCfg, etcdCli, "")
w, err := NewSourceWorker(sourceCfg, etcdCli, "", "")
c.Assert(err, IsNil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
4 changes: 2 additions & 2 deletions dm/dm/worker/task_checker_test.go
@@ -89,7 +89,7 @@ func (s *testTaskCheckerSuite) TestCheck(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewSourceWorker(cfg, nil, "")
w, err := NewSourceWorker(cfg, nil, "", "")
c.Assert(err, check.IsNil)
w.closed.Store(false)

@@ -204,7 +204,7 @@ func (s *testTaskCheckerSuite) TestCheckTaskIndependent(c *check.C) {
cfg := loadSourceConfigWithoutPassword(c)
cfg.RelayDir = dir
cfg.MetaDir = dir
w, err := NewSourceWorker(cfg, nil, "")
w, err := NewSourceWorker(cfg, nil, "", "")
c.Assert(err, check.IsNil)
w.closed.Store(false)
