Skip to content

Commit

Permalink
etcd (ticdc): fix etcd transaction operation out of limit. (#7216) (#…
Browse files Browse the repository at this point in the history
…7257)

close #7131
  • Loading branch information
ti-chi-bot authored Oct 1, 2022
1 parent 79aac3f commit 2310cfd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 9 deletions.
4 changes: 3 additions & 1 deletion cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"github.com/pingcap/tiflow/pkg/version"
)

// cleanMetaDuration is the timeout applied to the etcd cleanup of this
// capture's info that runs when the capture exits.
const cleanMetaDuration = 10 * time.Second

// Capture represents a Capture server, it monitors the changefeed information in etcd and schedules Task on it.
type Capture struct {
// captureMu is used to protect the capture info and processorManager.
Expand Down Expand Up @@ -254,7 +256,7 @@ func (c *Capture) run(stdCtx context.Context) error {
return errors.Trace(err)
}
defer func() {
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
timeoutCtx, cancel := context.WithTimeout(context.Background(), cleanMetaDuration)
if err := ctx.GlobalVars().EtcdClient.DeleteCaptureInfo(timeoutCtx, c.info.ID); err != nil {
log.Warn("failed to delete capture info when capture exited", zap.Error(err))
}
Expand Down
30 changes: 22 additions & 8 deletions pkg/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ func (c CDCEtcdClient) GetAllTaskStatus(ctx context.Context, changefeedID string
}

// GetTaskStatus queries task status from etcd, returns
// - ModRevision of the given key
// - *model.TaskStatus unmarshalled from the value
// - error if error happens
// - ModRevision of the given key
// - *model.TaskStatus unmarshalled from the value
// - error if error happens
func (c CDCEtcdClient) GetTaskStatus(
ctx context.Context,
changefeedID string,
Expand Down Expand Up @@ -468,9 +468,9 @@ func (c CDCEtcdClient) GetAllTaskPositions(ctx context.Context, changefeedID str
}

// GetTaskPosition queries task position from etcd, returns
// - ModRevision of the given key
// - *model.TaskPosition unmarshaled from the value
// - error if error happens
// - ModRevision of the given key
// - *model.TaskPosition unmarshaled from the value
// - error if error happens
func (c CDCEtcdClient) GetTaskPosition(
ctx context.Context,
changefeedID string,
Expand Down Expand Up @@ -503,9 +503,23 @@ func (c CDCEtcdClient) PutCaptureInfo(ctx context.Context, info *model.CaptureIn
}

// DeleteCaptureInfo delete capture info from etcd.
func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, id string) error {
key := GetEtcdKeyCaptureInfo(id)
func (c CDCEtcdClient) DeleteCaptureInfo(ctx context.Context, captureID string) error {
	// Remove the capture info key first. If that fails, report the error and
	// leave the task positions untouched.
	key := GetEtcdKeyCaptureInfo(captureID)
	_, err := c.Client.Delete(ctx, key)
	if err != nil {
		return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
	}

	// We need to clean all task positions related to this capture when the
	// capture is offline, otherwise the task positions may leak.
	// The taskKey format is /tidb/cdc/task/position/{captureID}.
	// NOTE(review): the prefix carries no trailing separator, so a capture ID
	// that is itself a prefix of another capture ID would match that capture's
	// keys as well - confirm capture IDs can never overlap this way.
	taskKey := fmt.Sprintf("%s/%s", TaskPositionKeyPrefix, captureID)
	_, err = c.Client.Delete(ctx, taskKey, clientv3.WithPrefix())
	if err != nil {
		// Report the key whose deletion actually failed (the task position
		// prefix), not the capture info key that was already removed above.
		log.Warn("delete task position failed",
			zap.String("captureID", captureID),
			zap.String("key", taskKey), zap.Error(err))
	}
	// The result of the task position deletion is what the caller receives.
	// NOTE(review): presumably WrapError passes a nil err through as nil -
	// confirm against the cerror package.
	return cerror.WrapError(cerror.ErrPDEtcdAPIError, err)
}

Expand Down
35 changes: 35 additions & 0 deletions pkg/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,3 +429,38 @@ func TestExtractKeySuffix(t *testing.T) {
}
}
}

// TestDeleteCaptureInfo seeds etcd with a job status and a task position for
// a capture, calls DeleteCaptureInfo, and checks that no task position key
// written for that capture is left behind.
func TestDeleteCaptureInfo(t *testing.T) {
	tester := &etcdTester{}
	tester.setUpTest(t)
	defer tester.tearDownTest(t)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	captureID := "test-capture-id"

	statuses := map[model.ChangeFeedID]model.ChangeFeedStatus{
		model.DefaultChangeFeedID("test-cf-1"): {ResolvedTs: 1},
	}

	// Seed phase: one job status key and one task position key per changefeed.
	for cfID, cfStatus := range statuses {
		encoded, err := cfStatus.Marshal()
		require.NoError(t, err)
		_, err = tester.client.Client.Put(
			ctx, fmt.Sprintf("%s/%s", JobKeyPrefix, cfID.ID), encoded)
		require.NoError(t, err)

		positionKey := GetEtcdKeyTaskPosition(cfID.ID, captureID)
		positionVal := fmt.Sprintf("task-%s", cfID.ID)
		_, err = tester.client.Client.Put(ctx, positionKey, positionVal)
		require.NoError(t, err)
	}

	// Act: delete the capture.
	err := tester.client.DeleteCaptureInfo(ctx, captureID)
	require.NoError(t, err)

	// Verify phase: every task position key seeded above must be gone.
	for cfID := range statuses {
		resp, err := tester.client.Client.Get(
			ctx, GetEtcdKeyTaskPosition(cfID.ID, captureID))
		require.NoError(t, err)
		require.Equal(t, 0, len(resp.Kvs))
	}
}

0 comments on commit 2310cfd

Please sign in to comment.