From bb026bdad8f34604cc01fdafbaf4063729583a44 Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 24 Jun 2022 18:17:09 +0800 Subject: [PATCH 1/2] client: support requests with source label (#506) * support request scope and source. Signed-off-by: you06 * update Signed-off-by: you06 * use counter for requests with too-many source Signed-off-by: you06 * refine resolve lock Signed-off-by: you06 * remove request source from resolve details Signed-off-by: you06 * address comment Signed-off-by: you06 --- go.mod | 2 +- go.sum | 4 +- integration_tests/go.mod | 10 ++-- integration_tests/go.sum | 38 +++++++++----- internal/client/client.go | 42 ++++++++++++--- metrics/metrics.go | 21 ++++++++ tikv/kv.go | 40 +++++++-------- tikv/split_region.go | 3 +- txnkv/transaction/2pc.go | 6 ++- txnkv/transaction/cleanup.go | 9 +++- txnkv/transaction/commit.go | 11 ++-- txnkv/transaction/pessimistic.go | 23 ++++++--- txnkv/transaction/prewrite.go | 21 +++++--- txnkv/transaction/txn.go | 32 ++++++++++-- txnkv/txnlock/lock_resolver.go | 81 ++++++++++++++++++++++++----- txnkv/txnlock/test_probe.go | 2 +- txnkv/txnsnapshot/client_helper.go | 22 ++++++++ txnkv/txnsnapshot/scan.go | 2 + txnkv/txnsnapshot/snapshot.go | 56 ++++++++++++++++---- util/execdetails.go | 49 +++++++++++------- util/request_source.go | 82 ++++++++++++++++++++++++++++++ 21 files changed, 443 insertions(+), 113 deletions(-) create mode 100644 util/request_source.go diff --git a/go.mod b/go.mod index 79f79e233..44fd6431d 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 - github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 + github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.11.0 diff --git a/go.sum b/go.sum index f95c1904d..4064e6169 100644 --- a/go.sum +++ b/go.sum @@ -161,8 +161,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN8dIUmo4Be2+pMRb6f55i+UIYrluu2E= github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 h1:dsMpneacHyuVslSVndgUfJKrXFNG7VPdXip2ulG6glo= -github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw= +github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/integration_tests/go.mod b/integration_tests/go.mod index 608815862..180494d67 100644 --- a/integration_tests/go.mod +++ b/integration_tests/go.mod @@ -6,14 +6,16 @@ require ( github.com/ninedraft/israce v0.0.3 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3 - github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 - github.com/pingcap/tidb v1.1.0-beta.0.20220517125829-586716bff25e - github.com/pingcap/tidb/parser v0.0.0-20220517125829-586716bff25e // indirect + github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 + github.com/pingcap/tidb v1.1.0-beta.0.20220621061036-5c9ad77ae1f1 + github.com/pingcap/tidb/parser v0.0.0-20220621061036-5c9ad77ae1f1 // indirect github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df - github.com/tikv/client-go/v2 v2.0.1-0.20220516035221-e007187e5101 + github.com/tikv/client-go/v2 v2.0.1-0.20220613112734-be31f33ba03b github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 go.uber.org/goleak v1.1.12 ) replace github.com/tikv/client-go/v2 => ../ + +replace github.com/pingcap/tidb => github.com/you06/tidb v1.1.0-beta.0.20220620132310-ba06be65cc3b diff --git a/integration_tests/go.sum b/integration_tests/go.sum index 3117ec94e..9f0e17d33 100644 --- a/integration_tests/go.sum +++ b/integration_tests/go.sum @@ -89,6 +89,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.1581 h1:Q/yk4z/cHUVZfgTqtD09qeYBxHwshQAjVRX73qs8UH0= +github.com/aliyun/alibaba-cloud-sdk-go v1.61.1581/go.mod h1:RcDobYh8k5VP6TNybz9m++gL3ijVI5wueVr0EM10VsU= github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/thrift v0.0.0-20181112125854-24918abba929/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= @@ -175,7 +177,6 @@ github.com/cznic/golex v0.0.0-20181122101858-9c343928389c/go.mod h1:+bmmJDNmKlhW github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 h1:iwZdTE0PVqJCos1vaoKsclOGD3ADKpshg3SRtYBbwso= github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= github.com/cznic/parser v0.0.0-20160622100904-31edd927e5b1/go.mod h1:2B43mz36vGZNZEwkWi8ayRSSUXLfjL8OkbzwW4NcPMM= -github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8 h1:LpMLYGyy67BoAFGda1NeOBQwqlv7nUXpm+rIVHGxZZ4= github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8/go.mod h1:q2w6Bg5jeox1B+QkJ6Wp/+Vn0G/bo3f1uY7Fn3vivIQ= github.com/cznic/strutil v0.0.0-20171016134553-529a34b1c186/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc= github.com/cznic/y v0.0.0-20170802143616-045f81c6662a/go.mod h1:1rk5VM7oSnA4vjp+hrLQ3HWHa+Y4yPCa3/CsJrcNnvs= @@ -274,6 +275,7 @@ github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXP github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gogo/status v1.1.0/go.mod h1:BFv9nrluPLmrS0EmGVvLaPNmRosr9KapBYd5/hpY1WM= +github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= @@ -435,6 +437,7 @@ github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxy github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jedib0t/go-pretty/v6 v6.2.2/go.mod h1:+nE9fyyHGil+PuISTCrp7avEdo6bqoMwqZnuiK2r2a0= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= @@ -444,10 +447,12 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22 github.com/jonboulle/clockwork v0.2.2 h1:UOGuzwb1PwsrDAObMuhUnj0p5ULPj8V/xJ7Kx9qUBdQ= github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= @@ -530,8 +535,9 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -597,8 +603,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167 h1:dsMpneacHyuVslSVndgUfJKrXFNG7VPdXip2ulG6glo= -github.com/pingcap/kvproto v0.0.0-20220517085838-12e2f5a9d167/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= +github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305 h1:TZ0teMZoKHnZDlJxNkWrp5Sgv3w+ruNbrqtBYKsfaNw= +github.com/pingcap/kvproto v0.0.0-20220525022339-6aaebf466305/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= @@ -607,13 +613,11 @@ github.com/pingcap/log v1.1.0 h1:ELiPxACz7vdo1qAvvaWJg1NrYFoY6gqAh/+Uo6aXdD8= github.com/pingcap/log v1.1.0/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4 h1:HYbcxtnkN3s5tqrZ/z3eJS4j3Db8wMphEm1q10lY/TM= github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4/go.mod h1:sDCsM39cGiv2vwunZkaFA917vVkqDTGSPbbV7z4Oops= -github.com/pingcap/tidb v1.1.0-beta.0.20220517125829-586716bff25e h1:EOQMlH0PHWOuG80DDelsL7brkZSvhiewVwKoDiFLZnc= -github.com/pingcap/tidb v1.1.0-beta.0.20220517125829-586716bff25e/go.mod h1:LuTxJolM8+ftJgDYrNtps4loodvXN0GBAEHJc3d0tb4= github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e/go.mod h1:e1MGCA9Sg3T8jid8PKAEq5eYVuMMCq4n8gJ+Kqp4Plg= -github.com/pingcap/tidb/parser v0.0.0-20220517125829-586716bff25e h1:KPDMn5WiIPGvLBc5iT3icdCP/a0KPofpUQdhLVC/7W8= -github.com/pingcap/tidb/parser v0.0.0-20220517125829-586716bff25e/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= -github.com/pingcap/tipb v0.0.0-20220314125451-bfb5c2c55188 h1:+46isFI9fR9R+nJVDMI55tCC/TCwp+bvVA4HLGEv1rY= -github.com/pingcap/tipb v0.0.0-20220314125451-bfb5c2c55188/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= +github.com/pingcap/tidb/parser v0.0.0-20220621061036-5c9ad77ae1f1 h1:IOBUGVF1Neq5BPRKKIxyg/57PYKdCC8QxCUzi/Q2DV4= +github.com/pingcap/tidb/parser v0.0.0-20220621061036-5c9ad77ae1f1/go.mod h1:ElJiub4lRy6UZDb+0JHDkGEdr6aOli+ykhyej7VCLoI= +github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73 h1:L4nZwfYSrIsWPAZR8zMwHaNQJy0Rjy3Od6Smj5mlOms= +github.com/pingcap/tipb v0.0.0-20220602075447-4847c5d68e73/go.mod h1:A7mrd7WHBl1o63LE2bIBGEJMTNWXqhgmYiOvMLxozfs= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 h1:49lOXmGaUpV9Fz3gd7TFZY106KVlPVa5jcYD1gaQf98= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -631,8 +635,9 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.12.2 h1:51L9cDoUHVrXx4zWYlcLQIZ+d+VXHgqnYKkIuq4g/34= +github.com/prometheus/client_golang v1.12.2/go.mod h1:3Z9XVyYiZYEO+YQWt3RD2R3jrbd179Rt297l4aS6nDY= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -650,8 +655,9 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= -github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= +github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= @@ -754,6 +760,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0/go.mod h1:/LWChgwKmvncFJFHJ7Gvn9wZArjbV5/FppcK2fKk/tI= +github.com/you06/tidb v1.1.0-beta.0.20220620132310-ba06be65cc3b h1:LctQrf5HpbkRpLOBMzx2vSE05x05A8FBgYMU9DOmhqs= +github.com/you06/tidb v1.1.0-beta.0.20220620132310-ba06be65cc3b/go.mod h1:mdUm2FwYwwqDYXtVtASTY3rrtA1ikgSGH5FKdQfKdmI= github.com/yudai/gojsondiff v1.0.0/go.mod h1:AY32+k2cwILAkW1fbgxQ5mUmMiZFgLIV+FBNExI05xg= github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc= @@ -990,8 +998,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180816055513-1c9583448a9c/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1349,6 +1358,7 @@ gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMy gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4= diff --git a/internal/client/client.go b/internal/client/client.go index d26546b3d..40f963f2c 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -344,7 +344,10 @@ func (c *RPCClient) closeConns() { c.Unlock() } -var sendReqHistCache sync.Map +var ( + sendReqHistCache sync.Map + sendReqCounterCache sync.Map +) type sendReqHistCacheKey struct { tp tikvrpc.CmdType @@ -352,22 +355,49 @@ type sendReqHistCacheKey struct { staleRad bool } +type sendReqCounterCacheKey struct { + sendReqHistCacheKey + requestSource string +} + +type sendReqCounterCacheValue struct { + counter prometheus.Counter + timeCounter prometheus.Counter +} + func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, start time.Time, staleRead bool) { - key := sendReqHistCacheKey{ + histKey := sendReqHistCacheKey{ req.Type, req.Context.GetPeer().GetStoreId(), staleRead, } + counterKey := sendReqCounterCacheKey{ + histKey, + req.GetRequestSource(), + } - v, ok := sendReqHistCache.Load(key) + hist, ok := sendReqHistCache.Load(histKey) if !ok { reqType := req.Type.String() storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) - v = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead)) - sendReqHistCache.Store(key, v) + hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead)) + sendReqHistCache.Store(histKey, hist) + } + counter, ok := sendReqCounterCache.Load(counterKey) + if !ok { + reqType := req.Type.String() + storeID := strconv.FormatUint(req.Context.GetPeer().GetStoreId(), 10) + counter = sendReqCounterCacheValue{ + metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource), + metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeID, strconv.FormatBool(staleRead), counterKey.requestSource), + } + sendReqCounterCache.Store(counterKey, counter) } - v.(prometheus.Observer).Observe(time.Since(start).Seconds()) + secs := time.Since(start).Seconds() + hist.(prometheus.Observer).Observe(secs) + counter.(sendReqCounterCacheValue).counter.Inc() + counter.(sendReqCounterCacheValue).timeCounter.Add(secs) } // SendRequest sends a Request to server and receives Response. diff --git a/metrics/metrics.go b/metrics/metrics.go index 932859d4c..0d2c8711a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -44,6 +44,8 @@ var ( TiKVTxnCmdHistogram *prometheus.HistogramVec TiKVBackoffHistogram *prometheus.HistogramVec TiKVSendReqHistogram *prometheus.HistogramVec + TiKVSendReqCounter *prometheus.CounterVec + TiKVSendReqTimeCounter *prometheus.CounterVec TiKVCoprocessorHistogram *prometheus.HistogramVec TiKVLockResolverCounter *prometheus.CounterVec TiKVRegionErrorCounter *prometheus.CounterVec @@ -112,6 +114,7 @@ const ( LblFromStore = "from_store" LblToStore = "to_store" LblStaleRead = "stale_read" + LblSource = "source" ) func initMetrics(namespace, subsystem string) { @@ -142,6 +145,22 @@ func initMetrics(namespace, subsystem string) { Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days }, []string{LblType, LblStore, LblStaleRead}) + TiKVSendReqCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_counter", + Help: "Counter of sending request with multi dimensions.", + }, []string{LblType, LblStore, LblStaleRead, LblSource}) + + TiKVSendReqTimeCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "request_time_counter", + Help: "Counter of request time with multi dimensions.", + }, []string{LblType, LblStore, LblStaleRead, LblSource}) + TiKVCoprocessorHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: namespace, @@ -578,6 +597,8 @@ func RegisterMetrics() { prometheus.MustRegister(TiKVTxnCmdHistogram) prometheus.MustRegister(TiKVBackoffHistogram) prometheus.MustRegister(TiKVSendReqHistogram) + prometheus.MustRegister(TiKVSendReqCounter) + prometheus.MustRegister(TiKVSendReqTimeCounter) prometheus.MustRegister(TiKVCoprocessorHistogram) prometheus.MustRegister(TiKVLockResolverCounter) prometheus.MustRegister(TiKVRegionErrorCounter) diff --git a/tikv/kv.go b/tikv/kv.go index f34803f65..476eb5a54 100644 --- a/tikv/kv.go +++ b/tikv/kv.go @@ -251,7 +251,7 @@ func (s *KVStore) runSafePointChecker() { // Begin a global transaction. func (s *KVStore) Begin(opts ...TxnOption) (*transaction.KVTxn, error) { - options := &txnOptions{} + options := &transaction.TxnOptions{} // Inject the options for _, opt := range opts { opt(options) @@ -260,18 +260,22 @@ func (s *KVStore) Begin(opts ...TxnOption) (*transaction.KVTxn, error) { if options.TxnScope == "" { options.TxnScope = oracle.GlobalTxnScope } + var ( + startTS uint64 + err error + ) if options.StartTS != nil { - snapshot := txnsnapshot.NewTiKVSnapshot(s, *options.StartTS, s.nextReplicaReadSeed()) - return transaction.NewTiKVTxn(s, snapshot, *options.StartTS, options.TxnScope) + startTS = *options.StartTS + } else { + bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) + startTS, err = s.getTimestampWithRetry(bo, options.TxnScope) + if err != nil { + return nil, err + } } - bo := retry.NewBackofferWithVars(context.Background(), transaction.TsoMaxBackoff, nil) - startTS, err := s.getTimestampWithRetry(bo, options.TxnScope) - if err != nil { - return nil, err - } snapshot := txnsnapshot.NewTiKVSnapshot(s, startTS, s.nextReplicaReadSeed()) - return transaction.NewTiKVTxn(s, snapshot, startTS, options.TxnScope) + return transaction.NewTiKVTxn(s, snapshot, startTS, options) } // DeleteRange delete all versions of all keys in the range[startKey,endKey) immediately. @@ -516,6 +520,7 @@ func (s *KVStore) safeTSUpdater() { t := time.NewTicker(time.Second * 2) defer t.Stop() ctx, cancel := context.WithCancel(s.ctx) + ctx = util.WithInternalSourceType(ctx, util.InternalTxnGC) defer cancel() for { select { @@ -540,7 +545,9 @@ func (s *KVStore) updateSafeTS(ctx context.Context) { resp, err := tikvClient.SendRequest(ctx, storeAddr, tikvrpc.NewRequest(tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{KeyRange: &kvrpcpb.KeyRange{ StartKey: []byte(""), EndKey: []byte(""), - }}), client.ReadTimeoutShort) + }}, kvrpcpb.Context{ + RequestSource: util.RequestSourceFromCtx(ctx), + }), client.ReadTimeoutShort) storeIDStr := strconv.Itoa(int(storeID)) if err != nil { metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc() @@ -602,26 +609,19 @@ func NewLockResolver(etcdAddrs []string, security config.Security, opts ...pd.Cl return s.lockResolver, nil } -// txnOptions indicates the option when beginning a transaction. -// txnOptions are set by the TxnOption values passed to Begin -type txnOptions struct { - TxnScope string - StartTS *uint64 -} - // TxnOption configures Transaction -type TxnOption func(*txnOptions) +type TxnOption func(*transaction.TxnOptions) // WithTxnScope sets the TxnScope to txnScope func WithTxnScope(txnScope string) TxnOption { - return func(st *txnOptions) { + return func(st *transaction.TxnOptions) { st.TxnScope = txnScope } } // WithStartTS sets the StartTS to startTS func WithStartTS(startTS uint64) TxnOption { - return func(st *txnOptions) { + return func(st *transaction.TxnOptions) { st.StartTS = &startTS } } diff --git a/tikv/split_region.go b/tikv/split_region.go index 6186c3041..413747a91 100644 --- a/tikv/split_region.go +++ b/tikv/split_region.go @@ -144,7 +144,8 @@ func (s *KVStore) batchSendSingleRegion(bo *Backoffer, batch kvrpc.Batch, scatte req := tikvrpc.NewRequest(tikvrpc.CmdSplitRegion, &kvrpcpb.SplitRegionRequest{ SplitKeys: batch.Keys, }, kvrpcpb.Context{ - Priority: kvrpcpb.CommandPri_Normal, + Priority: kvrpcpb.CommandPri_Normal, + RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), }) sender := locate.NewRegionRequestSender(s.regionCache, s.GetTiKVClient()) diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index 96070cd4e..1958878ee 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -647,7 +647,11 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error { return err } - commitDetail := &util.CommitDetails{WriteSize: size, WriteKeys: c.mutations.Len()} + commitDetail := &util.CommitDetails{ + WriteSize: size, + WriteKeys: c.mutations.Len(), + ResolveLock: util.ResolveLockDetail{}, + } metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys)) metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize)) c.hasNoNeedCommitKeys = checkCnt > 0 diff --git a/txnkv/transaction/cleanup.go b/txnkv/transaction/cleanup.go index bc6bd2609..bf53ee1b3 100644 --- a/txnkv/transaction/cleanup.go +++ b/txnkv/transaction/cleanup.go @@ -62,8 +62,13 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{ Keys: batch.mutations.GetKeys(), StartVersion: c.startTS, - }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, - MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(req) } diff --git a/txnkv/transaction/commit.go b/txnkv/transaction/commit.go index fda0d611c..88ccc4a13 100644 --- a/txnkv/transaction/commit.go +++ b/txnkv/transaction/commit.go @@ -70,9 +70,14 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, StartVersion: c.startTS, Keys: keys, CommitVersion: c.commitTS, - }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, - ResourceGroupTag: c.resourceGroupTag, DiskFullOpt: c.diskFullOpt, - MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + DiskFullOpt: c.diskFullOpt, + MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(req) } diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 9dc5aad4a..81fb1d692 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -106,8 +106,13 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * ReturnValues: action.ReturnValues, CheckExistence: action.CheckExistence, MinCommitTs: c.forUpdateTS + 1, - }, kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: action.LockCtx.ResourceGroupTag, - MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: action.LockCtx.ResourceGroupTag, + MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + }) if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil { req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest)) } @@ -226,7 +231,6 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * } // Because we already waited on tikv, no need to Backoff here. // tikv default will wait 3s(also the maximum wait value) when lock error occurs - startTime = time.Now() if resolvingRecordToken == nil { token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS) resolvingRecordToken = &token @@ -234,17 +238,19 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * } else { c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken) } - msBeforeTxnExpired, err := c.store.GetLockResolver().ResolveLocks(bo, 0, locks) + resolveLockOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: 0, + Locks: locks, + Detail: &action.LockCtx.Stats.ResolveLock, + } + resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts) if err != nil { return err } - if action.LockCtx.Stats != nil { - atomic.AddInt64(&action.LockCtx.Stats.ResolveLockTime, int64(time.Since(startTime))) - } // If msBeforeTxnExpired is not zero, it means there are still locks blocking us acquiring // the pessimistic lock. We should return acquire fail with nowait set or timeout error if necessary. - if msBeforeTxnExpired > 0 { + if resolveLockRes.TTL > 0 { if action.LockWaitTime() == kv.LockNoWait { return errors.WithStack(tikverr.ErrLockAcquireFailAndNoWaitSet) } else if action.LockWaitTime() == kv.LockAlwaysWait { @@ -280,6 +286,7 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret ForUpdateTs: c.forUpdateTS, Keys: batch.mutations.GetKeys(), }) + req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx()) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) if err != nil { diff --git a/txnkv/transaction/prewrite.go b/txnkv/transaction/prewrite.go index b8be353d9..738fb74e0 100644 --- a/txnkv/transaction/prewrite.go +++ b/txnkv/transaction/prewrite.go @@ -170,9 +170,14 @@ func (c *twoPhaseCommitter) buildPrewriteRequest(batch batchMutations, txnSize u req.TryOnePc = true } - r := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, - kvrpcpb.Context{Priority: c.priority, SyncLog: c.syncLog, ResourceGroupTag: c.resourceGroupTag, - DiskFullOpt: c.diskFullOpt, MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds())}) + r := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, req, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: c.resourceGroupTag, + DiskFullOpt: c.diskFullOpt, + MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + }) if c.resourceGroupTag == nil && c.resourceGroupTagger != nil { c.resourceGroupTagger(r) } @@ -382,7 +387,6 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B } locks = append(locks, lock) } - start := time.Now() if resolvingRecordToken == nil { token := c.store.GetLockResolver().RecordResolvingLocks(locks, c.startTS) resolvingRecordToken = &token @@ -390,11 +394,16 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *retry.B } else { c.store.GetLockResolver().UpdateResolvingLocks(locks, c.startTS, *resolvingRecordToken) } - msBeforeExpired, err := c.store.GetLockResolver().ResolveLocks(bo, c.startTS, locks) + resolveLockOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: c.startTS, + Locks: locks, + Detail: &c.getDetail().ResolveLock, + } + resolveLockRes, err := c.store.GetLockResolver().ResolveLocksWithOpts(bo, resolveLockOpts) if err != nil { return err } - atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start))) + msBeforeExpired := resolveLockRes.TTL if msBeforeExpired > 0 { err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 22da17942..62fea02d9 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -77,6 +77,13 @@ type SchemaAmender interface { AmendTxn(ctx context.Context, startInfoSchema SchemaVer, change *RelatedSchemaChange, mutations CommitterMutations) (CommitterMutations, error) } +// TxnOptions indicates the option when beginning a transaction. +// TxnOptions are set by the TxnOption values passed to Begin +type TxnOptions struct { + TxnScope string + StartTS *uint64 +} + // KVTxn contains methods to interact with a TiKV transaction. type KVTxn struct { snapshot *txnsnapshot.KVSnapshot @@ -117,10 +124,11 @@ type KVTxn struct { // interceptor is used to decorate the RPC request logic related to the txn. interceptor interceptor.RPCInterceptor assertionLevel kvrpcpb.AssertionLevel + *util.RequestSource } // NewTiKVTxn creates a new KVTxn. -func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, scope string) (*KVTxn, error) { +func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, options *TxnOptions) (*KVTxn, error) { cfg := config.GetGlobalConfig() newTiKVTxn := &KVTxn{ snapshot: snapshot, @@ -130,10 +138,11 @@ func NewTiKVTxn(store kvstore, snapshot *txnsnapshot.KVSnapshot, startTS uint64, startTime: time.Now(), valid: true, vars: tikv.DefaultVars, - scope: scope, + scope: options.TxnScope, enableAsyncCommit: cfg.EnableAsyncCommit, enable1PC: cfg.Enable1PC, diskFullOpt: kvrpcpb.DiskFullOpt_NotAllowedOnFull, + RequestSource: snapshot.RequestSource, } return newTiKVTxn, nil } @@ -361,6 +370,8 @@ func (txn *KVTxn) Commit(ctx context.Context) error { } defer txn.close() + ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource) + if val, err := util.EvalFailpoint("mockCommitError"); err == nil && val.(bool) { if _, err := util.EvalFailpoint("mockCommitErrorOpt"); err == nil { failpoint.Disable("tikvclient/mockCommitErrorOpt") @@ -494,7 +505,8 @@ func (txn *KVTxn) rollbackPessimisticLocks() error { if txn.lockedCnt == 0 { return nil } - bo := retry.NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars) + ctx := context.WithValue(context.Background(), util.RequestSourceKey, *txn.RequestSource) + bo := retry.NewBackofferWithVars(ctx, cleanupMaxBackoff, txn.vars) if txn.interceptor != nil { // User has called txn.SetRPCInterceptor() to explicitly set an interceptor, we // need to bind it to ctx so that the internal client can perceive and execute @@ -583,6 +595,7 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput // it before initiating an RPC request. ctx = interceptor.WithRPCInterceptor(ctx, txn.interceptor) } + ctx = context.WithValue(ctx, util.RequestSourceKey, *txn.RequestSource) // Exclude keys that are already locked. var err error keys := make([][]byte, 0, len(keysInput)) @@ -662,7 +675,8 @@ func (txn *KVTxn) LockKeys(ctx context.Context, lockCtx *tikv.LockCtx, keysInput } lockCtx.Stats = &util.LockKeysDetails{ - LockKeys: int32(len(keys)), + LockKeys: int32(len(keys)), + ResolveLock: util.ResolveLockDetail{}, } bo := retry.NewBackofferWithVars(ctx, pessimisticLockMaxBackoff, txn.vars) txn.committer.forUpdateTS = lockCtx.ForUpdateTS @@ -869,3 +883,13 @@ func (txn *KVTxn) SetBinlogExecutor(binlog BinlogExecutor) { func (txn *KVTxn) GetClusterID() uint64 { return txn.store.GetClusterID() } + +// SetRequestSourceInternal sets the scope of the request source. +func (txn *KVTxn) SetRequestSourceInternal(internal bool) { + txn.RequestSource.SetRequestSourceInternal(internal) +} + +// SetRequestSourceType sets the type of the request source. +func (txn *KVTxn) SetRequestSourceType(tp string) { + txn.RequestSource.SetRequestSourceType(tp) +} diff --git a/txnkv/txnlock/lock_resolver.go b/txnkv/txnlock/lock_resolver.go index 43d9a5837..b7e4c789b 100644 --- a/txnkv/txnlock/lock_resolver.go +++ b/txnkv/txnlock/lock_resolver.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "sync" + "sync/atomic" "time" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -283,7 +284,8 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo }) } - req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos}) + req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, &kvrpcpb.ResolveLockRequest{TxnInfos: listTxnInfos}, + kvrpcpb.Context{RequestSource: util.RequestSourceFromCtx(bo.GetCtx())}) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) startTime = time.Now() resp, err := lr.store.SendReq(bo, req, loc, client.ReadTimeoutShort) @@ -318,6 +320,27 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo return true, nil } +// ResolveLocksOptions is the options struct for calling resolving lock. +type ResolveLocksOptions struct { + CallerStartTS uint64 + Locks []*Lock + Lite bool + ForRead bool + Detail *util.ResolveLockDetail +} + +// ResolveLockResult is the result struct for resolving lock. +type ResolveLockResult struct { + TTL int64 + IgnoreLocks []uint64 + AccessLocks []uint64 +} + +// ResolveLocksWithOpts wraps ResolveLocks and ResolveLocksForRead, which extract the parameters into structs for better extension. +func (lr *LockResolver) ResolveLocksWithOpts(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) { + return lr.resolveLocks(bo, opts) +} + // ResolveLocks tries to resolve Locks. The resolving process is in 3 steps: // 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too // old are considered orphan locks and will be handled later. If all locks @@ -328,15 +351,26 @@ func (lr *LockResolver) BatchResolveLocks(bo *retry.Backoffer, locks []*Lock, lo // 3) Send `ResolveLock` cmd to the lock's region to resolve all locks belong to // the same transaction. func (lr *LockResolver) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock) (int64, error) { - ttl, _, _, err := lr.resolveLocks(bo, callerStartTS, locks, false, false) - return ttl, err + opts := ResolveLocksOptions{ + CallerStartTS: callerStartTS, + Locks: locks, + } + res, err := lr.resolveLocks(bo, opts) + return res.TTL, err } // ResolveLocksForRead is essentially the same as ResolveLocks, except with some optimizations for read. // Read operations needn't wait for resolve secondary locks and can read through(the lock's transaction is committed // and its commitTS is less than or equal to callerStartTS) or ignore(the lock's transaction is rolled back or its minCommitTS is pushed) the lock . func (lr *LockResolver) ResolveLocksForRead(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) { - return lr.resolveLocks(bo, callerStartTS, locks, true, lite) + opts := ResolveLocksOptions{ + CallerStartTS: callerStartTS, + Locks: locks, + Lite: lite, + ForRead: true, + } + res, err := lr.resolveLocks(bo, opts) + return res.TTL, res.IgnoreLocks, res.AccessLocks, err } // RecordResolvingLocks records a txn which startTS is callerStartTS tries to resolve locks @@ -378,22 +412,33 @@ func (lr *LockResolver) ResolveLocksDone(callerStartTS uint64, token int) { lr.mu.Unlock() } -func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*Lock, forRead bool, lite bool) (int64, []uint64 /* canIgnore */, []uint64 /* canAccess */, error) { +func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, opts ResolveLocksOptions) (ResolveLockResult, error) { + callerStartTS, locks, forRead, lite, detail := opts.CallerStartTS, opts.Locks, opts.ForRead, opts.Lite, opts.Detail if lr.testingKnobs.meetLock != nil { lr.testingKnobs.meetLock(locks) } var msBeforeTxnExpired txnExpireTime if len(locks) == 0 { - return msBeforeTxnExpired.value(), nil, nil, nil + return ResolveLockResult{ + TTL: msBeforeTxnExpired.value(), + }, nil } metrics.LockResolverCountWithResolve.Inc() + // This is the origin resolve lock time. + // TODO(you06): record the more details and calculate the total time by calculating the sum of details. + if detail != nil { + startTime := time.Now() + defer func() { + atomic.AddInt64(&detail.ResolveLockTime, int64(time.Since(startTime))) + }() + } // TxnID -> []Region, record resolved Regions. // TODO: Maybe put it in LockResolver and share by all txns. cleanTxns := make(map[uint64]map[locate.RegionVerID]struct{}) var resolve func(*Lock, bool) (TxnStatus, error) resolve = func(l *Lock, forceSyncCommit bool) (TxnStatus, error) { - status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit) + status, err := lr.getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit, detail) if err != nil { return TxnStatus{}, err } @@ -426,7 +471,8 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, err = lr.resolvePessimisticLock(bo, l) } else { if forRead { - asyncBo := retry.NewBackoffer(lr.asyncResolveCtx, asyncResolveLockMaxBackoff) + asyncCtx := context.WithValue(lr.asyncResolveCtx, util.RequestSourceKey, bo.GetCtx().Value(util.RequestSourceKey)) + asyncBo := retry.NewBackoffer(asyncCtx, asyncResolveLockMaxBackoff) go func() { // Pass an empty cleanRegions here to avoid data race and // let `reqCollapse` deduplicate identical resolve requests. @@ -448,7 +494,9 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, status, err := resolve(l, false) if err != nil { msBeforeTxnExpired.update(0) - return msBeforeTxnExpired.value(), nil, nil, err + return ResolveLockResult{ + TTL: msBeforeTxnExpired.value(), + }, err } if !forRead { if status.ttl != 0 { @@ -478,7 +526,11 @@ func (lr *LockResolver) resolveLocks(bo *retry.Backoffer, callerStartTS uint64, if msBeforeTxnExpired.value() > 0 { metrics.LockResolverCountWithWaitExpired.Inc() } - return msBeforeTxnExpired.value(), canIgnore, canAccess, nil + return ResolveLockResult{ + TTL: msBeforeTxnExpired.value(), + IgnoreLocks: canIgnore, + AccessLocks: canAccess, + }, nil } // Resolving returns the locks' information we are resolving currently. @@ -541,7 +593,7 @@ func (lr *LockResolver) GetTxnStatus(txnID uint64, callerStartTS uint64, primary return lr.getTxnStatus(bo, txnID, primary, callerStartTS, currentTS, true, false, nil) } -func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) { +func (lr *LockResolver) getTxnStatusFromLock(bo *retry.Backoffer, l *Lock, callerStartTS uint64, forceSyncCommit bool, detail *util.ResolveLockDetail) (TxnStatus, error) { var currentTS uint64 var err error var status TxnStatus @@ -649,6 +701,8 @@ func (lr *LockResolver) getTxnStatus(bo *retry.Backoffer, txnID uint64, primary RollbackIfNotExist: rollbackIfNotExist, ForceSyncCommit: forceSyncCommit, ResolvingPessimisticLock: resolvingPessimisticLock, + }, kvrpcpb.Context{ + RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), }) for { loc, err := lr.store.GetRegionCache().LocateKey(bo, primary) @@ -790,7 +844,9 @@ func (lr *LockResolver) checkSecondaries(bo *retry.Backoffer, txnID uint64, curK Keys: curKeys, StartVersion: txnID, } - req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq) + req := tikvrpc.NewRequest(tikvrpc.CmdCheckSecondaryLocks, checkReq, kvrpcpb.Context{ + RequestSource: util.RequestSourceFromCtx(bo.GetCtx()), + }) metrics.LockResolverCountWithQueryCheckSecondaryLocks.Inc() req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) resp, err := lr.store.SendReq(bo, req, curRegionID, client.ReadTimeoutShort) @@ -1022,6 +1078,7 @@ func (lr *LockResolver) resolveLock(bo *retry.Backoffer, l *Lock, status TxnStat } req := tikvrpc.NewRequest(tikvrpc.CmdResolveLock, lreq) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) + req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx()) resp, err := lr.store.SendReq(bo, req, loc.Region, client.ReadTimeoutShort) if err != nil { return err diff --git a/txnkv/txnlock/test_probe.go b/txnkv/txnlock/test_probe.go index 3b713b2ec..f65a60978 100644 --- a/txnkv/txnlock/test_probe.go +++ b/txnkv/txnlock/test_probe.go @@ -69,7 +69,7 @@ func (l LockResolverProbe) GetTxnStatus(bo *retry.Backoffer, txnID uint64, prima // GetTxnStatusFromLock queries tikv for a txn's status. func (l LockResolverProbe) GetTxnStatusFromLock(bo *retry.Backoffer, lock *Lock, callerStartTS uint64, forceSyncCommit bool) (TxnStatus, error) { - return l.getTxnStatusFromLock(bo, lock, callerStartTS, forceSyncCommit) + return l.getTxnStatusFromLock(bo, lock, callerStartTS, forceSyncCommit, nil) } // GetSecondariesFromTxnStatus returns the secondary locks from txn status. diff --git a/txnkv/txnsnapshot/client_helper.go b/txnkv/txnsnapshot/client_helper.go index ea699b559..34a6636d5 100644 --- a/txnkv/txnsnapshot/client_helper.go +++ b/txnkv/txnsnapshot/client_helper.go @@ -77,6 +77,28 @@ func NewClientHelper(store kvstore, resolvedLocks *util.TSSet, committedLocks *u } } +// ResolveLocksWithOpts wraps the ResolveLocksWithOpts function and store the resolved result. +func (ch *ClientHelper) ResolveLocksWithOpts(bo *retry.Backoffer, opts txnlock.ResolveLocksOptions) (txnlock.ResolveLockResult, error) { + if ch.Stats != nil { + defer func(start time.Time) { + locate.RecordRegionRequestRuntimeStats(ch.Stats, tikvrpc.CmdResolveLock, time.Since(start)) + }(time.Now()) + } + opts.ForRead = true + opts.Lite = ch.resolveLite + res, err := ch.lockResolver.ResolveLocksWithOpts(bo, opts) + if err != nil { + return res, err + } + if len(res.IgnoreLocks) > 0 { + ch.resolvedLocks.Put(res.IgnoreLocks...) + } + if len(res.AccessLocks) > 0 { + ch.committedLocks.Put(res.AccessLocks...) + } + return res, nil +} + // ResolveLocks wraps the ResolveLocks function and store the resolved result. func (ch *ClientHelper) ResolveLocks(bo *retry.Backoffer, callerStartTS uint64, locks []*txnlock.Lock) (int64, error) { if ch.Stats != nil { diff --git a/txnkv/txnsnapshot/scan.go b/txnkv/txnsnapshot/scan.go index 84a2dbd0e..783d93934 100644 --- a/txnkv/txnsnapshot/scan.go +++ b/txnkv/txnsnapshot/scan.go @@ -230,6 +230,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { NotFillCache: s.snapshot.notFillCache, IsolationLevel: s.snapshot.isolationLevel.ToPB(), ResourceGroupTag: s.snapshot.mu.resourceGroupTag, + RequestSource: s.snapshot.GetRequestSource(), }, StartKey: s.nextStartKey, EndKey: reqEndKey, @@ -250,6 +251,7 @@ func (s *Scanner) getData(bo *retry.Backoffer) error { TaskId: s.snapshot.mu.taskID, ResourceGroupTag: s.snapshot.mu.resourceGroupTag, IsolationLevel: s.snapshot.isolationLevel.ToPB(), + RequestSource: s.snapshot.GetRequestSource(), }) if s.snapshot.mu.resourceGroupTag == nil && s.snapshot.mu.resourceGroupTagger != nil { s.snapshot.mu.resourceGroupTagger(req) diff --git a/txnkv/txnsnapshot/snapshot.go b/txnkv/txnsnapshot/snapshot.go index 5b784390a..38b861297 100644 --- a/txnkv/txnsnapshot/snapshot.go +++ b/txnkv/txnsnapshot/snapshot.go @@ -141,6 +141,7 @@ type KVSnapshot struct { interceptor interceptor.RPCInterceptor } sampleStep uint32 + *util.RequestSource } // NewTiKVSnapshot creates a snapshot of an TiKV store. @@ -157,6 +158,7 @@ func NewTiKVSnapshot(store kvstore, ts uint64, replicaReadSeed uint32) *KVSnapsh priority: txnutil.PriorityNormal, vars: kv.DefaultVars, replicaReadSeed: replicaReadSeed, + RequestSource: &util.RequestSource{}, } } @@ -206,6 +208,9 @@ func (s *KVSnapshot) BatchGet(ctx context.Context, keys [][]byte) (map[string][] } ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) + if ctx.Value(util.RequestSourceKey) == nil { + ctx = context.WithValue(ctx, util.RequestSourceKey, *s.RequestSource) + } bo := retry.NewBackofferWithVars(ctx, batchGetMaxBackoff, s.vars) s.mu.RLock() if s.mu.interceptor != nil { @@ -368,6 +373,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, TaskId: s.mu.taskID, ResourceGroupTag: s.mu.resourceGroupTag, IsolationLevel: s.isolationLevel.ToPB(), + RequestSource: s.GetRequestSource(), }) if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil { s.mu.resourceGroupTagger(req) @@ -458,7 +464,13 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys, } else { cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken) } - msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks) + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: s.version, + Locks: locks, + Detail: s.getResolveLockDetail(), + } + resolveLocksRes, err := cli.ResolveLocksWithOpts(bo, resolveLocksOpts) + msBeforeExpired := resolveLocksRes.TTL if err != nil { return err } @@ -488,6 +500,9 @@ func (s *KVSnapshot) Get(ctx context.Context, k []byte) ([]byte, error) { }(time.Now()) ctx = context.WithValue(ctx, retry.TxnStartKey, s.version) + if ctx.Value(util.RequestSourceKey) == nil { + ctx = context.WithValue(ctx, util.RequestSourceKey, *s.RequestSource) + } bo := retry.NewBackofferWithVars(ctx, getMaxBackoff, s.vars) s.mu.RLock() if s.mu.interceptor != nil { @@ -523,7 +538,6 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] return value, nil } } - s.mu.RUnlock() if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context())) defer span1.Finish() @@ -531,13 +545,13 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] } if _, err := util.EvalFailpoint("snapshot-get-cache-fail"); err == nil { if bo.GetCtx().Value("TestSnapshotCache") != nil { + s.mu.RUnlock() panic("cache miss") } } cli := NewClientHelper(s.store, &s.resolvedLocks, &s.committedLocks, true) - s.mu.RLock() if s.mu.stats != nil { cli.Stats = make(map[tikvrpc.CmdType]*locate.RPCRuntimeStats) defer func() { @@ -554,6 +568,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] TaskId: s.mu.taskID, ResourceGroupTag: s.mu.resourceGroupTag, IsolationLevel: s.isolationLevel.ToPB(), + RequestSource: s.GetRequestSource(), }) if s.mu.resourceGroupTag == nil && s.mu.resourceGroupTagger != nil { s.mu.resourceGroupTagger(req) @@ -634,10 +649,16 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([] } else { cli.UpdateResolvingLocks(locks, s.version, *resolvingRecordToken) } - msBeforeExpired, err := cli.ResolveLocks(bo, s.version, locks) + resolveLocksOpts := txnlock.ResolveLocksOptions{ + CallerStartTS: s.version, + Locks: locks, + Detail: s.getResolveLockDetail(), + } + resolveLocksRes, err := cli.ResolveLocksWithOpts(bo, resolveLocksOpts) if err != nil { return nil, err } + msBeforeExpired := resolveLocksRes.TTL if msBeforeExpired > 0 { err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(keyErr.String())) if err != nil { @@ -656,8 +677,13 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) { if detail == nil || s.mu.stats == nil { return } + if s.mu.stats.resolveLockDetail == nil { + s.mu.stats.resolveLockDetail = &util.ResolveLockDetail{} + } if s.mu.stats.scanDetail == nil { - s.mu.stats.scanDetail = &util.ScanDetail{} + s.mu.stats.scanDetail = &util.ScanDetail{ + ResolveLock: s.mu.stats.resolveLockDetail, + } } if s.mu.stats.timeDetail == nil { s.mu.stats.timeDetail = &util.TimeDetail{} @@ -859,13 +885,23 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*locate.R } } +func (s *KVSnapshot) getResolveLockDetail() *util.ResolveLockDetail { + s.mu.RLock() + defer s.mu.RUnlock() + if s.mu.stats == nil { + return nil + } + return s.mu.stats.resolveLockDetail +} + // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { - rpcStats locate.RegionRequestRuntimeStats - backoffSleepMS map[string]int - backoffTimes map[string]int - scanDetail *util.ScanDetail - timeDetail *util.TimeDetail + rpcStats locate.RegionRequestRuntimeStats + backoffSleepMS map[string]int + backoffTimes map[string]int + scanDetail *util.ScanDetail + timeDetail *util.TimeDetail + resolveLockDetail *util.ResolveLockDetail } // Clone implements the RuntimeStats interface. diff --git a/util/execdetails.go b/util/execdetails.go index 10171b5a7..db16c5c18 100644 --- a/util/execdetails.go +++ b/util/execdetails.go @@ -72,11 +72,11 @@ type CommitDetails struct { CommitBackoffTime int64 BackoffTypes []string } - ResolveLockTime int64 WriteKeys int WriteSize int PrewriteRegionNum int32 TxnRetry int + ResolveLock ResolveLockDetail } // Merge merges commit details into itself. @@ -86,7 +86,7 @@ func (cd *CommitDetails) Merge(other *CommitDetails) { cd.WaitPrewriteBinlogTime += other.WaitPrewriteBinlogTime cd.CommitTime += other.CommitTime cd.LocalLatchTime += other.LocalLatchTime - cd.ResolveLockTime += other.ResolveLockTime + cd.ResolveLock.ResolveLockTime += other.ResolveLock.ResolveLockTime cd.WriteKeys += other.WriteKeys cd.WriteSize += other.WriteSize cd.PrewriteRegionNum += other.PrewriteRegionNum @@ -103,11 +103,11 @@ func (cd *CommitDetails) Clone() *CommitDetails { WaitPrewriteBinlogTime: cd.WaitPrewriteBinlogTime, CommitTime: cd.CommitTime, LocalLatchTime: cd.LocalLatchTime, - ResolveLockTime: cd.ResolveLockTime, WriteKeys: cd.WriteKeys, WriteSize: cd.WriteSize, PrewriteRegionNum: cd.PrewriteRegionNum, TxnRetry: cd.TxnRetry, + ResolveLock: cd.ResolveLock, } commit.Mu.BackoffTypes = append([]string{}, cd.Mu.BackoffTypes...) commit.Mu.CommitBackoffTime = cd.Mu.CommitBackoffTime @@ -116,12 +116,12 @@ func (cd *CommitDetails) Clone() *CommitDetails { // LockKeysDetails contains pessimistic lock keys detail information. type LockKeysDetails struct { - TotalTime time.Duration - RegionNum int32 - LockKeys int32 - ResolveLockTime int64 - BackoffTime int64 - Mu struct { + TotalTime time.Duration + RegionNum int32 + LockKeys int32 + ResolveLock ResolveLockDetail + BackoffTime int64 + Mu struct { sync.Mutex BackoffTypes []string } @@ -135,7 +135,7 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) { ld.TotalTime += lockKey.TotalTime ld.RegionNum += lockKey.RegionNum ld.LockKeys += lockKey.LockKeys - ld.ResolveLockTime += lockKey.ResolveLockTime + ld.ResolveLock.ResolveLockTime += lockKey.ResolveLock.ResolveLockTime ld.BackoffTime += lockKey.BackoffTime ld.LockRPCTime += lockKey.LockRPCTime ld.LockRPCCount += ld.LockRPCCount @@ -146,14 +146,14 @@ func (ld *LockKeysDetails) Merge(lockKey *LockKeysDetails) { // Clone returns a deep copy of itself. func (ld *LockKeysDetails) Clone() *LockKeysDetails { lock := &LockKeysDetails{ - TotalTime: ld.TotalTime, - RegionNum: ld.RegionNum, - LockKeys: ld.LockKeys, - ResolveLockTime: ld.ResolveLockTime, - BackoffTime: ld.BackoffTime, - LockRPCTime: ld.LockRPCTime, - LockRPCCount: ld.LockRPCCount, - RetryCount: ld.RetryCount, + TotalTime: ld.TotalTime, + RegionNum: ld.RegionNum, + LockKeys: ld.LockKeys, + BackoffTime: ld.BackoffTime, + LockRPCTime: ld.LockRPCTime, + LockRPCCount: ld.LockRPCCount, + RetryCount: ld.RetryCount, + ResolveLock: ld.ResolveLock, } lock.Mu.BackoffTypes = append([]string{}, ld.Mu.BackoffTypes...) return lock @@ -228,6 +228,7 @@ type ScanDetail struct { RocksdbBlockReadCount uint64 // RocksdbBlockReadByte is the total number of bytes from block reads. RocksdbBlockReadByte uint64 + ResolveLock *ResolveLockDetail } // Merge merges scan detail execution details into self. @@ -330,3 +331,15 @@ func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) { td.KvReadWallTimeMs += time.Duration(timeDetail.KvReadWallTimeMs) * time.Millisecond } } + +// ResolveLockDetail contains the resolve lock detail information. +type ResolveLockDetail struct { + // ResolveLockTime is the total duration of resolving lock. + ResolveLockTime int64 + // TODO(you06): add more details of resolving locks. +} + +// Merge merges resolve lock detail details into self. +func (rd *ResolveLockDetail) Merge(resolveLock *ResolveLockDetail) { + rd.ResolveLockTime += resolveLock.ResolveLockTime +} diff --git a/util/request_source.go b/util/request_source.go new file mode 100644 index 000000000..969169611 --- /dev/null +++ b/util/request_source.go @@ -0,0 +1,82 @@ +package util + +import ( + "context" +) + +// RequestSourceTypeKeyType is a dummy type to avoid naming collision in context. +type RequestSourceTypeKeyType struct{} + +// RequestSourceTypeKey is used as the key of request source type in context. +var RequestSourceTypeKey = RequestSourceTypeKeyType{} + +// RequestSourceKeyType is a dummy type to avoid naming collision in context. +type RequestSourceKeyType struct{} + +// RequestSourceKey is used as the key of request source type in context. +var RequestSourceKey = RequestSourceKeyType{} + +const ( + // InternalTxnOthers is the type of requests that consume low resources. + // This reduces the size of metrics. + InternalTxnOthers = "others" + // InternalTxnGC is the type of GC txn. + InternalTxnGC = "gc" + // InternalTxnMeta is the type of the miscellaneous meta usage. + InternalTxnMeta = InternalTxnOthers +) + +const ( + // InternalRequest is the scope of internal queries + InternalRequest = "internal_" + // ExternalRequest is the scope of external queries + ExternalRequest = "external_" + // SourceUnknown keeps same with the default value(empty string) + SourceUnknown = "unknown" +) + +// RequestSource contains the source label of the request, used for tracking resource consuming. +type RequestSource struct { + RequestSourceInternal bool + RequestSourceType string +} + +// SetRequestSourceInternal sets the scope of the request source. +func (r *RequestSource) SetRequestSourceInternal(internal bool) { + r.RequestSourceInternal = internal +} + +// SetRequestSourceType sets the type of the request source. +func (r *RequestSource) SetRequestSourceType(tp string) { + r.RequestSourceType = tp +} + +// WithInternalSourceType create context with internal source. +func WithInternalSourceType(ctx context.Context, source string) context.Context { + return context.WithValue(ctx, RequestSourceKey, RequestSource{ + RequestSourceInternal: true, + RequestSourceType: source, + }) +} + +// GetRequestSource gets the request_source field of the request. +func (r *RequestSource) GetRequestSource() string { + // if r.RequestSourceType is not set, it's mostly possible that r.RequestSourceInternal is not set + // to avoid internal requests be marked as external(default value), return unknown source here. + if r == nil || r.RequestSourceType == "" { + return SourceUnknown + } + if r.RequestSourceInternal { + return InternalRequest + r.RequestSourceType + } + return ExternalRequest + r.RequestSourceType +} + +// RequestSourceFromCtx extract source from passed context. +func RequestSourceFromCtx(ctx context.Context) string { + if source := ctx.Value(RequestSourceKey); source != nil { + rs := source.(RequestSource) + return rs.GetRequestSource() + } + return SourceUnknown +} From 947d923945fddf5e93475afc09bc136ed3ffc418 Mon Sep 17 00:00:00 2001 From: orion <583373805@qq.com> Date: Mon, 27 Jun 2022 14:35:00 +0800 Subject: [PATCH 2/2] fix:move tikv.NewTxnClient to txnk.NewClient (#534) Signed-off-by: 583373805 <583373805@qq.com> --- examples/gcworker/gcworker.go | 4 ++-- examples/txnkv/1pc_txn/1pc_txn.go | 6 +++--- examples/txnkv/async_commit/async_commit.go | 6 +++--- examples/txnkv/delete_range/delete_range.go | 6 +++--- examples/txnkv/pessimistic_txn/pessimistic_txn.go | 5 ++--- examples/txnkv/unsafedestoryrange/unsafedestoryrange.go | 6 +++--- 6 files changed, 16 insertions(+), 17 deletions(-) diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go index d3bc3ef28..39da00df0 100644 --- a/examples/gcworker/gcworker.go +++ b/examples/gcworker/gcworker.go @@ -21,7 +21,7 @@ import ( "time" "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" ) var ( @@ -31,7 +31,7 @@ var ( func main() { flag.Parse() - client, err := tikv.NewTxnClient([]string{*pdAddr}) + client, err := txnkv.NewClient([]string{*pdAddr}) if err != nil { panic(err) } diff --git a/examples/txnkv/1pc_txn/1pc_txn.go b/examples/txnkv/1pc_txn/1pc_txn.go index 029e5915f..288052ef7 100644 --- a/examples/txnkv/1pc_txn/1pc_txn.go +++ b/examples/txnkv/1pc_txn/1pc_txn.go @@ -20,18 +20,18 @@ import ( "fmt" "os" - "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" ) var ( - client *tikv.KVStore + client *txnkv.Client pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") ) // Init initializes information. func initStore() { var err error - client, err = tikv.NewTxnClient([]string{*pdAddr}) + client, err = txnkv.NewClient([]string{*pdAddr}) if err != nil { panic(err) } diff --git a/examples/txnkv/async_commit/async_commit.go b/examples/txnkv/async_commit/async_commit.go index 8826f8201..abcd98d6f 100644 --- a/examples/txnkv/async_commit/async_commit.go +++ b/examples/txnkv/async_commit/async_commit.go @@ -20,19 +20,19 @@ import ( "fmt" "os" - "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" "github.com/tikv/client-go/v2/util" ) var ( - client *tikv.KVStore + client *txnkv.Client pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") ) // Init initializes information. func initStore() { var err error - client, err = tikv.NewTxnClient([]string{*pdAddr}) + client, err = txnkv.NewClient([]string{*pdAddr}) if err != nil { panic(err) } diff --git a/examples/txnkv/delete_range/delete_range.go b/examples/txnkv/delete_range/delete_range.go index f61673ffb..aae251c4f 100644 --- a/examples/txnkv/delete_range/delete_range.go +++ b/examples/txnkv/delete_range/delete_range.go @@ -22,18 +22,18 @@ import ( "math/rand" "os" - "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" ) var ( - client *tikv.KVStore + client *txnkv.Client pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") ) // Init initializes information. func initStore() { var err error - client, err = tikv.NewTxnClient([]string{*pdAddr}) + client, err = txnkv.NewClient([]string{*pdAddr}) panicWhenErrNotNil(err) } diff --git a/examples/txnkv/pessimistic_txn/pessimistic_txn.go b/examples/txnkv/pessimistic_txn/pessimistic_txn.go index 8b612fcae..943bd7ded 100644 --- a/examples/txnkv/pessimistic_txn/pessimistic_txn.go +++ b/examples/txnkv/pessimistic_txn/pessimistic_txn.go @@ -21,7 +21,6 @@ import ( "os" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" ) @@ -35,14 +34,14 @@ func (kv KV) String() string { } var ( - client *tikv.KVStore + client *txnkv.Client pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") ) // Init initializes information. func initStore() { var err error - client, err = tikv.NewTxnClient([]string{*pdAddr}) + client, err = txnkv.NewClient([]string{*pdAddr}) if err != nil { panic(err) } diff --git a/examples/txnkv/unsafedestoryrange/unsafedestoryrange.go b/examples/txnkv/unsafedestoryrange/unsafedestoryrange.go index 8202c643d..1bcfbc99f 100644 --- a/examples/txnkv/unsafedestoryrange/unsafedestoryrange.go +++ b/examples/txnkv/unsafedestoryrange/unsafedestoryrange.go @@ -22,18 +22,18 @@ import ( "math/rand" "os" - "github.com/tikv/client-go/v2/tikv" + "github.com/tikv/client-go/v2/txnkv" ) var ( - client *tikv.KVStore + client *txnkv.Client pdAddr = flag.String("pd", "127.0.0.1:2379", "pd address") ) // Init initializes information. func initStore() { var err error - client, err = tikv.NewTxnClient([]string{*pdAddr}) + client, err = txnkv.NewClient([]string{*pdAddr}) panicWhenErrNotNil(err) }