From 295689b2d2202141da2d17a7dfb53a45bc6e8af1 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 21 Sep 2021 18:25:18 -0400 Subject: [PATCH 1/5] base: optimize {NodeIDContainer,StoreIDContainer}.String() These String() methods were implemented in terms of their respective SafeFormat, which was pretty expensive: upwards of 750ns and between 4 and 7 allocations depending on the node id. This cost prompted at least two workarounds, which this patch annotates. The patch makes stringifying cheap by precomputing the value and moving from SafeFormatter to SafeValue. Moving away from SafeFormatter to a more down-to-earth implementation brings the cost down to between 0 and 1 allocations. But I went further and precomputed the value because these containers are used as logging tags and so can easily end up being stringified very frequently. Release note: None --- docs/generated/redact_safe.md | 2 ++ pkg/base/node_id.go | 63 ++++++++++++++++++++-------------- pkg/rpc/context.go | 2 +- pkg/util/tracing/span_inner.go | 5 +++ 4 files changed, 45 insertions(+), 27 deletions(-) diff --git a/docs/generated/redact_safe.md b/docs/generated/redact_safe.md index 9bac83a0c4d7..0c819ec04256 100644 --- a/docs/generated/redact_safe.md +++ b/docs/generated/redact_safe.md @@ -2,6 +2,8 @@ The following types are considered always safe for reporting: File | Type --|-- +pkg/base/node_id.go | `*NodeIDContainer` +pkg/base/node_id.go | `*StoreIDContainer` pkg/cli/exit/exit.go | `Code` pkg/jobs/jobspb/wrap.go | `Type` pkg/kv/kvserver/closedts/ctpb/service.go | `LAI` diff --git a/pkg/base/node_id.go b/pkg/base/node_id.go index 0fd1d43312de..8a619596a98a 100644 --- a/pkg/base/node_id.go +++ b/pkg/base/node_id.go @@ -29,26 +29,29 @@ import ( type NodeIDContainer struct { _ util.NoCopy - // nodeID is atomically updated under the mutex; it can be read atomically - // without the mutex. + // nodeID is accessed atomically. nodeID int32 + + // If nodeID has been set, str represents nodeID converted to string. 
We + // precompute this value to speed up String() and keep it from allocating + // memory dynamically. + str atomic.Value } // String returns the node ID, or "?" if it is unset. func (n *NodeIDContainer) String() string { - return redact.StringWithoutMarkers(n) -} - -// SafeFormat implements the redact.SafeFormatter interface. -func (n *NodeIDContainer) SafeFormat(w redact.SafePrinter, _ rune) { - val := n.Get() - if val == 0 { - w.SafeRune('?') - } else { - w.Print(val) + s := n.str.Load() + if s == nil { + return "?" } + return s.(string) } +var _ redact.SafeValue = &NodeIDContainer{} + +// SafeValue implements the redact.SafeValue interface. +func (n *NodeIDContainer) SafeValue() {} + // Get returns the current node ID; 0 if it is unset. func (n *NodeIDContainer) Get() roachpb.NodeID { return roachpb.NodeID(atomic.LoadInt32(&n.nodeID)) @@ -67,6 +70,7 @@ func (n *NodeIDContainer) Set(ctx context.Context, val roachpb.NodeID) { } else if oldVal != int32(val) { log.Fatalf(ctx, "different NodeIDs set: %d, then %d", oldVal, val) } + n.str.Store(strconv.Itoa(int(val))) } // Reset changes the NodeID regardless of the old value. @@ -74,6 +78,7 @@ func (n *NodeIDContainer) Set(ctx context.Context, val roachpb.NodeID) { // Should only be used in testing code. func (n *NodeIDContainer) Reset(val roachpb.NodeID) { atomic.StoreInt32(&n.nodeID, int32(val)) + n.str.Store(strconv.Itoa(int(val))) } // StoreIDContainer is added as a logtag in the pebbleLogger's context. @@ -83,9 +88,13 @@ func (n *NodeIDContainer) Reset(val roachpb.NodeID) { type StoreIDContainer struct { _ util.NoCopy - // After the struct is initially created, storeID is atomically - // updated under the mutex; it can be read atomically without the mutex. + // storeID is accessed atomically. storeID int32 + + // If storeID has been set, str represents storeID converted to string. We + // precompute this value to speed up String() and keep it from allocating + // memory dynamically. 
+ str atomic.Value } // TempStoreID is used as the store id for a temp pebble engine's log @@ -95,21 +104,18 @@ const TempStoreID = -1 // stores if they haven't been initialized. If a main store hasn't // been initialized, then "?" is returned. func (s *StoreIDContainer) String() string { - return redact.StringWithoutMarkers(s) -} - -// SafeFormat implements the redact.SafeFormatter interface. -func (s *StoreIDContainer) SafeFormat(w redact.SafePrinter, _ rune) { - val := s.Get() - if val == 0 { - w.SafeRune('?') - } else if val == TempStoreID { - w.Print("temp") - } else { - w.Print(val) + str := s.str.Load() + if str == nil { + return "?" } + return str.(string) } +var _ redact.SafeValue = &StoreIDContainer{} + +// SafeValue implements the redact.SafeValue interface. +func (s *StoreIDContainer) SafeValue() {} + // Get returns the current storeID; 0 if it is unset. func (s *StoreIDContainer) Get() int32 { return atomic.LoadInt32(&s.storeID) @@ -133,6 +139,11 @@ func (s *StoreIDContainer) Set(ctx context.Context, val int32) { oldVal, val) } } + if val == TempStoreID { + s.str.Store("temp") + } else { + s.str.Store(strconv.Itoa(int(val))) + } } // A SQLInstanceID is an ephemeral ID assigned to a running instance of the SQL diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index e87a26c92ccf..0da3cb8eda7c 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -777,7 +777,7 @@ func (ctx *Context) grpcDialOptions( // is in setupSpanForIncomingRPC(). 
// tagger := func(span *tracing.Span) { - span.SetTag("node", attribute.StringValue(ctx.NodeID.Get().String())) + span.SetTag("node", attribute.IntValue(int(ctx.NodeID.Get()))) } unaryInterceptors = append(unaryInterceptors, tracing.ClientInterceptor(tracer, tagger)) diff --git a/pkg/util/tracing/span_inner.go b/pkg/util/tracing/span_inner.go index 5a1e20d9c037..6865158a9c15 100644 --- a/pkg/util/tracing/span_inner.go +++ b/pkg/util/tracing/span_inner.go @@ -81,6 +81,11 @@ func (s *spanInner) GetRecording() Recording { } // If the span is not verbose, optimize by avoiding the tags. // This span is likely only used to carry payloads around. + // + // TODO(andrei): The optimization for avoiding the tags was done back when + // stringifying a {NodeID,StoreID}Container (a very common tag) was expensive. + // That has become cheap since, so this optimization might not be worth it any + // more. wantTags := s.crdb.recordingType() == RecordingVerbose return s.crdb.getRecording(wantTags) } From f0c28e61129ba7f1994c77942f85f9a9d13d32d8 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 20 Sep 2021 18:21:19 -0400 Subject: [PATCH 2/5] util/tracing: simplify processing of trace recordings Some utilities were using the clunky opentracing.LogRecord for no reason. 
Release note: None --- go.mod | 1 - pkg/sql/exec_util.go | 2 +- pkg/util/tracing/BUILD.bazel | 2 - pkg/util/tracing/recording.go | 80 ++++++++++----------- pkg/util/tracing/tracingpb/BUILD.bazel | 5 +- pkg/util/tracing/tracingpb/recorded_span.go | 7 +- vendor | 2 +- 7 files changed, 46 insertions(+), 53 deletions(-) diff --git a/go.mod b/go.mod index baf59c5fc7f6..3ca4e53f1637 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,6 @@ require ( github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.1 github.com/opennota/wd v0.0.0-20180911144301-b446539ab1e7 // indirect - github.com/opentracing/opentracing-go v1.2.0 github.com/peterbourgon/g2s v0.0.0-20170223122336-d4e7ad98afea // indirect github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 github.com/pierrre/geohash v1.0.0 diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index b23a9405eb8c..cd9dd8558582 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -2453,7 +2453,7 @@ func getMessagesForSubtrace( allLogs = append(allLogs, logRecordRow{ timestamp: logTime, - msg: span.Logs[i].Msg(), + msg: span.Logs[i].Msg().StripMarkers(), span: span, // Add 1 to the index to account for the first dummy message in a // span. 
diff --git a/pkg/util/tracing/BUILD.bazel b/pkg/util/tracing/BUILD.bazel index b97f071f7151..9f6e76bff9ab 100644 --- a/pkg/util/tracing/BUILD.bazel +++ b/pkg/util/tracing/BUILD.bazel @@ -35,8 +35,6 @@ go_library( "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", "@com_github_jaegertracing_jaeger//model/json", - "@com_github_opentracing_opentracing_go//:opentracing-go", - "@com_github_opentracing_opentracing_go//log", "@com_github_petermattis_goid//:goid", "@com_github_pmezard_go_difflib//difflib", "@io_opentelemetry_go_otel//attribute", diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go index 2234f6e8c107..48871615af5e 100644 --- a/pkg/util/tracing/recording.go +++ b/pkg/util/tracing/recording.go @@ -21,10 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" + "github.com/cockroachdb/redact" "github.com/gogo/protobuf/types" jaegerjson "github.com/jaegertracing/jaeger/model/json" - "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pmezard/go-difflib/difflib" ) @@ -43,9 +42,7 @@ const ( ) type traceLogData struct { - // TODO(andrei): Remove the opentracing dependency. We generally don't use - // opentracing any more and this little dependency here pulls a large package. - opentracing.LogRecord + logRecord depth int // timeSincePrev represents the duration since the previous log line (previous in the // set of log lines that this is part of). This is always computed relative to a log line @@ -59,6 +56,11 @@ type traceLogData struct { timeSincePrev time.Duration } +type logRecord struct { + Timestamp time.Time + Msg redact.RedactableString +} + // String formats the given spans for human consumption, showing the // relationship using nesting and times as both relative to the previous event // and cumulative. 
@@ -89,12 +91,7 @@ func (r Recording) String() string { 1000*entry.Timestamp.Sub(start).Seconds(), 1000*entry.timeSincePrev.Seconds(), strings.Repeat(" ", entry.depth+1)) - for i, f := range entry.Fields { - if i != 0 { - buf.WriteByte(' ') - } - fmt.Fprintf(&buf, "%s:%v", f.Key(), f.Value()) - } + fmt.Fprint(&buf, entry.Msg.StripMarkers()) buf.WriteByte('\n') } } @@ -145,7 +142,7 @@ func (r Recording) FindLogMessage(pattern string) (string, bool) { re := regexp.MustCompile(pattern) for _, sp := range r { for _, l := range sp.Logs { - msg := l.Msg() + msg := l.Msg().StripMarkers() if re.MatchString(msg) { return msg, true } @@ -172,51 +169,47 @@ func (r Recording) FindSpan(operation string) (tracingpb.RecordedSpan, bool) { func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogData { ownLogs := make([]traceLogData, 0, len(sp.Logs)+1) - conv := func(l opentracing.LogRecord, ref time.Time) traceLogData { + conv := func(msg redact.RedactableString, timestamp time.Time, ref time.Time) traceLogData { var timeSincePrev time.Duration if ref != (time.Time{}) { - timeSincePrev = l.Timestamp.Sub(ref) + timeSincePrev = timestamp.Sub(ref) } return traceLogData{ - LogRecord: l, + logRecord: logRecord{ + Timestamp: timestamp, + Msg: msg, + }, depth: depth, timeSincePrev: timeSincePrev, } } // Add a log line representing the start of the Span. 
- lr := opentracing.LogRecord{ - Timestamp: sp.StartTime, - Fields: []otlog.Field{otlog.String("=== operation", sp.Operation)}, + var sb redact.StringBuilder + sb.SafeString("=== operation:") + sb.SafeString(redact.SafeString(sp.Operation)) + + tags := make([]string, 0, len(sp.Tags)) + for k := range sp.Tags { + tags = append(tags, k) } - if len(sp.Tags) > 0 { - tags := make([]string, 0, len(sp.Tags)) - for k := range sp.Tags { - tags = append(tags, k) - } - sort.Strings(tags) - for _, k := range tags { - lr.Fields = append(lr.Fields, otlog.String(k, sp.Tags[k])) - } + sort.Strings(tags) + + for _, k := range tags { + sb.SafeRune(' ') + sb.SafeString(redact.SafeString(k)) + sb.SafeRune(':') + _, _ = sb.WriteString(sp.Tags[k]) } ownLogs = append(ownLogs, conv( - lr, + sb.RedactableString(), + sp.StartTime, // ref - this entries timeSincePrev will be computed when we merge it into the parent time.Time{})) for _, l := range sp.Logs { - lr := opentracing.LogRecord{ - Timestamp: l.Time, - Fields: make([]otlog.Field, len(l.Fields)), - } - for i, f := range l.Fields { - // TODO(obs-inf): the use of opentracing data structures here seems - // like detritus and prevents good redactability of the result. - // It looks like this is only used for Recording.String() though. - lr.Fields[i] = otlog.String(f.Key, f.Value.StripMarkers()) - } lastLog := ownLogs[len(ownLogs)-1] - ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp)) + ownLogs = append(ownLogs, conv("event:"+l.Msg(), l.Time, lastLog.Timestamp)) } // If the span was verbose then the Structured events would have been @@ -224,16 +217,15 @@ func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogDat // we should add the Structured events now. 
if !isVerbose(sp) { sp.Structured(func(sr *types.Any, t time.Time) { - lr := opentracing.LogRecord{ - Timestamp: t, - } str, err := MessageToJSONString(sr, true /* emitDefaults */) if err != nil { return } - lr.Fields = append(lr.Fields, otlog.String("structured", str)) lastLog := ownLogs[len(ownLogs)-1] - ownLogs = append(ownLogs, conv(lr, lastLog.Timestamp)) + var sb redact.StringBuilder + sb.SafeString("structured:") + _, _ = sb.WriteString(str) + ownLogs = append(ownLogs, conv(sb.RedactableString(), t, lastLog.Timestamp)) }) } diff --git a/pkg/util/tracing/tracingpb/BUILD.bazel b/pkg/util/tracing/tracingpb/BUILD.bazel index 24ca6a38846b..e3df3a8662bc 100644 --- a/pkg/util/tracing/tracingpb/BUILD.bazel +++ b/pkg/util/tracing/tracingpb/BUILD.bazel @@ -8,7 +8,10 @@ go_library( embed = [":tracingpb_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb", visibility = ["//visibility:public"], - deps = ["@com_github_gogo_protobuf//types"], + deps = [ + "@com_github_cockroachdb_redact//:redact", + "@com_github_gogo_protobuf//types", + ], ) proto_library( diff --git a/pkg/util/tracing/tracingpb/recorded_span.go b/pkg/util/tracing/tracingpb/recorded_span.go index ce91d7e9aac8..9456d0c64934 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.go +++ b/pkg/util/tracing/tracingpb/recorded_span.go @@ -15,6 +15,7 @@ import ( "strings" "time" + "github.com/cockroachdb/redact" types "github.com/gogo/protobuf/types" ) @@ -50,14 +51,14 @@ func (s *RecordedSpan) Structured(visit func(*types.Any, time.Time)) { // Msg extracts the message of the LogRecord, which is either in an "event" or // "error" field. 
-func (l LogRecord) Msg() string { +func (l LogRecord) Msg() redact.RedactableString { for _, f := range l.Fields { key := f.Key if key == LogMessageField { - return f.Value.StripMarkers() + return f.Value } if key == "error" { - return fmt.Sprint("error:", f.Value) + return redact.Sprintf("error: %s", f.Value) } } return "" diff --git a/vendor b/vendor index e677f187b87c..af0cc1039a4c 160000 --- a/vendor +++ b/vendor @@ -1 +1 @@ -Subproject commit e677f187b87c5e48118ac663e806b6fab4f6ab45 +Subproject commit af0cc1039a4c9b39805fea102fa3157bcde50b33 From 46b5897e28990bf9753fbdf4acbb577dc6401e72 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Mon, 20 Sep 2021 18:37:30 -0400 Subject: [PATCH 3/5] util/tracing: move some test utilities to test file Release note: None --- pkg/util/log/ambient_context_test.go | 2 +- pkg/util/log/trace_client_test.go | 4 +- pkg/util/log/trace_test.go | 2 +- pkg/util/stop/stopper_test.go | 2 +- pkg/util/tracing/collector/collector_test.go | 10 +- pkg/util/tracing/grpc_interceptor_test.go | 2 +- pkg/util/tracing/recording.go | 165 ------------------ pkg/util/tracing/service/service_test.go | 4 +- pkg/util/tracing/span_test.go | 24 +-- pkg/util/tracing/tags_test.go | 6 +- pkg/util/tracing/test_utils.go | 168 +++++++++++++++++++ pkg/util/tracing/tracer_test.go | 30 ++-- 12 files changed, 211 insertions(+), 208 deletions(-) diff --git a/pkg/util/log/ambient_context_test.go b/pkg/util/log/ambient_context_test.go index 11fa6933170c..e2a30cf9b14f 100644 --- a/pkg/util/log/ambient_context_test.go +++ b/pkg/util/log/ambient_context_test.go @@ -59,7 +59,7 @@ func TestAnnotateCtxSpan(t *testing.T) { sp2.Finish() sp1.Finish() - if err := tracing.TestingCheckRecordedSpans(sp1.GetRecording(), ` + if err := tracing.CheckRecordedSpans(sp1.GetRecording(), ` span: root tags: _verbose=1 event: a diff --git a/pkg/util/log/trace_client_test.go b/pkg/util/log/trace_client_test.go index c862101b1c4f..3d3c7f1ca2d4 100644 --- a/pkg/util/log/trace_client_test.go 
+++ b/pkg/util/log/trace_client_test.go @@ -37,7 +37,7 @@ func TestTrace(t *testing.T) { return ctxWithSpan, sp }, check: func(t *testing.T, _ context.Context, sp *tracing.Span) { - if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` + if err := tracing.CheckRecordedSpans(sp.GetRecording(), ` span: s tags: _verbose=1 event: test1 @@ -105,7 +105,7 @@ func TestTraceWithTags(t *testing.T) { log.Info(ctxWithSpan, "log") sp.Finish() - if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` + if err := tracing.CheckRecordedSpans(sp.GetRecording(), ` span: s tags: _verbose=1 event: [tag=1] test1 diff --git a/pkg/util/log/trace_test.go b/pkg/util/log/trace_test.go index b308ace78b2f..b0ba2f9e78c3 100644 --- a/pkg/util/log/trace_test.go +++ b/pkg/util/log/trace_test.go @@ -130,7 +130,7 @@ func TestEventLogAndTrace(t *testing.T) { sp.Finish() el.Finish() - if err := tracing.TestingCheckRecordedSpans(sp.GetRecording(), ` + if err := tracing.CheckRecordedSpans(sp.GetRecording(), ` span: s tags: _verbose=1 event: test3 diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go index 505d184b1821..541620e1a39d 100644 --- a/pkg/util/stop/stopper_test.go +++ b/pkg/util/stop/stopper_test.go @@ -713,7 +713,7 @@ func TestStopperRunAsyncTaskTracing(t *testing.T) { s.Stop(ctx) finish() - require.NoError(t, tracing.TestingCheckRecordedSpans(getRecording(), ` + require.NoError(t, tracing.CheckRecordedSpans(getRecording(), ` span: parent span: async child same trace event: async 2`)) diff --git a/pkg/util/tracing/collector/collector_test.go b/pkg/util/tracing/collector/collector_test.go index 55c4af5f7171..f974070305c2 100644 --- a/pkg/util/tracing/collector/collector_test.go +++ b/pkg/util/tracing/collector/collector_test.go @@ -147,7 +147,7 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) { nodeRecordings := getSpansFromAllNodes(localTraceID) node1Recordings := nodeRecordings[roachpb.NodeID(1)] require.Equal(t, 1, len(node1Recordings)) - 
require.NoError(t, tracing.TestingCheckRecordedSpans(node1Recordings[0], ` + require.NoError(t, tracing.CheckRecordedSpans(node1Recordings[0], ` span: root tags: _unfinished=1 _verbose=1 event: structured=root @@ -158,7 +158,7 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) { `)) node2Recordings := nodeRecordings[roachpb.NodeID(2)] require.Equal(t, 1, len(node2Recordings)) - require.NoError(t, tracing.TestingCheckRecordedSpans(node2Recordings[0], ` + require.NoError(t, tracing.CheckRecordedSpans(node2Recordings[0], ` span: root.child.remotechild tags: _unfinished=1 _verbose=1 event: structured=root.child.remotechild @@ -171,18 +171,18 @@ func TestTracingCollectorGetSpanRecordings(t *testing.T) { nodeRecordings := getSpansFromAllNodes(remoteTraceID) node1Recordings := nodeRecordings[roachpb.NodeID(1)] require.Equal(t, 2, len(node1Recordings)) - require.NoError(t, tracing.TestingCheckRecordedSpans(node1Recordings[0], ` + require.NoError(t, tracing.CheckRecordedSpans(node1Recordings[0], ` span: root2.child.remotechild tags: _unfinished=1 _verbose=1 `)) - require.NoError(t, tracing.TestingCheckRecordedSpans(node1Recordings[1], ` + require.NoError(t, tracing.CheckRecordedSpans(node1Recordings[1], ` span: root2.child.remotechild2 tags: _unfinished=1 _verbose=1 `)) node2Recordings := nodeRecordings[roachpb.NodeID(2)] require.Equal(t, 1, len(node2Recordings)) - require.NoError(t, tracing.TestingCheckRecordedSpans(node2Recordings[0], ` + require.NoError(t, tracing.CheckRecordedSpans(node2Recordings[0], ` span: root2 tags: _unfinished=1 _verbose=1 event: structured=root2 diff --git a/pkg/util/tracing/grpc_interceptor_test.go b/pkg/util/tracing/grpc_interceptor_test.go index d60a0f934129..892701a326e4 100644 --- a/pkg/util/tracing/grpc_interceptor_test.go +++ b/pkg/util/tracing/grpc_interceptor_test.go @@ -240,7 +240,7 @@ func TestGRPCInterceptors(t *testing.T) { span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s tags: span.kind=server 
test-baggage-key=test-baggage-value event: structured=magic-value`, tc.name) - require.NoError(t, tracing.TestingCheckRecordedSpans(finalRecs, exp)) + require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp)) }) } testutils.SucceedsSoon(t, func() error { diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go index 48871615af5e..55a121c50ffe 100644 --- a/pkg/util/tracing/recording.go +++ b/pkg/util/tracing/recording.go @@ -20,11 +20,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" - "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "github.com/gogo/protobuf/types" jaegerjson "github.com/jaegertracing/jaeger/model/json" - "github.com/pmezard/go-difflib/difflib" ) // RecordingType is the type of recording that a Span might be performing. @@ -418,166 +416,3 @@ func isVerbose(s tracingpb.RecordedSpan) bool { _, isVerbose := s.Baggage[verboseTracingBaggageKey] return isVerbose } - -// TestingCheckRecordedSpans checks whether a recording looks like an expected -// one represented by a string with one line per expected span and one line per -// expected event (i.e. log message), with a tab-indentation for child spans. -// -// if err := TestingCheckRecordedSpans(Span.GetRecording(), ` -// span: root -// event: a -// span: child -// event: [ambient] b -// event: c -// `); err != nil { -// t.Fatal(err) -// } -// -// The event lines can (and generally should) omit the file:line part that they -// might contain (depending on the level at which they were logged). -// -// Note: this test function is in this file because it needs to be used by -// both tests in the tracing package and tests outside of it, and the function -// itself depends on tracing. -func TestingCheckRecordedSpans(rec Recording, expected string) error { - normalize := func(rec string) string { - // normalize the string form of a recording for ease of comparison. - // - // 1. Strip out any leading new lines. 
- rec = strings.TrimLeft(rec, "\n") - // 2. Strip out trailing whitespace. - rec = strings.TrimRight(rec, "\n\t ") - // 3. Strip out file:line information from the recordings. - // - // Before | "event: util/log/trace_test.go:111 log" - // After | "event: log" - re := regexp.MustCompile(`event: .*:[0-9]*`) - rec = string(re.ReplaceAll([]byte(rec), []byte("event:"))) - // 4. Change all tabs to four spaces. - rec = strings.ReplaceAll(rec, "\t", " ") - // 5. Compute the outermost indentation. - indent := strings.Repeat(" ", len(rec)-len(strings.TrimLeft(rec, " "))) - // 6. Outdent each line by that amount. - var lines []string - for _, line := range strings.Split(rec, "\n") { - lines = append(lines, strings.TrimPrefix(line, indent)) - } - // 7. Stitch everything together. - return strings.Join(lines, "\n") - } - - var rows []string - row := func(depth int, format string, args ...interface{}) { - rows = append(rows, strings.Repeat(" ", depth)+fmt.Sprintf(format, args...)) - } - - mapping := make(map[uint64]uint64) // spanID -> parentSpanID - for _, rs := range rec { - mapping[rs.SpanID] = rs.ParentSpanID - } - depth := func(spanID uint64) int { - // Traverse up the parent links until one is not found. 
- curSpanID := spanID - d := 0 - for { - var ok bool - curSpanID, ok = mapping[curSpanID] - if !ok { - break - } - d++ - } - return d - } - - for _, rs := range rec { - d := depth(rs.SpanID) - row(d, "span: %s", rs.Operation) - if len(rs.Tags) > 0 { - var tags []string - for k, v := range rs.Tags { - tags = append(tags, fmt.Sprintf("%s=%v", k, v)) - } - sort.Strings(tags) - row(d, " tags: %s", strings.Join(tags, " ")) - } - for _, l := range rs.Logs { - var msg string - for _, f := range l.Fields { - msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value.StripMarkers()) - } - row(d, "%s", msg) - } - } - - exp := normalize(expected) - got := normalize(strings.Join(rows, "\n")) - if got != exp { - diff := difflib.UnifiedDiff{ - A: difflib.SplitLines(exp), - FromFile: "exp", - B: difflib.SplitLines(got), - ToFile: "got", - Context: 4, - } - diffText, _ := difflib.GetUnifiedDiffString(diff) - return errors.Newf("unexpected diff:\n%s\n\nrecording:\n%s", diffText, rec.String()) - } - return nil -} - -// TestingCheckRecording checks whether a recording looks like the expected -// one. The expected string is allowed to elide timing information, and the -// outer-most indentation level is adjusted for when comparing. -// -// if err := TestingCheckRecording(sp.GetRecording(), ` -// === operation:root -// event:root 1 -// === operation:remote child -// event:remote child 1 -// `); err != nil { -// t.Fatal(err) -// } -// -func TestingCheckRecording(rec Recording, expected string) error { - normalize := func(rec string) string { - // normalize the string form of a recording for ease of comparison. - // - // 1. Strip out any leading new lines. - rec = strings.TrimLeft(rec, "\n") - // 2. Strip out trailing space. - rec = strings.TrimRight(rec, "\n\t ") - // 3. Strip out all timing information from the recordings. 
- // - // Before | "0.007ms 0.007ms event:root 1" - // After | "event:root 1" - re := regexp.MustCompile(`.*s.*s\s{4}`) - rec = string(re.ReplaceAll([]byte(rec), nil)) - // 4. Change all tabs to four spaces. - rec = strings.ReplaceAll(rec, "\t", " ") - // 5. Compute the outermost indentation. - indent := strings.Repeat(" ", len(rec)-len(strings.TrimLeft(rec, " "))) - // 6. Outdent each line by that amount. - var lines []string - for _, line := range strings.Split(rec, "\n") { - lines = append(lines, strings.TrimPrefix(line, indent)) - } - // 6. Stitch everything together. - return strings.Join(lines, "\n") - } - - exp := normalize(expected) - got := normalize(rec.String()) - if got != exp { - diff := difflib.UnifiedDiff{ - A: difflib.SplitLines(exp), - FromFile: "exp", - B: difflib.SplitLines(got), - ToFile: "got", - Context: 4, - } - diffText, _ := difflib.GetUnifiedDiffString(diff) - return errors.Newf("unexpected diff:\n%s", diffText) - } - return nil -} diff --git a/pkg/util/tracing/service/service_test.go b/pkg/util/tracing/service/service_test.go index 32c20a2389d0..6dd6e8a39b91 100644 --- a/pkg/util/tracing/service/service_test.go +++ b/pkg/util/tracing/service/service_test.go @@ -67,13 +67,13 @@ func TestTracingServiceGetSpanRecordings(t *testing.T) { sort.SliceStable(resp.Recordings, func(i, j int) bool { return resp.Recordings[i].RecordedSpans[0].StartTime.Before(resp.Recordings[j].RecordedSpans[0].StartTime) }) - require.NoError(t, tracing.TestingCheckRecordedSpans(resp.Recordings[0].RecordedSpans, ` + require.NoError(t, tracing.CheckRecordedSpans(resp.Recordings[0].RecordedSpans, ` span: root1 tags: _unfinished=1 _verbose=1 span: root1.child tags: _unfinished=1 _verbose=1 `)) - require.NoError(t, tracing.TestingCheckRecordedSpans(resp.Recordings[1].RecordedSpans, ` + require.NoError(t, tracing.CheckRecordedSpans(resp.Recordings[1].RecordedSpans, ` span: fork1 tags: _unfinished=1 _verbose=1 `)) diff --git a/pkg/util/tracing/span_test.go 
b/pkg/util/tracing/span_test.go index aebc5501ce74..2d296e37beb1 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -75,7 +75,7 @@ func TestRecordingString(t *testing.T) { rec := root.GetRecording() // Sanity check that the recording looks like we want. Note that this is not // its String() representation; this just lists all the spans in order. - require.NoError(t, TestingCheckRecordedSpans(rec, ` + require.NoError(t, CheckRecordedSpans(rec, ` span: root tags: _verbose=1 event: root 1 @@ -91,7 +91,7 @@ func TestRecordingString(t *testing.T) { event: local child 1 `)) - require.NoError(t, TestingCheckRecording(rec, ` + require.NoError(t, CheckRecording(rec, ` === operation:root _verbose:1 event:root 1 === operation:remote child _verbose:1 @@ -158,7 +158,7 @@ func TestRecordingInRecording(t *testing.T) { root.Finish() rootRec := root.GetRecording() - require.NoError(t, TestingCheckRecordedSpans(rootRec, ` + require.NoError(t, CheckRecordedSpans(rootRec, ` span: root tags: _verbose=1 span: child @@ -168,14 +168,14 @@ func TestRecordingInRecording(t *testing.T) { `)) childRec := child.GetRecording() - require.NoError(t, TestingCheckRecordedSpans(childRec, ` + require.NoError(t, CheckRecordedSpans(childRec, ` span: child tags: _verbose=1 span: grandchild tags: _verbose=1 `)) - require.NoError(t, TestingCheckRecording(childRec, ` + require.NoError(t, CheckRecording(childRec, ` === operation:child _verbose:1 === operation:grandchild _verbose:1 `)) @@ -194,7 +194,7 @@ func TestSpan_ImportRemoteSpans(t *testing.T) { sp.ImportRemoteSpans(ch.GetRecording()) sp.Finish() - require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + require.NoError(t, CheckRecordedSpans(sp.GetRecording(), ` span: root span: child event: foo @@ -221,10 +221,10 @@ func TestSpanRecordStructured(t *testing.T) { require.NoError(t, types.UnmarshalAny(item.Payload, &d1)) require.IsType(t, (*types.Int32Value)(nil), d1.Message) - require.NoError(t, 
TestingCheckRecordedSpans(rec, ` + require.NoError(t, CheckRecordedSpans(rec, ` span: root `)) - require.NoError(t, TestingCheckRecording(rec, ` + require.NoError(t, CheckRecording(rec, ` === operation:root structured:{"@type":"type.googleapis.com/google.protobuf.Int32Value","value":4} `)) @@ -350,7 +350,7 @@ func TestSpanReset(t *testing.T) { } } - require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + require.NoError(t, CheckRecordedSpans(sp.GetRecording(), ` span: root tags: _unfinished=1 _verbose=1 event: 1 @@ -364,7 +364,7 @@ func TestSpanReset(t *testing.T) { event: 9 event: structured=10 `)) - require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + require.NoError(t, CheckRecording(sp.GetRecording(), ` === operation:root _unfinished:1 _verbose:1 event:1 event:structured=2 @@ -380,11 +380,11 @@ func TestSpanReset(t *testing.T) { sp.ResetRecording() - require.NoError(t, TestingCheckRecordedSpans(sp.GetRecording(), ` + require.NoError(t, CheckRecordedSpans(sp.GetRecording(), ` span: root tags: _unfinished=1 _verbose=1 `)) - require.NoError(t, TestingCheckRecording(sp.GetRecording(), ` + require.NoError(t, CheckRecording(sp.GetRecording(), ` === operation:root _unfinished:1 _verbose:1 `)) diff --git a/pkg/util/tracing/tags_test.go b/pkg/util/tracing/tags_test.go index cbd056f74e30..ef0830c6baa0 100644 --- a/pkg/util/tracing/tags_test.go +++ b/pkg/util/tracing/tags_test.go @@ -31,7 +31,7 @@ func TestLogTags(t *testing.T) { sp1 := tr.StartSpan("foo", WithForceRealSpan(), WithLogTags(l)) sp1.SetVerbose(true) sp1.Finish() - require.NoError(t, TestingCheckRecordedSpans(sp1.GetRecording(), ` + require.NoError(t, CheckRecordedSpans(sp1.GetRecording(), ` span: foo tags: _verbose=1 tag1=val1 tag2=val2 `)) @@ -51,7 +51,7 @@ func TestLogTags(t *testing.T) { sp2 := tr.StartSpan("bar", WithForceRealSpan(), WithLogTags(l)) sp2.SetVerbose(true) sp2.Finish() - require.NoError(t, TestingCheckRecordedSpans(sp2.GetRecording(), ` + require.NoError(t, 
CheckRecordedSpans(sp2.GetRecording(), ` span: bar tags: _verbose=1 one=val1 two=val2 `)) @@ -69,7 +69,7 @@ func TestLogTags(t *testing.T) { sp3 := tr.StartSpan("baz", WithLogTags(l), WithForceRealSpan()) sp3.SetVerbose(true) sp3.Finish() - require.NoError(t, TestingCheckRecordedSpans(sp3.GetRecording(), ` + require.NoError(t, CheckRecordedSpans(sp3.GetRecording(), ` span: baz tags: _verbose=1 one=val1 two=val2 `)) diff --git a/pkg/util/tracing/test_utils.go b/pkg/util/tracing/test_utils.go index e87f486346e8..e40fc9ce4a09 100644 --- a/pkg/util/tracing/test_utils.go +++ b/pkg/util/tracing/test_utils.go @@ -11,9 +11,14 @@ package tracing import ( + "fmt" + "regexp" + "sort" "strings" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" + "github.com/pmezard/go-difflib/difflib" ) // FindMsgInRecording returns the index of the first Span containing msg in its @@ -55,3 +60,166 @@ func CountLogMessages(sp tracingpb.RecordedSpan, msg string) int { } return res } + +// CheckRecordedSpans checks whether a recording looks like an expected +// one represented by a string with one line per expected span and one line per +// expected event (i.e. log message), with a tab-indentation for child spans. +// +// if err := CheckRecordedSpans(Span.GetRecording(), ` +// span: root +// event: a +// span: child +// event: [ambient] b +// event: c +// `); err != nil { +// t.Fatal(err) +// } +// +// The event lines can (and generally should) omit the file:line part that they +// might contain (depending on the level at which they were logged). +// +// Note: this test function is in this file because it needs to be used by +// both tests in the tracing package and tests outside of it, and the function +// itself depends on tracing. +func CheckRecordedSpans(rec Recording, expected string) error { + normalize := func(rec string) string { + // normalize the string form of a recording for ease of comparison. + // + // 1. 
Strip out any leading new lines. + rec = strings.TrimLeft(rec, "\n") + // 2. Strip out trailing whitespace. + rec = strings.TrimRight(rec, "\n\t ") + // 3. Strip out file:line information from the recordings. + // + // Before | "event: util/log/trace_test.go:111 log" + // After | "event: log" + re := regexp.MustCompile(`event: .*:[0-9]*`) + rec = string(re.ReplaceAll([]byte(rec), []byte("event:"))) + // 4. Change all tabs to four spaces. + rec = strings.ReplaceAll(rec, "\t", " ") + // 5. Compute the outermost indentation. + indent := strings.Repeat(" ", len(rec)-len(strings.TrimLeft(rec, " "))) + // 6. Outdent each line by that amount. + var lines []string + for _, line := range strings.Split(rec, "\n") { + lines = append(lines, strings.TrimPrefix(line, indent)) + } + // 7. Stitch everything together. + return strings.Join(lines, "\n") + } + + var rows []string + row := func(depth int, format string, args ...interface{}) { + rows = append(rows, strings.Repeat(" ", depth)+fmt.Sprintf(format, args...)) + } + + mapping := make(map[uint64]uint64) // spanID -> parentSpanID + for _, rs := range rec { + mapping[rs.SpanID] = rs.ParentSpanID + } + depth := func(spanID uint64) int { + // Traverse up the parent links until one is not found. 
+ curSpanID := spanID + d := 0 + for { + var ok bool + curSpanID, ok = mapping[curSpanID] + if !ok { + break + } + d++ + } + return d + } + + for _, rs := range rec { + d := depth(rs.SpanID) + row(d, "span: %s", rs.Operation) + if len(rs.Tags) > 0 { + var tags []string + for k, v := range rs.Tags { + tags = append(tags, fmt.Sprintf("%s=%v", k, v)) + } + sort.Strings(tags) + row(d, " tags: %s", strings.Join(tags, " ")) + } + for _, l := range rs.Logs { + var msg string + for _, f := range l.Fields { + msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value.StripMarkers()) + } + row(d, "%s", msg) + } + } + + exp := normalize(expected) + got := normalize(strings.Join(rows, "\n")) + if got != exp { + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines(exp), + FromFile: "exp", + B: difflib.SplitLines(got), + ToFile: "got", + Context: 4, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + return errors.Newf("unexpected diff:\n%s\n\nrecording:\n%s", diffText, rec.String()) + } + return nil +} + +// CheckRecording checks whether a recording looks like the expected +// one. The expected string is allowed to elide timing information, and the +// outer-most indentation level is adjusted for when comparing. +// +// if err := CheckRecording(sp.GetRecording(), ` +// === operation:root +// event:root 1 +// === operation:remote child +// event:remote child 1 +// `); err != nil { +// t.Fatal(err) +// } +// +func CheckRecording(rec Recording, expected string) error { + normalize := func(rec string) string { + // normalize the string form of a recording for ease of comparison. + // + // 1. Strip out any leading new lines. + rec = strings.TrimLeft(rec, "\n") + // 2. Strip out trailing space. + rec = strings.TrimRight(rec, "\n\t ") + // 3. Strip out all timing information from the recordings. + // + // Before | "0.007ms 0.007ms event:root 1" + // After | "event:root 1" + re := regexp.MustCompile(`.*s.*s\s{4}`) + rec = string(re.ReplaceAll([]byte(rec), nil)) + // 4. 
Change all tabs to four spaces. + rec = strings.ReplaceAll(rec, "\t", " ") + // 5. Compute the outermost indentation. + indent := strings.Repeat(" ", len(rec)-len(strings.TrimLeft(rec, " "))) + // 6. Outdent each line by that amount. + var lines []string + for _, line := range strings.Split(rec, "\n") { + lines = append(lines, strings.TrimPrefix(line, indent)) + } + // 7. Stitch everything together. + return strings.Join(lines, "\n") + } + + exp := normalize(expected) + got := normalize(rec.String()) + if got != exp { + diff := difflib.UnifiedDiff{ + A: difflib.SplitLines(exp), + FromFile: "exp", + B: difflib.SplitLines(got), + ToFile: "got", + Context: 4, + } + diffText, _ := difflib.GetUnifiedDiffString(diff) + return errors.Newf("unexpected diff:\n%s", diffText) + } + return nil +} diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go index f33c78e66058..c5673c54a46d 100644 --- a/pkg/util/tracing/tracer_test.go +++ b/pkg/util/tracing/tracer_test.go @@ -69,12 +69,12 @@ func TestTracerRecording(t *testing.T) { } // Initial recording of this fresh (real) span. 
- if err := TestingCheckRecordedSpans(s1.GetRecording(), ``); err != nil { + if err := CheckRecordedSpans(s1.GetRecording(), ``); err != nil { t.Fatal(err) } s1.SetVerbose(true) - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _unfinished=1 _verbose=1 `); err != nil { @@ -98,7 +98,7 @@ func TestTracerRecording(t *testing.T) { } s2.Recordf("x=%d", 3) - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _unfinished=1 _verbose=1 event: x=2 @@ -109,7 +109,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(s2.GetRecording(), ` + if err := CheckRecordedSpans(s2.GetRecording(), ` span: b tags: _unfinished=1 _verbose=1 event: x=3 @@ -123,7 +123,7 @@ func TestTracerRecording(t *testing.T) { s2.Finish() - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _unfinished=1 _verbose=1 event: x=2 @@ -137,7 +137,7 @@ func TestTracerRecording(t *testing.T) { t.Fatal(err) } s3.Finish() - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _unfinished=1 _verbose=1 event: x=2 @@ -153,7 +153,7 @@ func TestTracerRecording(t *testing.T) { s1.ResetRecording() s1.SetVerbose(false) s1.Recordf("x=%d", 100) - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a `); err != nil { t.Fatal(err) @@ -161,7 +161,7 @@ func TestTracerRecording(t *testing.T) { // The child Span, now finished, will drop future recordings. 
s3.Recordf("x=%d", 5) - if err := TestingCheckRecordedSpans(s3.GetRecording(), ` + if err := CheckRecordedSpans(s3.GetRecording(), ` span: c tags: _verbose=1 tag=val event: x=4 @@ -179,7 +179,7 @@ func TestStartChildSpan(t *testing.T) { sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` + if err := CheckRecordedSpans(sp1.GetRecording(), ` span: parent tags: _verbose=1 span: child @@ -193,13 +193,13 @@ func TestStartChildSpan(t *testing.T) { sp2 = tr.StartSpan("child", WithParentAndManualCollection(sp1.Meta())) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` + if err := CheckRecordedSpans(sp1.GetRecording(), ` span: parent tags: _verbose=1 `); err != nil { t.Fatal(err) } - if err := TestingCheckRecordedSpans(sp2.GetRecording(), ` + if err := CheckRecordedSpans(sp2.GetRecording(), ` span: child tags: _verbose=1 `); err != nil { @@ -212,7 +212,7 @@ func TestStartChildSpan(t *testing.T) { WithLogTags(logtags.SingleTagBuffer("key", "val"))) sp2.Finish() sp1.Finish() - if err := TestingCheckRecordedSpans(sp1.GetRecording(), ` + if err := CheckRecordedSpans(sp1.GetRecording(), ` span: parent tags: _verbose=1 span: child @@ -282,7 +282,7 @@ func TestTracerInjectExtract(t *testing.T) { // Verify that recording was started automatically. 
rec := s2.GetRecording() - if err := TestingCheckRecordedSpans(rec, ` + if err := CheckRecordedSpans(rec, ` span: remote op tags: _verbose=1 event: x=1 @@ -290,7 +290,7 @@ func TestTracerInjectExtract(t *testing.T) { t.Fatal(err) } - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _unfinished=1 _verbose=1 `); err != nil { @@ -300,7 +300,7 @@ func TestTracerInjectExtract(t *testing.T) { s1.ImportRemoteSpans(rec) s1.Finish() - if err := TestingCheckRecordedSpans(s1.GetRecording(), ` + if err := CheckRecordedSpans(s1.GetRecording(), ` span: a tags: _verbose=1 span: remote op From 6a3af8322550a6a5f4129c7d3f3461bf21cd0f73 Mon Sep 17 00:00:00 2001 From: Andrei Matei Date: Tue, 21 Sep 2021 12:43:06 -0400 Subject: [PATCH 4/5] util/tracing: simplify log messages in trace recordings Before this patch, the RecordedSpan proto stored log messages in a very awkward way: each message was stored as a collection of key/values, with only one such pair present (using a well-known key). This was confusing, unnecessary, hard to work with and hard to track for figuring out what keys and values are in there (with the answer being only one key). This patch simplifies the log messages, making them be represented by a single string as nature intended. A bunch of code gets simplified in consequence. 
Release note: None --- pkg/kv/bulk/sst_batcher_test.go | 11 +- .../concurrency/concurrency_manager_test.go | 22 +-- pkg/server/node.go | 16 +- pkg/server/node_tenant.go | 27 ++- pkg/server/node_tenant_test.go | 14 +- pkg/server/status.go | 16 +- pkg/sql/rowexec/tablereader_test.go | 10 +- pkg/util/tracing/crdbspan.go | 6 +- pkg/util/tracing/recording.go | 19 +- pkg/util/tracing/span_test.go | 4 +- pkg/util/tracing/test_utils.go | 22 +-- pkg/util/tracing/tracingpb/recorded_span.go | 7 +- .../tracing/tracingpb/recorded_span.pb.go | 175 +++++++++++------- .../tracing/tracingpb/recorded_span.proto | 8 +- pkg/util/tracing/utils.go | 3 + 15 files changed, 204 insertions(+), 156 deletions(-) diff --git a/pkg/kv/bulk/sst_batcher_test.go b/pkg/kv/bulk/sst_batcher_test.go index 2752ae1f694f..0b8b602a0385 100644 --- a/pkg/kv/bulk/sst_batcher_test.go +++ b/pkg/kv/bulk/sst_batcher_test.go @@ -16,7 +16,6 @@ import ( "math/rand" "reflect" "runtime" - "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -225,14 +224,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) { } } var splitRetries int - for _, rec := range getRec() { - for _, l := range rec.Logs { - for _, line := range l.Fields { - if strings.Contains(line.Value.StripMarkers(), "SSTable cannot be added spanning range bounds") { - splitRetries++ - } - } - } + for _, sp := range getRec() { + splitRetries += tracing.CountLogMessages(sp, "SSTable cannot be added spanning range bounds") } if splitRetries != expectedSplitRetries { t.Fatalf("expected %d split-caused retries, got %d", expectedSplitRetries, splitRetries) diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index e8cf9b07e241..6d1ecc184a21 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -1018,16 +1018,14 @@ func (m *monitor) collectRecordings() string { rec := g.collect() for _, span 
:= range rec { for _, log := range span.Logs { - for _, field := range log.Fields { - if prev > 0 { - prev-- - continue - } - logs = append(logs, logRecord{ - g: g, value: field.Value.StripMarkers(), - }) - g.prevEvents++ + if prev > 0 { + prev-- + continue } + logs = append(logs, logRecord{ + g: g, value: log.Msg().StripMarkers(), + }) + g.prevEvents++ } } if atomic.LoadInt32(&g.finished) == 1 { @@ -1068,11 +1066,7 @@ func (m *monitor) hasNewEvents(g *monitoredGoroutine) bool { events := 0 rec := g.collect() for _, span := range rec { - for _, log := range span.Logs { - for range log.Fields { - events++ - } - } + events += len(span.Logs) } return events > g.prevEvents } diff --git a/pkg/server/node.go b/pkg/server/node.go index 720aca2a284f..ccc079c34c8d 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -916,11 +916,11 @@ func (n *Node) batchInternal( var br *roachpb.BatchResponse if err := n.stopper.RunTaskWithErr(ctx, "node.Node: batch", func(ctx context.Context) error { - var finishSpan func(*roachpb.BatchResponse) + var finishSpan func(context.Context, *roachpb.BatchResponse) // Shadow ctx from the outer function. Written like this to pass the linter. ctx, finishSpan = n.setupSpanForIncomingRPC(ctx, tenID) // NB: wrapped to delay br evaluation to its value when returning. - defer func() { finishSpan(br) }() + defer func() { finishSpan(ctx, br) }() if log.HasSpanOrEvent(ctx) { log.Eventf(ctx, "node received request: %s", args.Summary()) } @@ -1062,7 +1062,7 @@ func (n *Node) Batch( // be nil in case no response is to be returned to the rpc caller. func (n *Node) setupSpanForIncomingRPC( ctx context.Context, tenID roachpb.TenantID, -) (context.Context, func(*roachpb.BatchResponse)) { +) (context.Context, func(context.Context, *roachpb.BatchResponse)) { // The operation name matches the one created by the interceptor in the // remoteTrace case below. 
const opName = "/cockroach.roachpb.Internal/Batch" @@ -1083,7 +1083,7 @@ func (n *Node) setupSpanForIncomingRPC( } } - finishSpan := func(br *roachpb.BatchResponse) { + finishSpan := func(ctx context.Context, br *roachpb.BatchResponse) { if newSpan != nil { newSpan.Finish() } @@ -1098,8 +1098,12 @@ func (n *Node) setupSpanForIncomingRPC( // sensitive stripped out of the verbose messages. However, // structured payloads stay untouched. if rec := grpcSpan.GetRecording(); rec != nil { - maybeRedactRecording(tenID, rec) - br.CollectedSpans = append(br.CollectedSpans, rec...) + err := redactRecordingForTenant(tenID, rec) + if err == nil { + br.CollectedSpans = append(br.CollectedSpans, rec...) + } else { + log.Errorf(ctx, "error redacting trace recording: %s", err) + } } } } diff --git a/pkg/server/node_tenant.go b/pkg/server/node_tenant.go index 5145ece7d3c5..8cbd7c2f40a3 100644 --- a/pkg/server/node_tenant.go +++ b/pkg/server/node_tenant.go @@ -14,24 +14,38 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" + "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" ) var sRedactedMarker = redact.RedactableString(redact.EscapeBytes(nil)) -func maybeRedactRecording(tenID roachpb.TenantID, rec tracing.Recording) { +// redactRecordingForTenant redacts the sensitive parts of log messages in the +// recording if the tenant for which this recording is intended is not the system +// tenant (the system tenant gets an unredacted recording). See https://github.com/cockroachdb/cockroach/issues/70407. +// The recording is modified in place. +// +// tenID is the tenant that will receive this recording. +func redactRecordingForTenant(tenID roachpb.TenantID, rec tracing.Recording) error { if tenID == roachpb.SystemTenantID { - return + return nil } - // For tenants, strip the verbose log messages. 
See: - // https://github.com/cockroachdb/cockroach/issues/70407 for i := range rec { sp := &rec[i] sp.Tags = nil for j := range sp.Logs { record := &sp.Logs[j] - for k := range record.Fields { - field := &record.Fields[k] + if record.Message != "" && !sp.RedactableLogs { + // If Message is set, the record should have been produced by a 22.1 + // node that also sets RedactableLogs. + return errors.AssertionFailedf( + "recording has non-redactable span with the Message field set: %s", sp) + } + record.Message = record.Message.Redact() + + // For compatibility with old versions, also redact DeprecatedFields. + for k := range record.DeprecatedFields { + field := &record.DeprecatedFields[k] if field.Key != tracingpb.LogMessageField { // We don't have any of these fields, but let's not take any // chances (our dependencies might slip them in). @@ -51,4 +65,5 @@ func maybeRedactRecording(tenID roachpb.TenantID, rec tracing.Recording) { } } } + return nil } diff --git a/pkg/server/node_tenant_test.go b/pkg/server/node_tenant_test.go index 75e082502466..6694156068e5 100644 --- a/pkg/server/node_tenant_test.go +++ b/pkg/server/node_tenant_test.go @@ -26,11 +26,11 @@ import ( "go.opentelemetry.io/otel/attribute" ) -// TestMaybeRedactRecording verifies that maybeRedactRecording strips +// TestRedactRecordingForTenant verifies that redactRecordingForTenant strips // sensitive details for recordings consumed by tenants. // // See kvccl.TestTenantTracesAreRedacted for an end-to-end test of this. 
-func TestMaybeRedactRecording(t *testing.T) { +func TestRedactRecordingForTenant(t *testing.T) { defer leaktest.AfterTest(t)() const ( @@ -59,10 +59,10 @@ func TestMaybeRedactRecording(t *testing.T) { t.Run("regular-tenant", func(t *testing.T) { rec := mkRec() - maybeRedactRecording(roachpb.MakeTenantID(100), rec) + require.NoError(t, redactRecordingForTenant(roachpb.MakeTenantID(100), rec)) require.Zero(t, rec[0].Tags) require.Len(t, rec[0].Logs, 1) - msg := rec[0].Logs[0].Fields[0].Value + msg := rec[0].Logs[0].Msg().StripMarkers() t.Log(msg) require.NotContains(t, msg, msgSensitive) require.NotContains(t, msg, tagSensitive) @@ -72,7 +72,7 @@ func TestMaybeRedactRecording(t *testing.T) { t.Run("system-tenant", func(t *testing.T) { rec := mkRec() - maybeRedactRecording(roachpb.SystemTenantID, rec) + require.NoError(t, redactRecordingForTenant(roachpb.SystemTenantID, rec)) require.Equal(t, map[string]string{ "_verbose": "1", "all_span_tags_are_stripped": "because_no_redactability", @@ -80,7 +80,7 @@ func TestMaybeRedactRecording(t *testing.T) { "tag_sensitive": tagSensitive, }, rec[0].Tags) require.Len(t, rec[0].Logs, 1) - msg := rec[0].Logs[0].Fields[0].Value + msg := rec[0].Logs[0].Msg().StripMarkers() t.Log(msg) require.Contains(t, msg, msgSensitive) require.Contains(t, msg, tagSensitive) @@ -93,7 +93,7 @@ func TestMaybeRedactRecording(t *testing.T) { // you're here to see why this test failed to compile, ensure that the // change you're making to RecordedSpan does not include new sensitive data // that may leak from the KV layer to tenants. If it does, update - // maybeRedactRecording appropriately. + // redactRecordingForTenant appropriately. 
type calcifiedRecordedSpan struct { TraceID uint64 SpanID uint64 diff --git a/pkg/server/status.go b/pkg/server/status.go index 70cf78680954..eb62d8b4c06f 100644 --- a/pkg/server/status.go +++ b/pkg/server/status.go @@ -659,23 +659,11 @@ func (s *statusServer) Allocator( func recordedSpansToTraceEvents(spans []tracingpb.RecordedSpan) []*serverpb.TraceEvent { var output []*serverpb.TraceEvent - var buf bytes.Buffer for _, sp := range spans { for _, entry := range sp.Logs { event := &serverpb.TraceEvent{ - Time: entry.Time, - } - if len(entry.Fields) == 1 { - event.Message = entry.Fields[0].Value.StripMarkers() - } else { - buf.Reset() - for i, f := range entry.Fields { - if i != 0 { - buf.WriteByte(' ') - } - fmt.Fprintf(&buf, "%s:%v", f.Key, f.Value) - } - event.Message = buf.String() + Time: entry.Time, + Message: entry.Msg().StripMarkers(), } output = append(output, event) } diff --git a/pkg/sql/rowexec/tablereader_test.go b/pkg/sql/rowexec/tablereader_test.go index 6c62fd3e84e9..0bbe0a3450d1 100644 --- a/pkg/sql/rowexec/tablereader_test.go +++ b/pkg/sql/rowexec/tablereader_test.go @@ -457,13 +457,11 @@ func TestLimitScans(t *testing.T) { } } for _, l := range span.Logs { - for _, f := range l.Fields { - match := re.FindStringSubmatch(f.Value.StripMarkers()) - if match == nil { - continue - } - ranges[match[1]] = struct{}{} + match := re.FindStringSubmatch(l.Msg().StripMarkers()) + if match == nil { + continue } + ranges[match[1]] = struct{}{} } } if len(ranges) != 1 { diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go index f15f180d2b86..87a8c689be09 100644 --- a/pkg/util/tracing/crdbspan.go +++ b/pkg/util/tracing/crdbspan.go @@ -297,8 +297,10 @@ func (s *crdbSpan) record(msg redact.RedactableString) { now = time.Now() } logRecord := &tracingpb.LogRecord{ - Time: now, - Fields: []tracingpb.LogRecord_Field{ + Time: now, + Message: msg, + // Compatibility with 21.2. 
+ DeprecatedFields: []tracingpb.LogRecord_Field{ {Key: tracingpb.LogMessageField, Value: msg}, }, } diff --git a/pkg/util/tracing/recording.go b/pkg/util/tracing/recording.go index 55a121c50ffe..6851b7915555 100644 --- a/pkg/util/tracing/recording.go +++ b/pkg/util/tracing/recording.go @@ -136,6 +136,9 @@ func (r Recording) OrphanSpans() []tracingpb.RecordedSpan { // FindLogMessage returns the first log message in the recording that matches // the given regexp. The bool return value is true if such a message is found. +// +// This method strips the redaction markers from all the log messages, which is +// pretty inefficient. func (r Recording) FindLogMessage(pattern string) (string, bool) { re := regexp.MustCompile(pattern) for _, sp := range r { @@ -207,7 +210,9 @@ func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogDat for _, l := range sp.Logs { lastLog := ownLogs[len(ownLogs)-1] - ownLogs = append(ownLogs, conv("event:"+l.Msg(), l.Time, lastLog.Timestamp)) + var sb redact.StringBuilder + sb.Printf("event:%s", l.Msg()) + ownLogs = append(ownLogs, conv(sb.RedactableString(), l.Time, lastLog.Timestamp)) } // If the span was verbose then the Structured events would have been @@ -353,13 +358,13 @@ func (r Recording) ToJaegerJSON(stmt, comment, nodeStr string) (string, error) { }) } for _, l := range sp.Logs { - jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)} - for _, field := range l.Fields { - jl.Fields = append(jl.Fields, jaegerjson.KeyValue{ - Key: field.Key, - Value: field.Value, + jl := jaegerjson.Log{ + Timestamp: uint64(l.Time.UnixNano() / 1000), + Fields: []jaegerjson.KeyValue{{ + Key: "event", + Value: l.Msg(), Type: "STRING", - }) + }}, } s.Logs = append(s.Logs, jl) } diff --git a/pkg/util/tracing/span_test.go b/pkg/util/tracing/span_test.go index 2d296e37beb1..d442f5d31bef 100644 --- a/pkg/util/tracing/span_test.go +++ b/pkg/util/tracing/span_test.go @@ -309,8 +309,8 @@ func TestSpanRecordLimit(t *testing.T) { 
first := rec[0].Logs[0] last := rec[0].Logs[len(rec[0].Logs)-1] - require.Equal(t, first.Fields[0].Value.StripMarkers(), msg(extra+1)) - require.Equal(t, last.Fields[0].Value.StripMarkers(), msg(numLogs+extra)) + require.Equal(t, first.Msg().StripMarkers(), msg(extra+1)) + require.Equal(t, last.Msg().StripMarkers(), msg(numLogs+extra)) } // testStructuredImpl is a testing implementation of Structured event. diff --git a/pkg/util/tracing/test_utils.go b/pkg/util/tracing/test_utils.go index e40fc9ce4a09..7f07eddbd98d 100644 --- a/pkg/util/tracing/test_utils.go +++ b/pkg/util/tracing/test_utils.go @@ -35,12 +35,8 @@ func FindMsgInRecording(recording Recording, msg string) int { // LogsContainMsg returns true if a Span's logs contain the given message. func LogsContainMsg(sp tracingpb.RecordedSpan, msg string) bool { for _, l := range sp.Logs { - // NOTE: With our logs, each LogRecord has a single field ("event") and - // value. - for _, f := range l.Fields { - if strings.Contains(f.Value.StripMarkers(), msg) { - return true - } + if strings.Contains(l.Msg().StripMarkers(), msg) { + return true } } return false @@ -50,12 +46,8 @@ func LogsContainMsg(sp tracingpb.RecordedSpan, msg string) bool { func CountLogMessages(sp tracingpb.RecordedSpan, msg string) int { res := 0 for _, l := range sp.Logs { - // NOTE: With our logs, each LogRecord has a single field ("event") and - // value. 
- for _, f := range l.Fields { - if strings.Contains(f.Value.StripMarkers(), msg) { - res++ - } + if strings.Contains(l.Msg().StripMarkers(), msg) { + res++ } } return res @@ -144,11 +136,7 @@ func CheckRecordedSpans(rec Recording, expected string) error { row(d, " tags: %s", strings.Join(tags, " ")) } for _, l := range rs.Logs { - var msg string - for _, f := range l.Fields { - msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value.StripMarkers()) - } - row(d, "%s", msg) + row(d, " event: %s", l.Msg().StripMarkers()) } } diff --git a/pkg/util/tracing/tracingpb/recorded_span.go b/pkg/util/tracing/tracingpb/recorded_span.go index 9456d0c64934..c3bb88aad0a5 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.go +++ b/pkg/util/tracing/tracingpb/recorded_span.go @@ -52,7 +52,12 @@ func (s *RecordedSpan) Structured(visit func(*types.Any, time.Time)) { // Msg extracts the message of the LogRecord, which is either in an "event" or // "error" field. func (l LogRecord) Msg() redact.RedactableString { - for _, f := range l.Fields { + if l.Message != "" { + return l.Message + } + + // Compatibility with 21.2: look at l.DeprecatedFields. + for _, f := range l.DeprecatedFields { key := f.Key if key == LogMessageField { return f.Value diff --git a/pkg/util/tracing/tracingpb/recorded_span.pb.go b/pkg/util/tracing/tracingpb/recorded_span.pb.go index a4f346ccb853..d70c8c2c1cbd 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.pb.go +++ b/pkg/util/tracing/tracingpb/recorded_span.pb.go @@ -33,8 +33,12 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package type LogRecord struct { // Time of the log record. Time time.Time `protobuf:"bytes,1,opt,name=time,proto3,stdtime" json:"time"` - // Fields with values converted to strings. - Fields []LogRecord_Field `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields"` + // Fields with values converted to strings. 
In 22.1, the `message` field + // contains the log message, and this field is only used for compatibility + // with 21.2 nodes. + DeprecatedFields []LogRecord_Field `protobuf:"bytes,2,rep,name=deprecated_fields,json=deprecatedFields,proto3" json:"deprecated_fields"` + // The log message. + Message github_com_cockroachdb_redact.RedactableString `protobuf:"bytes,3,opt,name=message,proto3,customtype=github.com/cockroachdb/redact.RedactableString" json:"message"` } func (m *LogRecord) Reset() { *m = LogRecord{} } @@ -275,59 +279,61 @@ func init() { } var fileDescriptor_e9b7b35ae7ab4ca8 = []byte{ - // 828 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x41, 0x6f, 0xe4, 0x34, - 0x14, 0x9e, 0xb4, 0xe9, 0x24, 0xe3, 0x0c, 0x6d, 0xb1, 0x7a, 0xc8, 0x8e, 0x56, 0x49, 0x55, 0x24, - 0xa8, 0x58, 0x29, 0x03, 0x83, 0xb4, 0x54, 0xe5, 0x80, 0x18, 0xba, 0xa0, 0x41, 0xd5, 0x82, 0xd2, - 0x72, 0xd9, 0x4b, 0xe4, 0x89, 0x5d, 0x37, 0xda, 0x34, 0x8e, 0x1c, 0x07, 0x69, 0x10, 0x3f, 0x62, - 0x8f, 0x7b, 0xe4, 0xaf, 0x70, 0xeb, 0x71, 0x8f, 0x2b, 0x0e, 0x03, 0xa4, 0x3f, 0x04, 0x64, 0xc7, - 0xc9, 0x6c, 0x67, 0x2b, 0x8d, 0xa6, 0xe5, 0x94, 0xf8, 0xbd, 0xf7, 0x7d, 0x7e, 0xf6, 0xfb, 0x3e, - 0x83, 0x4f, 0x4b, 0x91, 0xa4, 0x43, 0xc1, 0x51, 0x9c, 0x64, 0xb4, 0xf9, 0xe6, 0xd3, 0x21, 0x27, - 0x31, 0xe3, 0x98, 0xe0, 0xa8, 0xc8, 0x51, 0x16, 0xe4, 0x9c, 0x09, 0x06, 0xf7, 0x63, 0x16, 0xbf, - 0xe4, 0x0c, 0xc5, 0x97, 0x81, 0x44, 0x05, 0xba, 0x3a, 0x68, 0x51, 0x83, 0x3d, 0xca, 0x28, 0x53, - 0xc5, 0x43, 0xf9, 0x57, 0xe3, 0x06, 0x8f, 0x28, 0x63, 0x34, 0x25, 0x43, 0xb5, 0x9a, 0x96, 0x17, - 0x43, 0x94, 0xcd, 0x74, 0xca, 0x5f, 0x4e, 0x89, 0xe4, 0x8a, 0x14, 0x02, 0x5d, 0xe5, 0xba, 0xc0, - 0x5b, 0x2e, 0xc0, 0x25, 0x47, 0x22, 0x61, 0xba, 0xa7, 0x83, 0x7f, 0x0d, 0xd0, 0x3b, 0x65, 0x34, - 0x54, 0xed, 0xc2, 0x23, 0x60, 0x4a, 0x02, 0xd7, 0xd8, 0x37, 0x0e, 0x9d, 0xd1, 0x20, 0xa8, 0xc1, - 0x41, 0x03, 0x0e, 0xce, 0x1b, 0xf6, 0xb1, 0x7d, 0x3d, 0xf7, 0x3b, 0xaf, 
0xfe, 0xf2, 0x8d, 0x50, - 0x21, 0xe0, 0x8f, 0xa0, 0x7b, 0x91, 0x90, 0x14, 0x17, 0xee, 0xc6, 0xfe, 0xe6, 0xa1, 0x33, 0xfa, - 0x3c, 0x58, 0x75, 0xd8, 0xa0, 0xdd, 0x36, 0xf8, 0x4e, 0x22, 0xc7, 0xa6, 0xa4, 0x0c, 0x35, 0xcd, - 0x80, 0x82, 0x2d, 0x15, 0x86, 0xbb, 0x60, 0xf3, 0x25, 0x99, 0xa9, 0x96, 0x7a, 0xa1, 0xfc, 0x85, - 0xa7, 0x60, 0xeb, 0x17, 0x94, 0x96, 0xc4, 0xdd, 0x90, 0xb1, 0xf1, 0x53, 0x89, 0xfb, 0x73, 0xee, - 0x07, 0x34, 0x11, 0x97, 0xe5, 0x34, 0x88, 0xd9, 0xd5, 0xb0, 0xdd, 0x1c, 0xcb, 0x51, 0x60, 0x14, - 0x8b, 0x20, 0x54, 0x1f, 0x34, 0x4d, 0xc9, 0x99, 0xe0, 0x49, 0x46, 0xc3, 0x9a, 0xe4, 0xe0, 0x37, - 0xb0, 0x7b, 0x26, 0x78, 0x19, 0x8b, 0x92, 0x13, 0xfc, 0xe0, 0x7b, 0x08, 0x80, 0x95, 0xa3, 0x59, - 0xca, 0x10, 0x56, 0xdd, 0x39, 0xa3, 0xbd, 0xf7, 0xc0, 0xdf, 0x64, 0xb3, 0xb0, 0x29, 0x3a, 0xa8, - 0x2c, 0xd0, 0x0f, 0xb5, 0x56, 0xce, 0x72, 0x94, 0xc1, 0x8f, 0x81, 0x2d, 0xaf, 0x88, 0x44, 0x09, - 0x56, 0xdb, 0x9b, 0x63, 0xa7, 0x9a, 0xfb, 0xd6, 0xb9, 0x8c, 0x4d, 0x4e, 0x42, 0x4b, 0x25, 0x27, - 0x18, 0x7e, 0x04, 0x2c, 0x29, 0x2d, 0x59, 0xb6, 0xa1, 0xca, 0x40, 0x35, 0xf7, 0xbb, 0x92, 0x62, - 0x72, 0x12, 0x76, 0x65, 0x6a, 0x82, 0xe1, 0x53, 0xb0, 0x9d, 0x23, 0x4e, 0x32, 0x11, 0x35, 0xb5, - 0x9b, 0xaa, 0x76, 0xb7, 0x9a, 0xfb, 0xfd, 0x9f, 0x54, 0x46, 0x23, 0xfa, 0xf9, 0x62, 0x85, 0xe1, - 0x63, 0xd0, 0x63, 0x39, 0xa9, 0x85, 0xe2, 0x9a, 0xea, 0xe6, 0x17, 0x01, 0xf8, 0x33, 0xb0, 0xa6, - 0x88, 0x52, 0x44, 0x89, 0xbb, 0xa5, 0x86, 0xfd, 0xd5, 0xea, 0x61, 0xbf, 0x7b, 0xc6, 0x60, 0x5c, - 0xa3, 0x9f, 0x65, 0x82, 0xcf, 0xc2, 0x86, 0x0b, 0x9e, 0x02, 0x53, 0x20, 0x5a, 0xb8, 0x5d, 0xc5, - 0x79, 0xb4, 0x26, 0xe7, 0x39, 0xa2, 0x45, 0x4d, 0xa8, 0x58, 0xe0, 0xb7, 0x00, 0x14, 0x02, 0x71, - 0x11, 0xa9, 0x41, 0x5a, 0x6b, 0x0c, 0xb2, 0xa7, 0x70, 0x32, 0x03, 0xbf, 0x06, 0x76, 0xe3, 0x17, - 0xd7, 0x56, 0x14, 0x8f, 0xde, 0xa3, 0x38, 0xd1, 0x05, 0x35, 0xc3, 0x6b, 0xc9, 0xd0, 0x82, 0xe0, - 0x27, 0x60, 0x87, 0xb7, 0xba, 0x8b, 0x52, 0x46, 0x0b, 0x77, 0x67, 0xdf, 0x38, 0xb4, 0xc3, 0xed, - 0x45, 0xf8, 
0x94, 0xd1, 0x02, 0x3e, 0x03, 0xa6, 0xca, 0xf6, 0xd4, 0xe1, 0x9f, 0xac, 0xe1, 0x1e, - 0xed, 0x1b, 0x05, 0x87, 0x2f, 0x80, 0x87, 0x49, 0xce, 0x49, 0x8c, 0x04, 0xc1, 0x51, 0x92, 0x09, - 0xc2, 0x33, 0x94, 0x46, 0x45, 0x2b, 0x70, 0xd7, 0x51, 0x1b, 0xdc, 0xad, 0xca, 0xc7, 0x0b, 0xec, - 0x44, 0x43, 0x17, 0xd6, 0x80, 0x23, 0xd0, 0xa7, 0x8c, 0xb3, 0x52, 0x24, 0x99, 0x52, 0x67, 0x5f, - 0x49, 0x69, 0xa7, 0x9a, 0xfb, 0xce, 0xf7, 0x4d, 0x7c, 0x72, 0x12, 0x3a, 0x6d, 0xd1, 0x04, 0xc3, - 0x01, 0xb0, 0x2f, 0x92, 0x2c, 0x29, 0x2e, 0x09, 0x76, 0x3f, 0x50, 0x07, 0x6f, 0xd7, 0x90, 0x02, - 0xb8, 0xe8, 0x2b, 0xaa, 0x1f, 0xcc, 0xc2, 0xdd, 0x56, 0xfd, 0x8d, 0x56, 0x5f, 0xc0, 0xb2, 0x69, - 0xf5, 0x3d, 0x7c, 0x58, 0x2c, 0xc5, 0x8b, 0xc1, 0x31, 0xe8, 0xbf, 0xab, 0xb8, 0x3b, 0x5e, 0x94, - 0xbd, 0x5b, 0x2f, 0x8a, 0x7e, 0x19, 0x8e, 0x37, 0x8e, 0x8c, 0xc1, 0x97, 0xa0, 0xd7, 0x2a, 0x6b, - 0x1d, 0xe0, 0xb1, 0xf9, 0xfa, 0x77, 0xbf, 0xf3, 0x83, 0x69, 0x83, 0x5d, 0xe7, 0xe0, 0x0f, 0x13, - 0x6c, 0x3f, 0x67, 0xfc, 0x0a, 0xa5, 0xc9, 0xaf, 0xda, 0xe6, 0xb7, 0x1c, 0x66, 0x2c, 0x3b, 0xec, - 0xb9, 0xb6, 0x42, 0xfd, 0x96, 0x1e, 0xaf, 0xbe, 0x8c, 0xdb, 0xec, 0x2b, 0xcc, 0xb0, 0xf9, 0x70, - 0x33, 0x98, 0xf7, 0x31, 0x43, 0xa3, 0xf1, 0xad, 0x87, 0x69, 0xfc, 0x6e, 0xdd, 0x58, 0xff, 0xbb, - 0x6e, 0x60, 0x08, 0xec, 0xf8, 0x32, 0x49, 0x31, 0x27, 0x99, 0x7e, 0x94, 0x3e, 0x5b, 0x77, 0x12, - 0x9a, 0xbc, 0xe5, 0xb9, 0xb7, 0x9e, 0xc6, 0x4f, 0xae, 0xff, 0xf1, 0x3a, 0xd7, 0x95, 0x67, 0xbc, - 0xa9, 0x3c, 0xe3, 0x6d, 0xe5, 0x19, 0x7f, 0x57, 0x9e, 0xf1, 0xea, 0xc6, 0xeb, 0xbc, 0xb9, 0xf1, - 0x3a, 0x6f, 0x6f, 0xbc, 0xce, 0x8b, 0x5e, 0xdb, 0xc4, 0xb4, 0xab, 0x06, 0xf2, 0xc5, 0x7f, 0x01, - 0x00, 0x00, 0xff, 0xff, 0xc8, 0x21, 0x07, 0xaa, 0x9e, 0x08, 0x00, 0x00, + // 849 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x54, 0x4f, 0x6f, 0xdb, 0x36, + 0x1c, 0xb5, 0x6c, 0x25, 0xb2, 0x69, 0x2f, 0x71, 0x89, 0x1c, 0x54, 0xa3, 0x90, 0x8c, 0x0c, 0xd8, + 0x82, 0x15, 0x90, 0x37, 
0x0f, 0xe8, 0x82, 0xec, 0x30, 0xcc, 0x4b, 0x37, 0x78, 0x08, 0x8a, 0x42, + 0xc9, 0x2e, 0xbd, 0x18, 0xb4, 0xc8, 0x30, 0x42, 0x65, 0x51, 0x20, 0xa9, 0x01, 0x1e, 0xf6, 0x21, + 0x7a, 0xec, 0x71, 0x5f, 0x65, 0xb7, 0x1c, 0x7b, 0x2c, 0x76, 0xf0, 0x36, 0x05, 0xd8, 0xe7, 0x18, + 0x48, 0xfd, 0x71, 0xe2, 0x06, 0x30, 0x1c, 0xf7, 0x24, 0x91, 0xbf, 0xf7, 0x1e, 0x7f, 0x24, 0xdf, + 0x23, 0xf8, 0x22, 0x95, 0x61, 0x34, 0x90, 0x1c, 0x05, 0x61, 0x4c, 0xcb, 0x6f, 0x32, 0x1d, 0x70, + 0x12, 0x30, 0x8e, 0x09, 0x9e, 0x88, 0x04, 0xc5, 0x5e, 0xc2, 0x99, 0x64, 0xb0, 0x1f, 0xb0, 0xe0, + 0x35, 0x67, 0x28, 0xb8, 0xf2, 0x14, 0xcb, 0x2b, 0xd0, 0x5e, 0xc5, 0xea, 0x1d, 0x50, 0x46, 0x99, + 0x06, 0x0f, 0xd4, 0x5f, 0xce, 0xeb, 0x3d, 0xa6, 0x8c, 0xd1, 0x88, 0x0c, 0xf4, 0x68, 0x9a, 0x5e, + 0x0e, 0x50, 0x3c, 0x2f, 0x4a, 0xee, 0x6a, 0x49, 0x86, 0x33, 0x22, 0x24, 0x9a, 0x25, 0x05, 0xc0, + 0x59, 0x05, 0xe0, 0x94, 0x23, 0x19, 0xb2, 0xa2, 0xa7, 0xc3, 0xff, 0xea, 0xa0, 0x75, 0xc6, 0xa8, + 0xaf, 0xdb, 0x85, 0xc7, 0xc0, 0x54, 0x02, 0xb6, 0xd1, 0x37, 0x8e, 0xda, 0xc3, 0x9e, 0x97, 0x93, + 0xbd, 0x92, 0xec, 0x5d, 0x94, 0xea, 0xa3, 0xe6, 0xf5, 0xc2, 0xad, 0xbd, 0xf9, 0xdb, 0x35, 0x7c, + 0xcd, 0x80, 0x18, 0x3c, 0xc2, 0x24, 0xe1, 0x24, 0x40, 0x92, 0xe0, 0xc9, 0x65, 0x48, 0x22, 0x2c, + 0xec, 0x7a, 0xbf, 0x71, 0xd4, 0x1e, 0x7e, 0xe5, 0xad, 0xdb, 0xb7, 0x57, 0x75, 0xe0, 0xfd, 0xa8, + 0x98, 0x23, 0x53, 0xa9, 0xfb, 0xdd, 0xa5, 0xa2, 0x9e, 0x16, 0xf0, 0x25, 0xb0, 0x66, 0x44, 0x08, + 0x44, 0x89, 0xdd, 0xe8, 0x1b, 0x47, 0xad, 0xd1, 0x33, 0x05, 0xfc, 0x6b, 0xe1, 0x7a, 0x34, 0x94, + 0x57, 0xe9, 0xd4, 0x0b, 0xd8, 0x6c, 0x50, 0xad, 0x86, 0xd5, 0x35, 0x60, 0x14, 0x48, 0xcf, 0xd7, + 0x1f, 0x34, 0x8d, 0xc8, 0xb9, 0xe4, 0x61, 0x4c, 0xfd, 0x52, 0xa6, 0x47, 0xc1, 0x8e, 0xd6, 0x86, + 0x5d, 0xd0, 0x78, 0x4d, 0xe6, 0x7a, 0xe7, 0x2d, 0x5f, 0xfd, 0xc2, 0x33, 0xb0, 0xf3, 0x2b, 0x8a, + 0x52, 0x62, 0xd7, 0xb7, 0x5a, 0x2a, 0x17, 0x39, 0xfc, 0x1d, 0x74, 0xcf, 0x25, 0x4f, 0x03, 0x99, + 0x72, 0x82, 0xb7, 0x3e, 0x6e, 0x0f, 0x58, 0x09, 0x9a, 0x47, 
0x0c, 0x61, 0xdd, 0x5d, 0x7b, 0x78, + 0xf0, 0x01, 0xf9, 0xfb, 0x78, 0xee, 0x97, 0xa0, 0xc3, 0xcc, 0x02, 0x1d, 0xbf, 0xb0, 0xe4, 0x79, + 0x82, 0x62, 0xf8, 0x19, 0x68, 0xaa, 0xe3, 0x27, 0x93, 0x10, 0xeb, 0xe5, 0xcd, 0x51, 0x3b, 0x5b, + 0xb8, 0xd6, 0x85, 0x9a, 0x1b, 0x9f, 0xfa, 0x96, 0x2e, 0x8e, 0x31, 0xfc, 0x14, 0x58, 0xca, 0xc1, + 0x0a, 0x56, 0xd7, 0x30, 0x90, 0x2d, 0xdc, 0x5d, 0x25, 0x31, 0x3e, 0xf5, 0x77, 0x55, 0x69, 0x8c, + 0xe1, 0x33, 0xb0, 0x97, 0x20, 0x4e, 0x62, 0x39, 0x29, 0xb1, 0x0d, 0x8d, 0xed, 0x66, 0x0b, 0xb7, + 0xf3, 0x52, 0x57, 0x0a, 0x46, 0x27, 0x59, 0x8e, 0x30, 0x7c, 0x02, 0x5a, 0x2c, 0x21, 0xb9, 0x1f, + 0x6d, 0x53, 0x9f, 0xfc, 0x72, 0x02, 0xfe, 0x02, 0xac, 0x29, 0xa2, 0x54, 0x5d, 0xf6, 0x8e, 0x36, + 0xd2, 0xb7, 0xeb, 0x8d, 0x74, 0x7b, 0x8f, 0xde, 0x28, 0x67, 0x3f, 0x8f, 0x25, 0x9f, 0xfb, 0xa5, + 0x16, 0x3c, 0x03, 0xa6, 0x44, 0x54, 0xd8, 0xbb, 0x5a, 0xf3, 0x78, 0x43, 0xcd, 0x0b, 0x44, 0x45, + 0x2e, 0xa8, 0x55, 0xe0, 0x0f, 0x00, 0x08, 0x89, 0xb8, 0x9c, 0xe8, 0x8b, 0xb4, 0x36, 0xb8, 0xc8, + 0x96, 0xe6, 0xa9, 0x0a, 0xfc, 0x0e, 0x34, 0xcb, 0x58, 0xda, 0x4d, 0x2d, 0xf1, 0xf8, 0x03, 0x89, + 0xd3, 0x02, 0x90, 0x2b, 0xbc, 0x55, 0x0a, 0x15, 0x09, 0x7e, 0x0e, 0xf6, 0x79, 0xe5, 0xbb, 0x49, + 0xc4, 0xa8, 0xb0, 0xf7, 0xfb, 0xc6, 0x51, 0xd3, 0xdf, 0x5b, 0x4e, 0x9f, 0x31, 0x2a, 0xe0, 0x73, + 0x60, 0xea, 0x6a, 0x4b, 0x6f, 0xfe, 0xe9, 0x06, 0xc9, 0x2c, 0x32, 0xa9, 0xe9, 0xf0, 0x15, 0x70, + 0x6e, 0xa5, 0x3d, 0x8c, 0x25, 0xe1, 0x31, 0x8a, 0x26, 0xa2, 0x32, 0xb8, 0xdd, 0xd6, 0x0b, 0xdc, + 0xef, 0xca, 0x27, 0x4b, 0xee, 0xb8, 0xa0, 0x2e, 0xa3, 0x01, 0x87, 0xa0, 0x43, 0x19, 0x67, 0xa9, + 0x0c, 0x63, 0xed, 0xce, 0x8e, 0xb6, 0xd2, 0x7e, 0xb6, 0x70, 0xdb, 0x3f, 0x95, 0xf3, 0xe3, 0x53, + 0xbf, 0x5d, 0x81, 0xc6, 0x18, 0xf6, 0x40, 0xf3, 0x32, 0x8c, 0x43, 0x71, 0x45, 0xb0, 0xfd, 0x89, + 0xde, 0x78, 0x35, 0x86, 0x14, 0xc0, 0x65, 0x5f, 0x93, 0xfc, 0x5d, 0x16, 0xf6, 0x9e, 0xee, 0x6f, + 0xb8, 0xfe, 0x00, 0x56, 0x43, 0x5b, 0x9c, 0xc3, 0x23, 0xb1, 0x32, 0x2f, 0x7a, 0x27, 0xa0, 0x73, + 
0xdb, 0x71, 0xf7, 0xbc, 0x28, 0x07, 0x77, 0x5e, 0x94, 0xe2, 0x65, 0x38, 0xa9, 0x1f, 0x1b, 0xbd, + 0x6f, 0x40, 0xab, 0x72, 0xd6, 0x26, 0xc4, 0x13, 0xf3, 0xed, 0x1f, 0x6e, 0xed, 0x67, 0xb3, 0x09, + 0xba, 0xed, 0xc3, 0x3f, 0x4d, 0xb0, 0xf7, 0x82, 0xf1, 0x19, 0x8a, 0xc2, 0xdf, 0x8a, 0x98, 0xdf, + 0x49, 0x98, 0xb1, 0x9a, 0xb0, 0x17, 0x45, 0x14, 0xf2, 0x77, 0xfa, 0x64, 0xfd, 0x61, 0xdc, 0x55, + 0x5f, 0x13, 0x86, 0xc6, 0xf6, 0x61, 0x30, 0x1f, 0x12, 0x86, 0xd2, 0xe3, 0x3b, 0xdb, 0x79, 0xfc, + 0x7e, 0xdf, 0x58, 0x1f, 0xdd, 0x37, 0xd0, 0x07, 0xcd, 0xe0, 0x2a, 0x8c, 0x30, 0x27, 0x71, 0xf1, + 0x28, 0x7d, 0xb9, 0xe9, 0x4d, 0x14, 0xe2, 0x95, 0xce, 0x83, 0xfd, 0x34, 0x7a, 0x7a, 0xfd, 0xaf, + 0x53, 0xbb, 0xce, 0x1c, 0xe3, 0x5d, 0xe6, 0x18, 0xef, 0x33, 0xc7, 0xf8, 0x27, 0x73, 0x8c, 0x37, + 0x37, 0x4e, 0xed, 0xdd, 0x8d, 0x53, 0x7b, 0x7f, 0xe3, 0xd4, 0x5e, 0xb5, 0xaa, 0x26, 0xa6, 0xbb, + 0xfa, 0x42, 0xbe, 0xfe, 0x3f, 0x00, 0x00, 0xff, 0xff, 0x32, 0xb4, 0xfb, 0x84, 0x05, 0x09, 0x00, + 0x00, } func (m *LogRecord) Marshal() (dAtA []byte, err error) { @@ -350,10 +356,17 @@ func (m *LogRecord) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Fields) > 0 { - for iNdEx := len(m.Fields) - 1; iNdEx >= 0; iNdEx-- { + if len(m.Message) > 0 { + i -= len(m.Message) + copy(dAtA[i:], m.Message) + i = encodeVarintRecordedSpan(dAtA, i, uint64(len(m.Message))) + i-- + dAtA[i] = 0x1a + } + if len(m.DeprecatedFields) > 0 { + for iNdEx := len(m.DeprecatedFields) - 1; iNdEx >= 0; iNdEx-- { { - size, err := m.Fields[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + size, err := m.DeprecatedFields[iNdEx].MarshalToSizedBuffer(dAtA[:i]) if err != nil { return 0, err } @@ -762,12 +775,16 @@ func (m *LogRecord) Size() (n int) { _ = l l = github_com_gogo_protobuf_types.SizeOfStdTime(m.Time) n += 1 + l + sovRecordedSpan(uint64(l)) - if len(m.Fields) > 0 { - for _, e := range m.Fields { + if len(m.DeprecatedFields) > 0 { + for _, e := range m.DeprecatedFields { l = e.Size() n += 1 + l + 
sovRecordedSpan(uint64(l)) } } + l = len(m.Message) + if l > 0 { + n += 1 + l + sovRecordedSpan(uint64(l)) + } return n } @@ -985,7 +1002,7 @@ func (m *LogRecord) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Fields", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field DeprecatedFields", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -1012,11 +1029,43 @@ func (m *LogRecord) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Fields = append(m.Fields, LogRecord_Field{}) - if err := m.Fields[len(m.Fields)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + m.DeprecatedFields = append(m.DeprecatedFields, LogRecord_Field{}) + if err := m.DeprecatedFields[len(m.DeprecatedFields)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Message", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRecordedSpan + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthRecordedSpan + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthRecordedSpan + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Message = github_com_cockroachdb_redact.RedactableString(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipRecordedSpan(dAtA[iNdEx:]) diff --git a/pkg/util/tracing/tracingpb/recorded_span.proto b/pkg/util/tracing/tracingpb/recorded_span.proto index 8ca9697031b0..8259a452ebe9 100644 --- a/pkg/util/tracing/tracingpb/recorded_span.proto +++ b/pkg/util/tracing/tracingpb/recorded_span.proto @@ -26,8 
+26,12 @@ message LogRecord { string key = 1; string value = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/redact.RedactableString"]; } - // Fields with values converted to strings. - repeated Field fields = 2 [(gogoproto.nullable) = false]; + // Fields with values converted to strings. In 22.1, the `message` field + // contains the log message, and this field is only used for compatibility + // with 21.2 nodes. + repeated Field deprecated_fields = 2 [(gogoproto.nullable) = false]; + // The log message. + string message = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cockroachdb/redact.RedactableString"]; } // StructuredRecord is a structured message recorded in a traced span. diff --git a/pkg/util/tracing/utils.go b/pkg/util/tracing/utils.go index 92a2f7d34674..d8c323e4e4b2 100644 --- a/pkg/util/tracing/utils.go +++ b/pkg/util/tracing/utils.go @@ -56,6 +56,9 @@ func normalizeSpan(s tracingpb.RecordedSpan, trace Recording) tracingpb.Normaliz // MessageToJSONString converts a protocol message into a JSON string. The // emitDefaults flag dictates whether fields with zero values are rendered or // not. +// +// TODO(andrei): It'd be nice if this function dealt with redactable vs safe +// fields, like EventPayload.AppendJSONFields does. func MessageToJSONString(msg protoutil.Message, emitDefaults bool) (string, error) { // Convert to json. jsonEncoder := jsonpb.Marshaler{EmitDefaults: emitDefaults} From f2f824cd95af08c6e6d005e1ee03ea1326827326 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 22 Sep 2021 21:05:50 -0400 Subject: [PATCH 5/5] changefeedccl: Add metrics to changefeed throttle. Add metrics to changefeed traffic throttler. Release Justification: Small observability changes to the existing functionality. 
Release Notes: None --- pkg/ccl/changefeedccl/cdcutils/BUILD.bazel | 2 + pkg/ccl/changefeedccl/cdcutils/throttle.go | 57 ++++++++++++++++--- .../changefeedccl/cdcutils/throttle_test.go | 5 +- .../changefeedccl/changefeed_processors.go | 2 +- pkg/ccl/changefeedccl/metrics.go | 4 ++ 5 files changed, 60 insertions(+), 10 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel b/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel index b9ecdcb735c3..993c43479b4e 100644 --- a/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdcutils/BUILD.bazel @@ -9,7 +9,9 @@ go_library( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/settings", "//pkg/util/log", + "//pkg/util/metric", "//pkg/util/quotapool", + "//pkg/util/timeutil", "//pkg/util/tracing", ], ) diff --git a/pkg/ccl/changefeedccl/cdcutils/throttle.go b/pkg/ccl/changefeedccl/cdcutils/throttle.go index febb17a3b5c5..5b4ef59ce831 100644 --- a/pkg/ccl/changefeedccl/cdcutils/throttle.go +++ b/pkg/ccl/changefeedccl/cdcutils/throttle.go @@ -19,7 +19,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" ) @@ -29,6 +31,7 @@ type Throttler struct { messageLimiter *quotapool.RateLimiter byteLimiter *quotapool.RateLimiter flushLimiter *quotapool.RateLimiter + metrics *Metrics } // AcquireMessageQuota acquires quota for a message with the specified size. 
@@ -43,10 +46,10 @@ func (t *Throttler) AcquireMessageQuota(ctx context.Context, sz int) error { ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-%s", t.name)) defer span.Finish() - if err := t.messageLimiter.WaitN(ctx, 1); err != nil { + if err := waitQuota(ctx, 1, t.messageLimiter, t.metrics.MessagesPushbackNanos); err != nil { return err } - return t.byteLimiter.WaitN(ctx, int64(sz)) + return waitQuota(ctx, int64(sz), t.byteLimiter, t.metrics.BytesPushbackNanos) } // AcquireFlushQuota acquires quota for a message with the specified size. @@ -60,8 +63,7 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error { var span *tracing.Span ctx, span = tracing.ChildSpan(ctx, fmt.Sprintf("quota-wait-flush-%s", t.name)) defer span.Finish() - - return t.flushLimiter.WaitN(ctx, 1) + return waitQuota(ctx, 1, t.flushLimiter, t.metrics.FlushPushbackNanos) } func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) { @@ -85,7 +87,7 @@ func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) { } // NewThrottler creates a new throttler with the specified configuration. -func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Throttler { +func NewThrottler(name string, config changefeedbase.SinkThrottleConfig, m *Metrics) *Throttler { logSlowAcquisition := quotapool.OnSlowAcquisition(500*time.Millisecond, quotapool.LogSlowAcquisition) t := &Throttler{ name: name, @@ -98,6 +100,7 @@ func NewThrottler(name string, config changefeedbase.SinkThrottleConfig) *Thrott flushLimiter: quotapool.NewRateLimiter( fmt.Sprintf("%s-flushes", name), 0, 0, logSlowAcquisition, ), + metrics: m, } t.updateConfig(config) return t @@ -109,7 +112,7 @@ var nodeSinkThrottle = struct { }{} // NodeLevelThrottler returns node level Throttler for changefeeds. 
-func NodeLevelThrottler(sv *settings.Values) *Throttler { +func NodeLevelThrottler(sv *settings.Values, metrics *Metrics) *Throttler { getConfig := func() (config changefeedbase.SinkThrottleConfig) { configStr := changefeedbase.NodeSinkThrottleConfig.Get(sv) if configStr != "" { @@ -126,7 +129,7 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler { if nodeSinkThrottle.Throttler != nil { panic("unexpected state") } - nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig()) + nodeSinkThrottle.Throttler = NewThrottler("cf.node.throttle", getConfig(), metrics) // Update node throttler configs when settings change. changefeedbase.NodeSinkThrottleConfig.SetOnChange(sv, func(ctx context.Context) { nodeSinkThrottle.Throttler.updateConfig(getConfig()) @@ -135,3 +138,43 @@ func NodeLevelThrottler(sv *settings.Values) *Throttler { return nodeSinkThrottle.Throttler } + +// Metrics is a metric.Struct for kvfeed metrics. +type Metrics struct { + BytesPushbackNanos *metric.Counter + MessagesPushbackNanos *metric.Counter + FlushPushbackNanos *metric.Counter +} + +// MakeMetrics constructs a Metrics struct with the provided histogram window. +func MakeMetrics(histogramWindow time.Duration) Metrics { + makeMetric := func(n string) metric.Metadata { + return metric.Metadata{ + Name: fmt.Sprintf("changefeed.%s.messages_pushback_nanos", n), + Help: fmt.Sprintf("Total time spent throttled for %s quota", n), + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } + } + + return Metrics{ + BytesPushbackNanos: metric.NewCounter(makeMetric("bytes")), + MessagesPushbackNanos: metric.NewCounter(makeMetric("messages")), + FlushPushbackNanos: metric.NewCounter(makeMetric("flush")), + } +} + +var _ metric.Struct = (*Metrics)(nil) + +// MetricStruct makes Metrics a metric.Struct. 
+func (m Metrics) MetricStruct() {} + +func waitQuota( + ctx context.Context, n int64, limit *quotapool.RateLimiter, c *metric.Counter, +) error { + start := timeutil.Now() + defer func() { + c.Inc(int64(timeutil.Now().Sub(start))) + }() + return limit.WaitN(ctx, n) +} diff --git a/pkg/ccl/changefeedccl/cdcutils/throttle_test.go b/pkg/ccl/changefeedccl/cdcutils/throttle_test.go index 0ef6769889ef..9a0517752e36 100644 --- a/pkg/ccl/changefeedccl/cdcutils/throttle_test.go +++ b/pkg/ccl/changefeedccl/cdcutils/throttle_test.go @@ -11,6 +11,7 @@ package cdcutils import ( "context" "testing" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -24,8 +25,8 @@ func TestNodeLevelThrottler(t *testing.T) { defer log.Scope(t).Close(t) sv := &cluster.MakeTestingClusterSettings().SV - - throttler := NodeLevelThrottler(sv) + m := MakeMetrics(time.Minute) + throttler := NodeLevelThrottler(sv, &m) // Default: no throttling require.True(t, throttler.messageLimiter.AdmitN(10000000)) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index b98a8aa799aa..ddb6c2061a2a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -281,7 +281,7 @@ func (ca *changeAggregator) startKVFeed( cfg := ca.flowCtx.Cfg buf := kvevent.NewThrottlingBuffer( kvevent.NewMemBuffer(ca.kvFeedMemMon.MakeBoundAccount(), &cfg.Settings.SV, &ca.metrics.KVFeedMetrics), - cdcutils.NodeLevelThrottler(&cfg.Settings.SV)) + cdcutils.NodeLevelThrottler(&cfg.Settings.SV, &ca.metrics.ThrottleMetrics)) // KVFeed takes ownership of the kvevent.Writer portion of the buffer, while // we return the kvevent.Reader part to the caller. 
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 6f7857f339ab..9e18a2729376 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -12,6 +12,7 @@ import ( "context" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" @@ -233,6 +234,7 @@ type Metrics struct { Running *metric.Gauge FrontierUpdates *metric.Counter + ThrottleMetrics cdcutils.Metrics mu struct { syncutil.Mutex @@ -270,7 +272,9 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct { Running: metric.NewGauge(metaChangefeedRunning), FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates), + ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow), } + m.mu.resolved = make(map[int]hlc.Timestamp) m.mu.id = 1 // start the first id at 1 so we can detect initialization m.MaxBehindNanos = metric.NewFunctionalGauge(metaChangefeedMaxBehindNanos, func() int64 {