Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support time detail v2 #729

Merged
merged 7 commits into from
Mar 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,22 +555,29 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
counter.(sendReqCounterCacheValue).counter.Inc()
counter.(sendReqCounterCacheValue).timeCounter.Add(secs)

if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
if execDetail := resp.GetExecDetailsV2(); execDetail != nil {
var totalRpcWallTimeNs uint64
if execDetail.TimeDetailV2 != nil {
totalRpcWallTimeNs = execDetail.TimeDetailV2.TotalRpcWallTimeNs
} else if execDetail.TimeDetail != nil {
totalRpcWallTimeNs = execDetail.TimeDetail.TotalRpcWallTimeNs
}
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
if totalRpcWallTimeNs > 0 {
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(totalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
}
}

Expand Down Expand Up @@ -876,17 +883,26 @@ func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {
return nil
}

td := details.TimeDetail
td := details.TimeDetailV2
tdOld := details.TimeDetail
sd := details.ScanDetailV2
wd := details.WriteDetail

if td == nil {
if td == nil && tdOld == nil {
return nil
}

spanRPC := spanInfo{name: "tikv.RPC", dur: td.TotalRpcWallTimeNs}
spanWait := spanInfo{name: "tikv.Wait", dur: td.WaitWallTimeMs * uint64(time.Millisecond)}
spanProcess := spanInfo{name: "tikv.Process", dur: td.ProcessWallTimeMs * uint64(time.Millisecond)}
var spanRPC, spanWait, spanProcess spanInfo
if td != nil {
spanRPC = spanInfo{name: "tikv.RPC", dur: td.TotalRpcWallTimeNs}
spanWait = spanInfo{name: "tikv.Wait", dur: td.WaitWallTimeNs}
spanProcess = spanInfo{name: "tikv.Process", dur: td.ProcessWallTimeNs}
} else if tdOld != nil {
// TimeDetail is deprecated, will be removed in future version.
spanRPC = spanInfo{name: "tikv.RPC", dur: tdOld.TotalRpcWallTimeNs}
spanWait = spanInfo{name: "tikv.Wait", dur: tdOld.WaitWallTimeMs * uint64(time.Millisecond)}
spanProcess = spanInfo{name: "tikv.Process", dur: tdOld.ProcessWallTimeMs * uint64(time.Millisecond)}
}

if sd != nil {
spanWait.children = append(spanWait.children, spanInfo{name: "tikv.GetSnapshot", dur: sd.GetSnapshotNanos})
Expand All @@ -896,6 +912,10 @@ func buildSpanInfoFromResp(resp *tikvrpc.Response) *spanInfo {
}

spanRPC.children = append(spanRPC.children, spanWait, spanProcess)
if td != nil {
spanSuspend := spanInfo{name: "tikv.Suspend", dur: td.ProcessSuspendWallTimeNs}
spanRPC.children = append(spanRPC.children, spanSuspend)
}

if wd != nil {
spanAsyncWrite := spanInfo{
Expand Down
44 changes: 25 additions & 19 deletions internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,72 +515,78 @@ func TestTraceExecDetails(t *testing.T) {
}{
{
&kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{TotalRpcWallTimeNs: uint64(time.Second)},
TimeDetailV2: &kvrpcpb.TimeDetailV2{TotalRpcWallTimeNs: uint64(time.Second)},
},
"tikv.RPC[1s]{ tikv.Wait tikv.Process }",
"tikv.RPC[1s]{ tikv.Wait tikv.Process tikv.Suspend }",
"[00.000,01.000] tikv.RPC",
},
{
&kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeMs: 100,
ProcessWallTimeMs: 500,
TimeDetailV2: &kvrpcpb.TimeDetailV2{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeNs: 100000000,
ProcessWallTimeNs: 500000000,
ProcessSuspendWallTimeNs: 50000000,
},
},
"tikv.RPC[1s]{ tikv.Wait[100ms] tikv.Process[500ms] }",
"tikv.RPC[1s]{ tikv.Wait[100ms] tikv.Process[500ms] tikv.Suspend[50ms] }",
strings.Join([]string{
"[00.000,00.100] tikv.Wait",
"[00.100,00.600] tikv.Process",
"[00.600,00.650] tikv.Suspend",
"[00.000,01.000] tikv.RPC",
}, "\n"),
},
{
&kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeMs: 100,
ProcessWallTimeMs: 500,
TimeDetailV2: &kvrpcpb.TimeDetailV2{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeNs: 100000000,
ProcessWallTimeNs: 500000000,
ProcessSuspendWallTimeNs: 50000000,
},
ScanDetailV2: &kvrpcpb.ScanDetailV2{
GetSnapshotNanos: uint64(80 * time.Millisecond),
RocksdbBlockReadNanos: uint64(200 * time.Millisecond),
},
},
"tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms]{ tikv.RocksDBBlockRead[200ms] } }",
"tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms]{ tikv.RocksDBBlockRead[200ms] } tikv.Suspend[50ms] }",
strings.Join([]string{
"[00.000,00.080] tikv.GetSnapshot",
"[00.000,00.100] tikv.Wait",
"[00.100,00.300] tikv.RocksDBBlockRead",
"[00.100,00.600] tikv.Process",
"[00.600,00.650] tikv.Suspend",
"[00.000,01.000] tikv.RPC",
}, "\n"),
},
{
// WriteDetail hides RocksDBBlockRead
&kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeMs: 100,
ProcessWallTimeMs: 500,
TimeDetailV2: &kvrpcpb.TimeDetailV2{
TotalRpcWallTimeNs: uint64(time.Second),
WaitWallTimeNs: 100000000,
ProcessWallTimeNs: 500000000,
ProcessSuspendWallTimeNs: 50000000,
},
ScanDetailV2: &kvrpcpb.ScanDetailV2{
GetSnapshotNanos: uint64(80 * time.Millisecond),
RocksdbBlockReadNanos: uint64(200 * time.Millisecond),
},
WriteDetail: &kvrpcpb.WriteDetail{},
},
"tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms] tikv.AsyncWrite{ tikv.StoreBatchWait tikv.ProposeSendWait tikv.PersistLog'{ tikv.RaftDBWriteWait tikv.RaftDBWriteWAL tikv.RaftDBWriteMemtable } tikv.CommitLog tikv.ApplyBatchWait tikv.ApplyLog{ tikv.ApplyMutexLock tikv.ApplyWriteLeaderWait tikv.ApplyWriteWAL tikv.ApplyWriteMemtable } } }",
"tikv.RPC[1s]{ tikv.Wait[100ms]{ tikv.GetSnapshot[80ms] } tikv.Process[500ms] tikv.Suspend[50ms] tikv.AsyncWrite{ tikv.StoreBatchWait tikv.ProposeSendWait tikv.PersistLog'{ tikv.RaftDBWriteWait tikv.RaftDBWriteWAL tikv.RaftDBWriteMemtable } tikv.CommitLog tikv.ApplyBatchWait tikv.ApplyLog{ tikv.ApplyMutexLock tikv.ApplyWriteLeaderWait tikv.ApplyWriteWAL tikv.ApplyWriteMemtable } } }",
strings.Join([]string{
"[00.000,00.080] tikv.GetSnapshot",
"[00.000,00.100] tikv.Wait",
"[00.100,00.600] tikv.Process",
"[00.600,00.650] tikv.Suspend",
"[00.000,01.000] tikv.RPC",
}, "\n"),
},
{
&kvrpcpb.ExecDetailsV2{
TimeDetail: &kvrpcpb.TimeDetail{
TimeDetailV2: &kvrpcpb.TimeDetailV2{
TotalRpcWallTimeNs: uint64(time.Second),
},
ScanDetailV2: &kvrpcpb.ScanDetailV2{
Expand All @@ -602,7 +608,7 @@ func TestTraceExecDetails(t *testing.T) {
ApplyWriteMemtableNanos: uint64(50 * time.Millisecond),
},
},
"tikv.RPC[1s]{ tikv.Wait{ tikv.GetSnapshot[80ms] } tikv.Process tikv.AsyncWrite{ tikv.StoreBatchWait[10ms] tikv.ProposeSendWait[10ms] tikv.PersistLog'[100ms]{ tikv.RaftDBWriteWait[20ms] tikv.RaftDBWriteWAL[30ms] tikv.RaftDBWriteMemtable[30ms] } tikv.CommitLog[200ms] tikv.ApplyBatchWait[20ms] tikv.ApplyLog[300ms]{ tikv.ApplyMutexLock[10ms] tikv.ApplyWriteLeaderWait[10ms] tikv.ApplyWriteWAL[80ms] tikv.ApplyWriteMemtable[50ms] } } }",
"tikv.RPC[1s]{ tikv.Wait{ tikv.GetSnapshot[80ms] } tikv.Process tikv.Suspend tikv.AsyncWrite{ tikv.StoreBatchWait[10ms] tikv.ProposeSendWait[10ms] tikv.PersistLog'[100ms]{ tikv.RaftDBWriteWait[20ms] tikv.RaftDBWriteWAL[30ms] tikv.RaftDBWriteMemtable[30ms] } tikv.CommitLog[200ms] tikv.ApplyBatchWait[20ms] tikv.ApplyLog[300ms]{ tikv.ApplyMutexLock[10ms] tikv.ApplyWriteLeaderWait[10ms] tikv.ApplyWriteWAL[80ms] tikv.ApplyWriteMemtable[50ms] } } }",
strings.Join([]string{
"[00.000,00.080] tikv.GetSnapshot",
"[00.000,00.080] tikv.Wait",
Expand Down
7 changes: 5 additions & 2 deletions internal/resourcecontrol/resource_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,21 +125,24 @@ func MakeResponseInfo(resp *tikvrpc.Response) *ResponseInfo {

// TODO: find out a more accurate way to get the actual KV CPU time.
func getKVCPU(detailsV2 *kvrpcpb.ExecDetailsV2, details *kvrpcpb.ExecDetails) time.Duration {
if timeDetail := detailsV2.GetTimeDetailV2(); timeDetail != nil {
return time.Duration(timeDetail.GetProcessWallTimeNs())
}
if timeDetail := detailsV2.GetTimeDetail(); timeDetail != nil {
return time.Duration(timeDetail.GetProcessWallTimeMs()) * time.Millisecond
}
if timeDetail := details.GetTimeDetail(); timeDetail != nil {
return time.Duration(timeDetail.GetProcessWallTimeMs()) * time.Millisecond
}
return 0
return time.Duration(0)
}

// ReadBytes returns the read bytes of the response.
func (res *ResponseInfo) ReadBytes() uint64 {
return res.readBytes
}

// KVCPUMs returns the KV CPU time in milliseconds of the response.
// KVCPU returns the KV CPU time of the response.
func (res *ResponseInfo) KVCPU() time.Duration {
return res.kvCPU
}
Expand Down
16 changes: 13 additions & 3 deletions txnkv/txnsnapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,12 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *retry.Backoffer, batch batchKeys,
}
if batchGetResp.ExecDetailsV2 != nil {
readKeys := len(batchGetResp.Pairs)
readTime := float64(batchGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000)
var readTime float64
if timeDetail := batchGetResp.ExecDetailsV2.GetTimeDetailV2(); timeDetail != nil {
readTime = float64(timeDetail.GetKvReadWallTimeNs()) / 1000000000.
} else if timeDetail := batchGetResp.ExecDetailsV2.GetTimeDetail(); timeDetail != nil {
readTime = float64(timeDetail.GetKvReadWallTimeMs()) / 1000.
}
readSize := float64(batchGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize())
metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize)
s.mergeExecDetail(batchGetResp.ExecDetailsV2)
Expand Down Expand Up @@ -668,7 +673,12 @@ func (s *KVSnapshot) get(ctx context.Context, bo *retry.Backoffer, k []byte) ([]
cmdGetResp := resp.Resp.(*kvrpcpb.GetResponse)
if cmdGetResp.ExecDetailsV2 != nil {
readKeys := len(cmdGetResp.Value)
readTime := float64(cmdGetResp.ExecDetailsV2.GetTimeDetail().GetKvReadWallTimeMs() / 1000)
var readTime float64
if timeDetail := cmdGetResp.ExecDetailsV2.GetTimeDetailV2(); timeDetail != nil {
readTime = float64(timeDetail.GetKvReadWallTimeNs()) / 1000000000.
} else if timeDetail := cmdGetResp.ExecDetailsV2.GetTimeDetail(); timeDetail != nil {
readTime = float64(timeDetail.GetKvReadWallTimeMs()) / 1000.
}
readSize := float64(cmdGetResp.ExecDetailsV2.GetScanDetailV2().GetProcessedVersionsSize())
metrics.ObserveReadSLI(uint64(readKeys), readTime, readSize)
s.mergeExecDetail(cmdGetResp.ExecDetailsV2)
Expand Down Expand Up @@ -736,7 +746,7 @@ func (s *KVSnapshot) mergeExecDetail(detail *kvrpcpb.ExecDetailsV2) {
s.mu.stats.timeDetail = &util.TimeDetail{}
}
s.mu.stats.scanDetail.MergeFromScanDetailV2(detail.ScanDetailV2)
s.mu.stats.timeDetail.MergeFromTimeDetail(detail.TimeDetail)
s.mu.stats.timeDetail.MergeFromTimeDetail(detail.TimeDetailV2, detail.TimeDetail)
}

// Iter return a list of key-value pair after `k`.
Expand Down
27 changes: 21 additions & 6 deletions util/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTiKVExecDetails(pb *kvrpcpb.ExecDetailsV2) TiKVExecDetails {
return TiKVExecDetails{}
}
td := &TimeDetail{}
td.MergeFromTimeDetail(pb.TimeDetail)
td.MergeFromTimeDetail(pb.TimeDetailV2, pb.TimeDetail)
sd := &ScanDetail{}
sd.MergeFromScanDetailV2(pb.ScanDetailV2)
wd := &WriteDetail{}
Expand Down Expand Up @@ -606,11 +606,13 @@ type TimeDetail struct {
// cannot be excluded for now, like Mutex wait time, which is included in this field, so that
// this field is called wall time instead of CPU time.
ProcessTime time.Duration
// Cpu wall time elapsed that task is waiting in queue.
SuspendTime time.Duration
// Off-cpu wall time elapsed in TiKV side. Usually this includes queue waiting time and
// other kind of waits in series.
WaitTime time.Duration
// KvReadWallTimeMs is the time used in KV Scan/Get.
KvReadWallTimeMs time.Duration
// KvReadWallTime is the time used in KV Scan/Get.
KvReadWallTime time.Duration
// TotalRPCWallTime is Total wall clock time spent on this RPC in TiKV.
TotalRPCWallTime time.Duration
}
Expand All @@ -625,6 +627,13 @@ func (td *TimeDetail) String() string {
buf.WriteString("total_process_time: ")
buf.WriteString(FormatDuration(td.ProcessTime))
}
if td.SuspendTime > 0 {
if buf.Len() > 0 {
buf.WriteString(", ")
}
buf.WriteString("total_suspend_time: ")
buf.WriteString(FormatDuration(td.SuspendTime))
}
if td.WaitTime > 0 {
if buf.Len() > 0 {
buf.WriteString(", ")
Expand All @@ -643,11 +652,17 @@ func (td *TimeDetail) String() string {
}

// MergeFromTimeDetail merges time detail from pb into itself.
func (td *TimeDetail) MergeFromTimeDetail(timeDetail *kvrpcpb.TimeDetail) {
if timeDetail != nil {
func (td *TimeDetail) MergeFromTimeDetail(timeDetailV2 *kvrpcpb.TimeDetailV2, timeDetail *kvrpcpb.TimeDetail) {
if timeDetailV2 != nil {
td.WaitTime += time.Duration(timeDetailV2.WaitWallTimeNs) * time.Nanosecond
td.ProcessTime += time.Duration(timeDetailV2.ProcessWallTimeNs) * time.Nanosecond
td.SuspendTime += time.Duration(timeDetailV2.ProcessSuspendWallTimeNs) * time.Nanosecond
td.KvReadWallTime += time.Duration(timeDetailV2.KvReadWallTimeNs) * time.Nanosecond
td.TotalRPCWallTime += time.Duration(timeDetailV2.TotalRpcWallTimeNs) * time.Nanosecond
} else if timeDetail != nil {
td.WaitTime += time.Duration(timeDetail.WaitWallTimeMs) * time.Millisecond
td.ProcessTime += time.Duration(timeDetail.ProcessWallTimeMs) * time.Millisecond
td.KvReadWallTimeMs += time.Duration(timeDetail.KvReadWallTimeMs) * time.Millisecond
td.KvReadWallTime += time.Duration(timeDetail.KvReadWallTimeMs) * time.Millisecond
td.TotalRPCWallTime += time.Duration(timeDetail.TotalRpcWallTimeNs) * time.Nanosecond
}
}
Expand Down