Skip to content

Commit

Permalink
util/tracing: simplify log messages in trace recordings
Browse files Browse the repository at this point in the history
Before this patch, the RecordedSpan proto stored log messages in a very
awkward way: each message was stored as a collection of key/values, with
only one such pair present (using a well-known key). This was confusing,
unnecessary, hard to work with, and made it hard to figure out what
keys and values are in there (the answer being: only one key). This
patch simplifies the log messages, representing each of them as a
single string, as nature intended. A bunch of code gets simplified as a
consequence.

Release note: None
  • Loading branch information
andreimatei committed Sep 28, 2021
1 parent 411723d commit 6828a2e
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 153 deletions.
11 changes: 2 additions & 9 deletions pkg/kv/bulk/sst_batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"math/rand"
"reflect"
"runtime"
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
Expand Down Expand Up @@ -225,14 +224,8 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
}
}
var splitRetries int
for _, rec := range getRec() {
for _, l := range rec.Logs {
for _, line := range l.Fields {
if strings.Contains(line.Value.StripMarkers(), "SSTable cannot be added spanning range bounds") {
splitRetries++
}
}
}
for _, sp := range getRec() {
splitRetries += tracing.CountLogMessages(sp, "SSTable cannot be added spanning range bounds")
}
if splitRetries != expectedSplitRetries {
t.Fatalf("expected %d split-caused retries, got %d", expectedSplitRetries, splitRetries)
Expand Down
22 changes: 8 additions & 14 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,16 +1018,14 @@ func (m *monitor) collectRecordings() string {
rec := g.collect()
for _, span := range rec {
for _, log := range span.Logs {
for _, field := range log.Fields {
if prev > 0 {
prev--
continue
}
logs = append(logs, logRecord{
g: g, value: field.Value.StripMarkers(),
})
g.prevEvents++
if prev > 0 {
prev--
continue
}
logs = append(logs, logRecord{
g: g, value: log.Msg().StripMarkers(),
})
g.prevEvents++
}
}
if atomic.LoadInt32(&g.finished) == 1 {
Expand Down Expand Up @@ -1068,11 +1066,7 @@ func (m *monitor) hasNewEvents(g *monitoredGoroutine) bool {
events := 0
rec := g.collect()
for _, span := range rec {
for _, log := range span.Logs {
for range log.Fields {
events++
}
}
events += len(span.Logs)
}
return events > g.prevEvents
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -916,11 +916,11 @@ func (n *Node) batchInternal(

var br *roachpb.BatchResponse
if err := n.stopper.RunTaskWithErr(ctx, "node.Node: batch", func(ctx context.Context) error {
var finishSpan func(*roachpb.BatchResponse)
var finishSpan func(context.Context, *roachpb.BatchResponse)
// Shadow ctx from the outer function. Written like this to pass the linter.
ctx, finishSpan = n.setupSpanForIncomingRPC(ctx, tenID)
// NB: wrapped to delay br evaluation to its value when returning.
defer func() { finishSpan(br) }()
defer func() { finishSpan(ctx, br) }()
if log.HasSpanOrEvent(ctx) {
log.Eventf(ctx, "node received request: %s", args.Summary())
}
Expand Down Expand Up @@ -1062,7 +1062,7 @@ func (n *Node) Batch(
// be nil in case no response is to be returned to the rpc caller.
func (n *Node) setupSpanForIncomingRPC(
ctx context.Context, tenID roachpb.TenantID,
) (context.Context, func(*roachpb.BatchResponse)) {
) (context.Context, func(context.Context, *roachpb.BatchResponse)) {
// The operation name matches the one created by the interceptor in the
// remoteTrace case below.
const opName = "/cockroach.roachpb.Internal/Batch"
Expand All @@ -1083,7 +1083,7 @@ func (n *Node) setupSpanForIncomingRPC(
}
}

finishSpan := func(br *roachpb.BatchResponse) {
finishSpan := func(ctx context.Context, br *roachpb.BatchResponse) {
if newSpan != nil {
newSpan.Finish()
}
Expand All @@ -1098,8 +1098,12 @@ func (n *Node) setupSpanForIncomingRPC(
// sensitive stripped out of the verbose messages. However,
// structured payloads stay untouched.
if rec := grpcSpan.GetRecording(); rec != nil {
maybeRedactRecording(tenID, rec)
br.CollectedSpans = append(br.CollectedSpans, rec...)
err := redactRecordingForTenant(tenID, rec)
if err == nil {
br.CollectedSpans = append(br.CollectedSpans, rec...)
} else {
log.Errorf(ctx, "error redacting trace recording: %s", err)
}
}
}
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/server/node_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,38 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

var sRedactedMarker = redact.RedactableString(redact.EscapeBytes(nil))

func maybeRedactRecording(tenID roachpb.TenantID, rec tracing.Recording) {
// redactRecordingForTenant redacts the sensitive parts of log messages in the
// recording if the tenant for which this recording is intended is not the
// system tenant (the system tenant gets the recording unmodified). See
// https://github.com/cockroachdb/cockroach/issues/70407.
// The recording is modified in place.
//
// tenID is the tenant that will receive this recording.
func redactRecordingForTenant(tenID roachpb.TenantID, rec tracing.Recording) error {
if tenID == roachpb.SystemTenantID {
return
return nil
}
// For tenants, strip the verbose log messages. See:
// https://github.com/cockroachdb/cockroach/issues/70407
for i := range rec {
sp := &rec[i]
sp.Tags = nil
for j := range sp.Logs {
record := &sp.Logs[j]
for k := range record.Fields {
field := &record.Fields[k]
if record.Message != "" && !sp.RedactableLogs {
// If Message is set, the record should have been produced by a 22.1
// node that also sets RedactableLogs.
return errors.AssertionFailedf(
"recording has non-redactable span with the Message field set: %s", sp)
}
record.Message = record.Message.Redact()

// For compatibility with old versions, also redact DeprecatedFields.
for k := range record.DeprecatedFields {
field := &record.DeprecatedFields[k]
if field.Key != tracingpb.LogMessageField {
// We don't have any of these fields, but let's not take any
// chances (our dependencies might slip them in).
Expand All @@ -51,4 +65,5 @@ func maybeRedactRecording(tenID roachpb.TenantID, rec tracing.Recording) {
}
}
}
return nil
}
8 changes: 4 additions & 4 deletions pkg/server/node_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// TestMaybeRedactRecording verifies that maybeRedactRecording strips
// TestMaybeRedactRecording verifies that redactRecordingForTenant strips
// sensitive details for recordings consumed by tenants.
//
// See kvccl.TestTenantTracesAreRedacted for an end-to-end test of this.
Expand Down Expand Up @@ -59,7 +59,7 @@ func TestMaybeRedactRecording(t *testing.T) {

t.Run("regular-tenant", func(t *testing.T) {
rec := mkRec()
maybeRedactRecording(roachpb.MakeTenantID(100), rec)
require.NoError(t, redactRecordingForTenant(roachpb.MakeTenantID(100), rec))
require.Zero(t, rec[0].Tags)
require.Len(t, rec[0].Logs, 1)
msg := rec[0].Logs[0].Fields[0].Value
Expand All @@ -72,7 +72,7 @@ func TestMaybeRedactRecording(t *testing.T) {

t.Run("system-tenant", func(t *testing.T) {
rec := mkRec()
maybeRedactRecording(roachpb.SystemTenantID, rec)
require.NoError(t, redactRecordingForTenant(roachpb.SystemTenantID, rec))
require.Equal(t, map[string]string{
"_verbose": "1",
"all_span_tags_are_stripped": "because_no_redactability",
Expand All @@ -93,7 +93,7 @@ func TestMaybeRedactRecording(t *testing.T) {
// you're here to see why this test failed to compile, ensure that the
// change you're making to RecordedSpan does not include new sensitive data
// that may leak from the KV layer to tenants. If it does, update
// maybeRedactRecording appropriately.
// redactRecordingForTenant appropriately.
type calcifiedRecordedSpan struct {
TraceID uint64
SpanID uint64
Expand Down
16 changes: 2 additions & 14 deletions pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,23 +659,11 @@ func (s *statusServer) Allocator(

func recordedSpansToTraceEvents(spans []tracingpb.RecordedSpan) []*serverpb.TraceEvent {
var output []*serverpb.TraceEvent
var buf bytes.Buffer
for _, sp := range spans {
for _, entry := range sp.Logs {
event := &serverpb.TraceEvent{
Time: entry.Time,
}
if len(entry.Fields) == 1 {
event.Message = entry.Fields[0].Value.StripMarkers()
} else {
buf.Reset()
for i, f := range entry.Fields {
if i != 0 {
buf.WriteByte(' ')
}
fmt.Fprintf(&buf, "%s:%v", f.Key, f.Value)
}
event.Message = buf.String()
Time: entry.Time,
Message: entry.Msg().StripMarkers(),
}
output = append(output, event)
}
Expand Down
10 changes: 4 additions & 6 deletions pkg/sql/rowexec/tablereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,11 @@ func TestLimitScans(t *testing.T) {
}
}
for _, l := range span.Logs {
for _, f := range l.Fields {
match := re.FindStringSubmatch(f.Value.StripMarkers())
if match == nil {
continue
}
ranges[match[1]] = struct{}{}
match := re.FindStringSubmatch(l.Msg().StripMarkers())
if match == nil {
continue
}
ranges[match[1]] = struct{}{}
}
}
if len(ranges) != 1 {
Expand Down
6 changes: 4 additions & 2 deletions pkg/util/tracing/crdbspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,10 @@ func (s *crdbSpan) record(msg redact.RedactableString) {
now = time.Now()
}
logRecord := &tracingpb.LogRecord{
Time: now,
Fields: []tracingpb.LogRecord_Field{
Time: now,
Message: msg,
// Compatibility with 21.2.
DeprecatedFields: []tracingpb.LogRecord_Field{
{Key: tracingpb.LogMessageField, Value: msg},
},
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/util/tracing/recording.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ func (r Recording) visitSpan(sp tracingpb.RecordedSpan, depth int) []traceLogDat

for _, l := range sp.Logs {
lastLog := ownLogs[len(ownLogs)-1]
ownLogs = append(ownLogs, conv("event:"+l.Msg(), l.Time, lastLog.Timestamp))
var sb redact.StringBuilder
sb.Printf("event:%s", l.Msg())
ownLogs = append(ownLogs, conv(sb.RedactableString(), l.Time, lastLog.Timestamp))
}

// If the span was verbose then the Structured events would have been
Expand Down Expand Up @@ -353,13 +355,13 @@ func (r Recording) ToJaegerJSON(stmt, comment, nodeStr string) (string, error) {
})
}
for _, l := range sp.Logs {
jl := jaegerjson.Log{Timestamp: uint64(l.Time.UnixNano() / 1000)}
for _, field := range l.Fields {
jl.Fields = append(jl.Fields, jaegerjson.KeyValue{
Key: field.Key,
Value: field.Value,
jl := jaegerjson.Log{
Timestamp: uint64(l.Time.UnixNano() / 1000),
Fields: []jaegerjson.KeyValue{{
Key: "event",
Value: l.Msg(),
Type: "STRING",
})
}},
}
s.Logs = append(s.Logs, jl)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/tracing/span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,8 +309,8 @@ func TestSpanRecordLimit(t *testing.T) {
first := rec[0].Logs[0]
last := rec[0].Logs[len(rec[0].Logs)-1]

require.Equal(t, first.Fields[0].Value.StripMarkers(), msg(extra+1))
require.Equal(t, last.Fields[0].Value.StripMarkers(), msg(numLogs+extra))
require.Equal(t, first.Msg().StripMarkers(), msg(extra+1))
require.Equal(t, last.Msg().StripMarkers(), msg(numLogs+extra))
}

// testStructuredImpl is a testing implementation of Structured event.
Expand Down
22 changes: 5 additions & 17 deletions pkg/util/tracing/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,8 @@ func FindMsgInRecording(recording Recording, msg string) int {
// LogsContainMsg returns true if a Span's logs contain the given message.
func LogsContainMsg(sp tracingpb.RecordedSpan, msg string) bool {
for _, l := range sp.Logs {
// NOTE: With our logs, each LogRecord has a single field ("event") and
// value.
for _, f := range l.Fields {
if strings.Contains(f.Value.StripMarkers(), msg) {
return true
}
if strings.Contains(l.Msg().StripMarkers(), msg) {
return true
}
}
return false
Expand All @@ -50,12 +46,8 @@ func LogsContainMsg(sp tracingpb.RecordedSpan, msg string) bool {
func CountLogMessages(sp tracingpb.RecordedSpan, msg string) int {
res := 0
for _, l := range sp.Logs {
// NOTE: With our logs, each LogRecord has a single field ("event") and
// value.
for _, f := range l.Fields {
if strings.Contains(f.Value.StripMarkers(), msg) {
res++
}
if strings.Contains(l.Msg().StripMarkers(), msg) {
res++
}
}
return res
Expand Down Expand Up @@ -144,11 +136,7 @@ func CheckRecordedSpans(rec Recording, expected string) error {
row(d, " tags: %s", strings.Join(tags, " "))
}
for _, l := range rs.Logs {
var msg string
for _, f := range l.Fields {
msg = msg + fmt.Sprintf(" %s: %v", f.Key, f.Value.StripMarkers())
}
row(d, "%s", msg)
row(d, " event: %s", l.Msg().StripMarkers())
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/util/tracing/tracingpb/recorded_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ func (s *RecordedSpan) Structured(visit func(*types.Any, time.Time)) {
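// Msg extracts the message of the LogRecord. The message is taken from the
// Message field when that is set; otherwise, for compatibility with 21.2, it
// is looked up in DeprecatedFields, where it is either in an "event" or
// "error" field.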
func (l LogRecord) Msg() redact.RedactableString {
for _, f := range l.Fields {
if l.Message != "" {
return l.Message
}

// Compatibility with 21.2: look at l.DeprecatedFields.
for _, f := range l.DeprecatedFields {
key := f.Key
if key == LogMessageField {
return f.Value
Expand Down
Loading

0 comments on commit 6828a2e

Please sign in to comment.