diff --git a/cdc/capture/capture.go b/cdc/capture/capture.go
index 848df79ca10..6f9162f726a 100644
--- a/cdc/capture/capture.go
+++ b/cdc/capture/capture.go
@@ -182,13 +182,12 @@ func (c *captureImpl) GetEtcdClient() etcd.CDCEtcdClient {
 func (c *captureImpl) reset(ctx context.Context) error {
 	lease, err := c.EtcdClient.GetEtcdClient().Grant(ctx, int64(c.config.CaptureSessionTTL))
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 	sess, err := concurrency.NewSession(
-		c.EtcdClient.GetEtcdClient().Unwrap(),
-		concurrency.WithLease(lease.ID))
+		c.EtcdClient.GetEtcdClient().Unwrap(), concurrency.WithLease(lease.ID))
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	c.captureMu.Lock()
@@ -205,7 +204,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
 	c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
 	_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	c.processorManager = c.newProcessorManager(
@@ -262,18 +261,23 @@ func (c *captureImpl) Run(ctx context.Context) error {
 			}
 			return errors.Trace(err)
 		}
-		err = c.reset(ctx)
-		if err != nil {
-			log.Error("reset capture failed", zap.Error(err))
-			return errors.Trace(err)
-		}
 		err = c.run(ctx)
 		// if capture suicided, reset the capture and run again.
 		// if the canceled error throw, there are two possible scenarios:
-		// 1. the internal context canceled, it means some error happened in the internal, and the routine is exited, we should restart the capture
-		// 2. the parent context canceled, it means that the caller of the capture hope the capture to exit, and this loop will return in the above `select` block
-		// TODO: make sure the internal cancel should return the real error instead of context.Canceled
-		if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) {
+		// 1. the internal context was canceled: some error happened
+		//    internally, the routine exited, and we should restart
+		//    the capture.
+		// 2. the parent context was canceled: the caller of the capture
+		//    wants the capture to exit, and this loop will return in
+		//    the above `select` block.
+		// if an **internal** context deadline exceeded (IO/network
+		// timeout) occurs, reset the capture and run again.
+		//
+		// TODO: make sure the internal cancel returns the real error
+		//       instead of context.Canceled.
+		if cerror.ErrCaptureSuicide.Equal(err) ||
+			context.Canceled == errors.Cause(err) ||
+			context.DeadlineExceeded == errors.Cause(err) {
 			log.Info("capture recovered", zap.String("captureID", c.info.ID))
 			continue
 		}
@@ -282,7 +286,13 @@ func (c *captureImpl) Run(ctx context.Context) error {
 }
 
 func (c *captureImpl) run(stdCtx context.Context) error {
-	err := c.register(stdCtx)
+	err := c.reset(stdCtx)
+	if err != nil {
+		log.Error("reset capture failed", zap.Error(err))
+		return errors.Trace(err)
+	}
+
+	err = c.register(stdCtx)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/cdc/capture/capture_test.go b/cdc/capture/capture_test.go
index 97b8309ee09..1fba45a5a3e 100644
--- a/cdc/capture/capture_test.go
+++ b/cdc/capture/capture_test.go
@@ -27,6 +27,7 @@ import (
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/client/pkg/v3/logutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -77,6 +78,55 @@ func TestReset(t *testing.T) {
 	wg.Wait()
 }
 
+type mockEtcdClient struct {
+	etcd.CDCEtcdClient
+	clientv3.Lease
+	called chan struct{}
+}
+
+func (m *mockEtcdClient) GetEtcdClient() *etcd.Client {
+	cli := &clientv3.Client{Lease: m}
+	return etcd.Wrap(cli, map[string]prometheus.Counter{})
+}
+
+func (m *mockEtcdClient) Grant(_ context.Context, _ int64) (*clientv3.LeaseGrantResponse, error) {
+	select {
+	case m.called <- struct{}{}:
+	default:
+	}
+	return nil, context.DeadlineExceeded
+}
+
+func TestRetryInternalContextDeadlineExceeded(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	called := make(chan struct{}, 2)
+	cp := NewCapture4Test(nil)
+	// In the current implementation, the first RPC is Grant.
+	// The mock client always returns DeadlineExceeded for the RPC.
+	cp.EtcdClient = &mockEtcdClient{called: called}
+
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- cp.Run(ctx)
+	}()
+	time.Sleep(100 * time.Millisecond)
+	// Wait for Grant to be called.
+	<-called
+	time.Sleep(100 * time.Millisecond)
+	// Make sure it retries.
+	<-called
+
+	// Do not retry context canceled.
+	cancel()
+	select {
+	case err := <-errCh:
+		require.NoError(t, err)
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "timeout")
+	}
+}
+
 func TestInfo(t *testing.T) {
 	cp := NewCapture4Test(nil)
 	cp.info = nil
diff --git a/cdc/server/server.go b/cdc/server/server.go
index 124f1499863..bb9a7d846a6 100644
--- a/cdc/server/server.go
+++ b/cdc/server/server.go
@@ -164,12 +164,12 @@ func (s *server) prepare(ctx context.Context) error {
 		},
 	})
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 	s.etcdClient = cdcEtcdClient
 
diff --git a/errors.toml b/errors.toml
index ee09e9ca3d6..4b867eac131 100755
--- a/errors.toml
+++ b/errors.toml
@@ -526,11 +526,6 @@ error = '''
 MySQL worker panic
 '''
 
-["CDC:ErrNewCaptureFailed"]
-error = '''
-new capture failed
-'''
-
 ["CDC:ErrNewSemVersion"]
 error = '''
 create sem version
diff --git a/pkg/errors/cdc_errors.go b/pkg/errors/cdc_errors.go
index d4622d20888..1bb7a1447ab 100644
--- a/pkg/errors/cdc_errors.go
+++ b/pkg/errors/cdc_errors.go
@@ -499,10 +499,6 @@ var (
 		"capture suicide",
 		errors.RFCCodeText("CDC:ErrCaptureSuicide"),
 	)
-	ErrNewCaptureFailed = errors.Normalize(
-		"new capture failed",
-		errors.RFCCodeText("CDC:ErrNewCaptureFailed"),
-	)
 	ErrCaptureRegister = errors.Normalize(
 		"capture register to etcd failed",
 		errors.RFCCodeText("CDC:ErrCaptureRegister"),
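
To make the retry behavior easier to reason about outside the patch, here is a minimal, self-contained Go sketch of the policy this change gives captureImpl.Run. It is not tiflow code: runOnce and errCaptureSuicide are hypothetical stand-ins for c.run(ctx) and cerror.ErrCaptureSuicide, and the stdlib errors.Is check approximates the patch's errors.Cause comparisons.

// A minimal sketch of the retry loop this patch implements in
// captureImpl.Run. Not tiflow code: runOnce and errCaptureSuicide are
// hypothetical stand-ins for c.run(ctx) and cerror.ErrCaptureSuicide,
// and stdlib errors.Is replaces pingcap/errors.Cause.
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errCaptureSuicide = errors.New("capture suicide")

// runOnce stands in for one c.run(ctx) pass. Here it always hits an
// internal timeout, much like the mocked Grant RPC in the new test.
func runOnce(ctx context.Context) error {
	internal, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
	defer cancel()
	<-internal.Done()
	return internal.Err() // DeadlineExceeded, or Canceled if the parent went away
}

func run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			// The parent context was canceled: the caller wants us to
			// exit, so stop retrying (scenario 2 in the patch comment).
			return nil
		default:
		}
		err := runOnce(ctx)
		// Retry on suicide, an internal cancel (scenario 1), or an
		// internal IO/network timeout, which is what the patch adds.
		if errors.Is(err, errCaptureSuicide) ||
			errors.Is(err, context.Canceled) ||
			errors.Is(err, context.DeadlineExceeded) {
			fmt.Println("capture recovered, retrying:", err)
			continue
		}
		return err
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	// Cancel the parent after a few retries, as the test's cancel() does.
	go func() { time.Sleep(35 * time.Millisecond); cancel() }()
	fmt.Println("run returned:", run(ctx))
}

Note the ordering: a parent cancellation surfaces inside runOnce as context.Canceled and is treated as retryable, but the select at the top of the loop observes ctx.Done() on the next iteration and exits cleanly, which matches the two-scenario split described in the patch's comment.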