
Commit 03be1ff

This is an automated cherry-pick of #8602
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
overvenus authored and ti-chi-bot committed Mar 22, 2023
1 parent 1b0f286 commit 03be1ff
Showing 5 changed files with 80 additions and 21 deletions.
40 changes: 25 additions & 15 deletions cdc/capture/capture.go
@@ -195,13 +195,12 @@ func (c *captureImpl) GetEtcdClient() etcd.CDCEtcdClient {
 func (c *captureImpl) reset(ctx context.Context) error {
 	lease, err := c.EtcdClient.GetEtcdClient().Grant(ctx, int64(c.config.CaptureSessionTTL))
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 	sess, err := concurrency.NewSession(
-		c.EtcdClient.GetEtcdClient().Unwrap(),
-		concurrency.WithLease(lease.ID))
+		c.EtcdClient.GetEtcdClient().Unwrap(), concurrency.WithLease(lease.ID))
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	c.captureMu.Lock()
@@ -218,7 +217,7 @@ func (c *captureImpl) reset(ctx context.Context) error {
 	c.upstreamManager = upstream.NewManager(ctx, c.EtcdClient.GetGCServiceID())
 	_, err = c.upstreamManager.AddDefaultUpstream(c.pdEndpoints, c.config.Security)
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	c.processorManager = c.newProcessorManager(c.info, c.upstreamManager, &c.liveness)
@@ -274,18 +273,23 @@ func (c *captureImpl) Run(ctx context.Context) error {
 			}
 			return errors.Trace(err)
 		}
-		err = c.reset(ctx)
-		if err != nil {
-			log.Error("reset capture failed", zap.Error(err))
-			return errors.Trace(err)
-		}
 		err = c.run(ctx)
 		// if capture suicided, reset the capture and run again.
 		// if the canceled error throw, there are two possible scenarios:
-		// 1. the internal context canceled, it means some error happened in the internal, and the routine is exited, we should restart the capture
-		// 2. the parent context canceled, it means that the caller of the capture hope the capture to exit, and this loop will return in the above `select` block
-		// TODO: make sure the internal cancel should return the real error instead of context.Canceled
-		if cerror.ErrCaptureSuicide.Equal(err) || context.Canceled == errors.Cause(err) {
+		// 1. the internal context canceled, it means some error happened in
+		//    the internal, and the routine is exited, we should restart
+		//    the capture.
+		// 2. the parent context canceled, it means that the caller of
+		//    the capture hope the capture to exit, and this loop will return
+		//    in the above `select` block.
+		// if there are some **internal** context deadline exceeded (IO/network
+		// timeout), reset the capture and run again.
+		//
+		// TODO: make sure the internal cancel should return the real error
+		// instead of context.Canceled.
+		if cerror.ErrCaptureSuicide.Equal(err) ||
+			context.Canceled == errors.Cause(err) ||
+			context.DeadlineExceeded == errors.Cause(err) {
 			log.Info("capture recovered", zap.String("captureID", c.info.ID))
 			continue
 		}
@@ -294,7 +298,13 @@ func (c *captureImpl) Run(ctx context.Context) error {
 }
 
 func (c *captureImpl) run(stdCtx context.Context) error {
-	err := c.register(stdCtx)
+	err := c.reset(stdCtx)
+	if err != nil {
+		log.Error("reset capture failed", zap.Error(err))
+		return errors.Trace(err)
+	}
+
+	err = c.register(stdCtx)
 	if err != nil {
 		return errors.Trace(err)
 	}
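
Note on the behavioural change above: with reset moved inside run and its errors merely traced, an internal etcd RPC that fails with context.DeadlineExceeded (an IO/network timeout) now lands in the retry branch of Run together with ErrCaptureSuicide and an internal context.Canceled, so the capture resets and runs again instead of exiting. The sketch below is a minimal, standalone approximation of that retry classification; it uses the standard library's errors.Is rather than the errors.Cause comparison from pingcap/errors, and runOnce and errSuicide are hypothetical stand-ins, not tiflow APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errSuicide stands in for cerror.ErrCaptureSuicide in this sketch.
var errSuicide = errors.New("capture suicide")

// runOnce is a hypothetical stand-in for captureImpl.run: it always fails
// with a wrapped context.DeadlineExceeded, the way reset surfaces an etcd
// timeout once it returns errors.Trace(err) instead of a converted error.
func runOnce(ctx context.Context) error {
	return fmt.Errorf("reset capture: %w", context.DeadlineExceeded)
}

// runLoop mimics the retry policy in captureImpl.Run: recoverable errors
// restart the loop, anything else is returned to the caller, and a done
// parent context stops the loop cleanly.
func runLoop(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		default:
		}
		err := runOnce(ctx)
		recoverable := errors.Is(err, errSuicide) ||
			errors.Is(err, context.Canceled) ||
			errors.Is(err, context.DeadlineExceeded)
		if recoverable {
			fmt.Println("capture recovered, retrying:", err)
			time.Sleep(10 * time.Millisecond) // small delay; the real Run paces retries more carefully
			continue
		}
		return err
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	fmt.Println("runLoop returned:", runLoop(ctx))
}

Running the sketch prints a few "capture recovered" lines until the outer context expires, mirroring how Run keeps retrying recoverable failures while still honouring cancellation from its caller.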
50 changes: 50 additions & 0 deletions cdc/capture/capture_test.go
@@ -27,6 +27,7 @@ import (
 	cdcContext "github.com/pingcap/tiflow/pkg/context"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	mock_etcd "github.com/pingcap/tiflow/pkg/etcd/mock"
+	"github.com/prometheus/client_golang/prometheus"
 	"github.com/stretchr/testify/require"
 	"go.etcd.io/etcd/client/pkg/v3/logutil"
 	clientv3 "go.etcd.io/etcd/client/v3"
@@ -77,6 +78,55 @@ func TestReset(t *testing.T) {
 	wg.Wait()
 }
 
+type mockEtcdClient struct {
+	etcd.CDCEtcdClient
+	clientv3.Lease
+	called chan struct{}
+}
+
+func (m *mockEtcdClient) GetEtcdClient() *etcd.Client {
+	cli := &clientv3.Client{Lease: m}
+	return etcd.Wrap(cli, map[string]prometheus.Counter{})
+}
+
+func (m *mockEtcdClient) Grant(_ context.Context, _ int64) (*clientv3.LeaseGrantResponse, error) {
+	select {
+	case m.called <- struct{}{}:
+	default:
+	}
+	return nil, context.DeadlineExceeded
+}
+
+func TestRetryInternalContextDeadlineExceeded(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+
+	called := make(chan struct{}, 2)
+	cp := NewCapture4Test(nil)
+	// In the current implementation, the first RPC is grant.
+	// the mock client always retry DeadlineExceeded for the RPC.
+	cp.EtcdClient = &mockEtcdClient{called: called}
+
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- cp.Run(ctx)
+	}()
+	time.Sleep(100 * time.Millisecond)
+	// Waiting for Grant to be called.
+	<-called
+	time.Sleep(100 * time.Millisecond)
+	// Make sure it retrys
+	<-called
+
+	// Do not retry context canceled.
+	cancel()
+	select {
+	case err := <-errCh:
+		require.NoError(t, err)
+	case <-time.After(5 * time.Second):
+		require.Fail(t, "timeout")
+	}
+}
+
 func TestInfo(t *testing.T) {
 	cp := NewCapture4Test(nil)
 	cp.info = nil
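
The new test above leans on a standard Go mocking trick: a struct that embeds the interfaces it must satisfy (etcd.CDCEtcdClient and clientv3.Lease) gets all their methods for free, and only the ones the test exercises, GetEtcdClient and Grant, are overridden; Grant then always fails with context.DeadlineExceeded, which Run is expected to survive. Below is a reduced, self-contained sketch of that embedding pattern, with a hypothetical LeaseGranter interface standing in for the real clientv3.Lease.

package main

import (
	"context"
	"errors"
	"fmt"
)

// LeaseGranter is a hypothetical, cut-down stand-in for clientv3.Lease;
// only Grant matters for this sketch.
type LeaseGranter interface {
	Grant(ctx context.Context, ttl int64) (leaseID int64, err error)
	Revoke(ctx context.Context, leaseID int64) error
}

// flakyLease embeds the interface, so it satisfies LeaseGranter without
// implementing every method; only Grant is overridden.
type flakyLease struct {
	LeaseGranter
	called chan struct{}
}

// Grant records that it was called (without blocking) and always reports a
// deadline error, like mockEtcdClient.Grant in the diff above.
func (f *flakyLease) Grant(ctx context.Context, ttl int64) (int64, error) {
	select {
	case f.called <- struct{}{}:
	default:
	}
	return 0, context.DeadlineExceeded
}

func main() {
	fl := &flakyLease{called: make(chan struct{}, 1)}
	var l LeaseGranter = fl
	_, err := l.Grant(context.Background(), 10)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
	<-fl.called                                           // Grant was observed
}

Any method the test never calls (Revoke here) would panic on the nil embedded interface, which is acceptable for a narrowly scoped test double.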
4 changes: 2 additions & 2 deletions cdc/server/server.go
@@ -177,12 +177,12 @@ func (s *server) prepare(ctx context.Context) error {
 		},
 	})
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 
 	cdcEtcdClient, err := etcd.NewCDCEtcdClient(ctx, etcdCli, conf.ClusterID)
 	if err != nil {
-		return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
+		return errors.Trace(err)
 	}
 	s.etcdClient = cdcEtcdClient
 
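
A note on why these call sites switch from cerror.WrapError(cerror.ErrNewCaptureFailed, err) to errors.Trace(err): Trace only annotates the error with a stack and keeps the original cause, whereas converting it into a different, normalized error can hide the fact that the underlying failure was context.DeadlineExceeded, which is exactly what the new retry branch in Run keys on. The sketch below illustrates the general point with the standard library only (errors.Is and %w wrapping); errNewCaptureFailed is a local stand-in rather than the removed tiflow error value, and pingcap/errors differs in detail.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errNewCaptureFailed is a local stand-in for the removed
// CDC:ErrNewCaptureFailed, used only to illustrate the difference.
var errNewCaptureFailed = errors.New("new capture failed")

func main() {
	cause := context.DeadlineExceeded

	// Annotating while preserving the cause (roughly the role errors.Trace
	// plays here): callers can still detect the deadline error.
	traced := fmt.Errorf("prepare server: %w", cause)
	fmt.Println(errors.Is(traced, context.DeadlineExceeded)) // true

	// Replacing it with an unrelated error value hides the cause, so a retry
	// check keyed on context.DeadlineExceeded would never fire.
	converted := fmt.Errorf("%v: %v", errNewCaptureFailed, cause)
	fmt.Println(errors.Is(converted, context.DeadlineExceeded)) // false
}

Whether the original WrapError call actually broke the errors.Cause comparison depends on pingcap/errors internals; returning the traced error sidesteps the question entirely.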
3 changes: 3 additions & 0 deletions errors.toml
@@ -706,6 +706,7 @@ error = '''
 MySQL worker panic
 '''
 
+<<<<<<< HEAD
 ["CDC:ErrNewCaptureFailed"]
 error = '''
 new capture failed
@@ -716,6 +717,8 @@ error = '''
 new processor failed
 '''
 
+=======
+>>>>>>> 94497b4d54 (cdc: retry internal context deadline exceeded (#8602))
 ["CDC:ErrNewSemVersion"]
 error = '''
 create sem version
4 changes: 0 additions & 4 deletions pkg/errors/cdc_errors.go
@@ -597,10 +597,6 @@ var (
 		"capture suicide",
 		errors.RFCCodeText("CDC:ErrCaptureSuicide"),
 	)
-	ErrNewCaptureFailed = errors.Normalize(
-		"new capture failed",
-		errors.RFCCodeText("CDC:ErrNewCaptureFailed"),
-	)
 	ErrCaptureRegister = errors.Normalize(
 		"capture register to etcd failed",
 		errors.RFCCodeText("CDC:ErrCaptureRegister"),
