diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go index 8d5142a8b5fd4..3dcd5cce973f9 100644 --- a/br/pkg/lightning/restore/precheck_impl.go +++ b/br/pkg/lightning/restore/precheck_impl.go @@ -16,6 +16,7 @@ package restore import ( "bytes" "context" + "encoding/json" "fmt" "path/filepath" "reflect" @@ -780,53 +781,63 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) { // check etcd KV of CDC >= v6.2 cdcPrefix := "/tidb/cdc/" - capturePath := []byte("/__cdc_meta__/capture/") + changefeedPath := []byte("/changefeed/info/") + nameSet := make(map[string][]string, 1) - resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) } for _, kv := range resp.Kvs { - // example: /tidb/cdc//__cdc_meta__/capture/ + // example: /tidb/cdc///changefeed/info/ k := kv.Key[len(cdcPrefix):] - clusterID, captureID, found := bytes.Cut(k, capturePath) - if found { - nameSet[string(clusterID)] = append(nameSet[string(clusterID)], string(captureID)) + clusterAndNamespace, changefeedID, found := bytes.Cut(k, changefeedPath) + if !found { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue } + + nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) } if len(nameSet) == 0 { // check etcd KV of CDC <= v6.1 - cdcPrefixV61 := "/tidb/cdc/capture/" - resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix(), clientv3.WithKeysOnly()) + cdcPrefixV61 := "/tidb/cdc/changefeed/info/" + resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix()) if err != nil { return nil, errors.Trace(err) } for _, kv := range resp.Kvs { - // example: /tidb/cdc/capture/ + // example: /tidb/cdc/changefeed/info/ k := kv.Key[len(cdcPrefixV61):] if len(k) == 0 { continue } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + nameSet[""] = append(nameSet[""], string(k)) } } if len(nameSet) > 0 { - var captureMsgBuf strings.Builder - captureMsgBuf.WriteString("found CDC capture(s): ") + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") isFirst := true for clusterID, captureIDs := range nameSet { if !isFirst { - captureMsgBuf.WriteString(", ") + changefeedMsgBuf.WriteString(", ") } isFirst = false - captureMsgBuf.WriteString("clusterID: ") - captureMsgBuf.WriteString(clusterID) - captureMsgBuf.WriteString(" captureID(s): ") - captureMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) } - captureMsgBuf.WriteString(",") - errorMsg = append(errorMsg, captureMsgBuf.String()) + changefeedMsgBuf.WriteString(",") + errorMsg = append(errorMsg, changefeedMsgBuf.String()) } if len(errorMsg) > 0 { @@ -841,6 +852,28 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) { return theResult, nil } +type onlyState struct { + State string `json:"state"` +} + +func isActiveCDCChangefeed(jsonBytes []byte) bool { + s := onlyState{} + err := json.Unmarshal(jsonBytes, &s) + if err != nil { + // maybe a compatible issue, skip this key + log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", + zap.ByteString("value", jsonBytes), + zap.Error(err)) + return false + } + switch s.State { + case "normal", "stopped", "error": + return true + default: + return false + } +} + type schemaCheckItem struct { cfg *config.Config preInfoGetter PreRestoreInfoGetter diff --git a/br/pkg/lightning/restore/precheck_impl_test.go b/br/pkg/lightning/restore/precheck_impl_test.go index 7842bd1fd75e7..9d10c34b76d13 100644 --- a/br/pkg/lightning/restore/precheck_impl_test.go +++ b/br/pkg/lightning/restore/precheck_impl_test.go @@ -620,8 +620,12 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) err = brCli.PutTask(ctx, *taskInfo) s.Require().NoError(err) - checkEtcdPut := func(key string) { - _, err := cli.Put(ctx, key, "") + checkEtcdPut := func(key string, vals ...string) { + val := "" + if len(vals) == 1 { + val = vals[0] + } + _, err := cli.Put(ctx, key, val) s.Require().NoError(err) } // TiCDC >= v6.2 @@ -629,8 +633,14 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") - checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test") - checkEtcdPut("/tidb/cdc/default/default/changefeed/info/test-1") + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test-1", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1") checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") @@ -640,7 +650,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC capture(s): clusterID: default captureID(s): [3ecd5c98-0148-4086-adfd-17641995e71f],\n"+ + "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test],\n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message) @@ -649,7 +659,10 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { // TiCDC <= v6.1 checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d") - checkEtcdPut("/tidb/cdc/changefeed/info/test") + checkEtcdPut( + "/tidb/cdc/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"stopped","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) checkEtcdPut("/tidb/cdc/job/test") checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test") @@ -658,7 +671,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC capture(s): clusterID: captureID(s): [f14cb04d-5ba1-410e-a59b-ccd796920e9d],\n"+ + "found CDC changefeed(s): cluster/namespace: changefeed(s): [test],\n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message)