From b0091175805df9365e7e91f7e533ebc0c74c075e Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Thu, 29 Sep 2022 16:39:04 +0800 Subject: [PATCH] *: Support kv_prepare_flashback_to_version cmd Signed-off-by: Hangjie Mo --- go.mod | 2 +- go.sum | 4 ++-- integration_tests/go.mod | 2 +- integration_tests/go.sum | 4 ++-- internal/locate/region_request_test.go | 4 ++++ tikvrpc/tikvrpc.go | 23 ++++++++++++++++++++++- 6 files changed, 32 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index f489053d7..f6d0c5335 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 + github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index 5825f68dd..c975e1106 100644 --- a/go.sum +++ b/go.sum @@ -155,8 +155,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 h1:HfAWnlVF7P1nNJvXP4ew1Lcnng/BnAVQ40AsUHKR5EA= -github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index c86c26c8c..1bddb8b19 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,7 +6,7 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 - github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 + github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c github.com/pingcap/tidb v1.1.0-beta.0.20220902042024-0482b2e83ed2 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.0 diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 6fe79bbeb..733a8b683 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -301,8 +301,8 @@ github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059 h1:Pe2LbxRmbTfAoKJ65bZL github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20220510035547-0e2f26c0a46a/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= -github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21 h1:HfAWnlVF7P1nNJvXP4ew1Lcnng/BnAVQ40AsUHKR5EA= -github.com/pingcap/kvproto v0.0.0-20220908075542-7c004f4daf21/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c h1:ceg4xjEEXNgPsScTQ5dtidiltLF4h17Y/jUqfyLAy9E= +github.com/pingcap/kvproto v0.0.0-20220929075948-06e08d5ed64c/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8= diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go index 61b917006..8692acc70 100644 --- a/internal/locate/region_request_test.go +++ b/internal/locate/region_request_test.go @@ -474,6 +474,10 @@ func (s *mockTikvGrpcServer) KvFlashbackToVersion(context.Context, *kvrpcpb.Flas return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvPrepareFlashbackToVersion(context.Context, *kvrpcpb.PrepareFlashbackToVersionRequest) (*kvrpcpb.PrepareFlashbackToVersionResponse, error) { + return nil, errors.New("unreachable") +} + func (s *testRegionRequestToSingleStoreSuite) TestNoReloadRegionForGrpcWhenCtxCanceled() { // prepare a mock tikv grpc server addr := "localhost:56341" diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index eaad28236..d864c415a 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -73,6 +73,7 @@ const ( CmdCheckTxnStatus CmdCheckSecondaryLocks CmdFlashbackToVersion + CmdPrepareFlashbackToVersion CmdRawGet CmdType = 256 + iota CmdRawBatchGet @@ -206,6 +207,8 @@ func (t CmdType) String() string { return "LockWaitInfo" case CmdFlashbackToVersion: return "FlashbackToVersion" + case CmdPrepareFlashbackToVersion: + return "PrepareFlashbackToVersion" } return "Unknown" } @@ -519,6 +522,11 @@ func (req *Request) FlashbackToVersion() *kvrpcpb.FlashbackToVersionRequest { return req.Req.(*kvrpcpb.FlashbackToVersionRequest) } +// PrepareFlashbackToVersion returns PrepareFlashbackToVersion in request. +func (req *Request) PrepareFlashbackToVersion() *kvrpcpb.PrepareFlashbackToVersionRequest { + return req.Req.(*kvrpcpb.PrepareFlashbackToVersionRequest) +} + // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request { switch req.Type { @@ -576,6 +584,8 @@ func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Reques return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_TxnHeartBeat{TxnHeartBeat: req.TxnHeartBeat()}} case CmdFlashbackToVersion: return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_FlashbackToVersion{FlashbackToVersion: req.FlashbackToVersion()}} + case CmdPrepareFlashbackToVersion: + return &tikvpb.BatchCommandsRequest_Request{Cmd: &tikvpb.BatchCommandsRequest_Request_PrepareFlashbackToVersion{PrepareFlashbackToVersion: req.PrepareFlashbackToVersion()}} } return nil } @@ -615,6 +625,8 @@ func FromBatchCommandsResponse(res *tikvpb.BatchCommandsResponse_Response) (*Res return &Response{Resp: res.DeleteRange}, nil case *tikvpb.BatchCommandsResponse_Response_FlashbackToVersion: return &Response{Resp: res.FlashbackToVersion}, nil + case *tikvpb.BatchCommandsResponse_Response_PrepareFlashbackToVersion: + return &Response{Resp: res.PrepareFlashbackToVersion}, nil case *tikvpb.BatchCommandsResponse_Response_RawGet: return &Response{Resp: res.RawGet}, nil case *tikvpb.BatchCommandsResponse_Response_RawBatchGet: @@ -767,6 +779,8 @@ func SetContext(req *Request, region *metapb.Region, peer *metapb.Peer) error { req.CheckSecondaryLocks().Context = ctx case CmdFlashbackToVersion: req.FlashbackToVersion().Context = ctx + case CmdPrepareFlashbackToVersion: + req.PrepareFlashbackToVersion().Context = ctx default: return errors.Errorf("invalid request type %v", req.Type) } @@ -918,6 +932,10 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { p = &kvrpcpb.FlashbackToVersionResponse{ RegionError: e, } + case CmdPrepareFlashbackToVersion: + p = &kvrpcpb.PrepareFlashbackToVersionResponse{ + RegionError: e, + } default: return nil, errors.Errorf("invalid request type %v", req.Type) } @@ -1074,6 +1092,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.Compact(ctx, req.Compact()) case CmdFlashbackToVersion: resp.Resp, err = client.KvFlashbackToVersion(ctx, req.FlashbackToVersion()) + case CmdPrepareFlashbackToVersion: + resp.Resp, err = client.KvPrepareFlashbackToVersion(ctx, req.PrepareFlashbackToVersion()) default: return nil, errors.Errorf("invalid request type: %v", req.Type) } @@ -1236,7 +1256,8 @@ func (req *Request) IsTxnWriteRequest() bool { req.Type == CmdCleanup || req.Type == CmdTxnHeartBeat || req.Type == CmdResolveLock || - req.Type == CmdFlashbackToVersion { + req.Type == CmdFlashbackToVersion || + req.Type == CmdPrepareFlashbackToVersion { return true } return false