Skip to content

Commit

Permalink
util/log: delay the formatting of log entries
Browse files Browse the repository at this point in the history
Prior to this patch, the logging events were converted to a
`logpb.Entry` very early in the logging pipeline. This was forcing the
conversion of the logging tags to a flat string too early, and making
it hard for (e.g.) a JSON formatter to preserve the structure of
logging tags.

This patch averts this by introducing a new `logEntry` type which has
more-or-less the same structure as `logpb.Entry` but keep the logging
tags structured until the point the entry is formatted.

Release note: None
  • Loading branch information
knz committed Dec 22, 2020
1 parent f41f07c commit a74b36a
Show file tree
Hide file tree
Showing 21 changed files with 330 additions and 209 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug_merge_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func writeLogStream(
if _, err = w.Write(prefixBytes); err != nil {
return err
}
return log.FormatEntry(ei.Entry, w)
return log.FormatLegacyEntry(ei.Entry, w)
}

g, ctx := errgroup.WithContext(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func runDebugZip(cmd *cobra.Command, args []string) (retErr error) {
// We're also going to print a warning at the end.
warnRedactLeak = true
}
if err := log.FormatEntry(e, logOut); err != nil {
if err := log.FormatLegacyEntry(e, logOut); err != nil {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/server/debug/logspy.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
defer func() {
if err == nil {
if dropped := atomic.LoadInt32(&countDropped); dropped > 0 {
entry := log.MakeEntry(
entry := log.MakeLegacyEntry(
ctx, severity.WARNING, channel.DEV,
0 /* depth */, false, /* redactable */
0 /* depth */, true, /* redactable */
"%d messages were dropped", log.Safe(dropped))
err = log.FormatEntry(entry, w) // modify return value
err = log.FormatLegacyEntry(entry, w) // modify return value
}
}
}()
Expand All @@ -176,9 +176,9 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
entries := make(chan logpb.Entry, logSpyChanCap)

{
entry := log.MakeEntry(
entry := log.MakeLegacyEntry(
ctx, severity.INFO, channel.DEV,
0 /* depth */, false, /* redactable */
0 /* depth */, true, /* redactable */
"intercepting logs with options %+v", opts)
entries <- entry
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
return

case entry := <-entries:
if err := log.FormatEntry(entry, w); err != nil {
if err := log.FormatLegacyEntry(entry, w); err != nil {
return errors.Wrapf(err, "while writing entry %v", entry)
}
count++
Expand Down
3 changes: 2 additions & 1 deletion pkg/testutils/lint/passes/fmtsafe/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ var requireConstFmt = map[string]bool{
// Note: More of the logging functions are populated here via the
// init() function below.

"github.com/cockroachdb/cockroach/pkg/util/log.MakeEntry": true,
"github.com/cockroachdb/cockroach/pkg/util/log.MakeLegacyEntry": true,
"github.com/cockroachdb/cockroach/pkg/util/log.makeUnstructuredEntry": true,
"github.com/cockroachdb/cockroach/pkg/util/log.FormatWithContextTags": true,
"github.com/cockroachdb/cockroach/pkg/util/log.renderArgsAsRedactable": true,
"github.com/cockroachdb/cockroach/pkg/util/log.formatArgs": true,
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/log/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func logfDepth(
}

logger := logging.getLogger(ch)
entry := MakeEntry(
entry := makeUnstructuredEntry(
ctx, sev, ch,
depth+1, true /* redactable */, format, args...)
if sp, el, ok := getSpanOrEventLog(ctx); ok {
eventInternal(sp, el, (sev >= severity.ERROR), entry)
eventInternal(sp, el, (sev >= severity.ERROR), entry.convertToLegacy())
}
logger.outputLogEntry(entry)
}
Expand Down
32 changes: 17 additions & 15 deletions pkg/util/log/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -248,36 +247,35 @@ func SetTenantIDs(tenantID string, sqlInstanceID int32) {
// outputLogEntry marshals a log entry proto into bytes, and writes
// the data to the log files. If a trace location is set, stack traces
// are added to the entry before marshaling.
func (l *loggerT) outputLogEntry(entry logpb.Entry) {
func (l *loggerT) outputLogEntry(entry logEntry) {
if f, ok := logging.interceptor.Load().(InterceptorFn); ok && f != nil {
f(entry)
f(entry.convertToLegacy())
return
}

// Mark the logger as active, so that further configuration changes
// are disabled. See IsActive() and its callers for details.
setActive()
var stacks []byte
var fatalTrigger chan struct{}
extraSync := false

if entry.Severity == severity.FATAL {
if entry.sev == severity.FATAL {
extraSync = true
logging.signalFatalCh()

switch traceback {
case tracebackSingle:
stacks = getStacks(false)
entry.stacks = getStacks(false)
case tracebackAll:
stacks = getStacks(true)
entry.stacks = getStacks(true)
}

for _, s := range l.sinkInfos {
stacks = s.sink.attachHints(stacks)
entry.stacks = s.sink.attachHints(entry.stacks)
}

// Explain to the (human) user that we would like to hear from them.
stacks = append(stacks, []byte(fatalErrorPostamble)...)
entry.stacks = append(entry.stacks, []byte(fatalErrorPostamble)...)

// We don't want to hang forever writing our final log message. If
// things are broken (for example, if the disk fills up and there
Expand All @@ -296,7 +294,7 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
logging.mu.Lock()
if logging.mu.exitOverride.f != nil {
if logging.mu.exitOverride.hideStack {
stacks = []byte("stack trace omitted via SetExitFunc()\n")
entry.stacks = []byte("stack trace omitted via SetExitFunc()\n")
}
exitFunc = logging.mu.exitOverride.f
}
Expand Down Expand Up @@ -337,17 +335,21 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
// Stopper's Stop() call (e.g. the pgwire async processing
// goroutine). These asynchronous log calls are concurrent with
// the stderrSinkInfo update in (*TestLogScope).Close().
if entry.Severity < s.threshold.Get() || !s.sink.active() {
if entry.sev < s.threshold.Get() || !s.sink.active() {
continue
}
editedEntry := maybeRedactEntry(entry, s.editor)
editedEntry := entry

// Add a counter. This is important for e.g. the SQL audit logs.
// Note: whether the counter is displayed or not depends on
// the formatter.
editedEntry.Counter = atomic.AddUint64(&s.msgCount, 1)
editedEntry.counter = atomic.AddUint64(&s.msgCount, 1)

bufs.b[i] = s.formatter.formatEntry(editedEntry, stacks)
// Process the redation spec.
editedEntry.payload = maybeRedactEntry(editedEntry.payload, s.editor)

// Format the entry for this sink.
bufs.b[i] = s.formatter.formatEntry(editedEntry)
someSinkActive = true
}

Expand Down Expand Up @@ -398,7 +400,7 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
}

// Flush and exit on fatal logging.
if entry.Severity == severity.FATAL {
if entry.sev == severity.FATAL {
close(fatalTrigger)
// Note: although it seems like the function is allowed to return
// below when s == severity.FATAL, this is not so, because the
Expand Down
61 changes: 31 additions & 30 deletions pkg/util/log/clog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ func TestEntryDecoder(t *testing.T) {
Tags: tags,
Message: msg,
}
var f formatCrdbV1
buf := f.formatEntry(entry, nil /* stacks */)
defer putBuffer(buf)
var buf bytes.Buffer
_ = FormatLegacyEntry(entry, &buf)
return buf.String()
}

Expand Down Expand Up @@ -635,36 +634,39 @@ func TestRollover(t *testing.T) {
// right now clog writes straight to os.StdErr.
func TestFatalStacktraceStderr(t *testing.T) {
defer leaktest.AfterTest(t)()
defer ScopeWithoutShowLogs(t).Close(t)

SetExitFunc(false /* hideStack */, func(exit.Code) {})
for _, level := range []int{tracebackNone, tracebackSingle, tracebackAll} {
t.Run(fmt.Sprintf("%d", level), func(t *testing.T) {
defer ScopeWithoutShowLogs(t).Close(t)

defer capture()()
SetExitFunc(false /* hideStack */, func(exit.Code) {})

for _, level := range []int{tracebackNone, tracebackSingle, tracebackAll} {
traceback = level
Fatalf(context.Background(), "cinap")
cont := contents()
if !strings.Contains(cont, " cinap") {
t.Fatalf("panic output does not contain cinap:\n%s", cont)
}
if !strings.Contains(cont, "clog_test") {
t.Fatalf("stack trace does not contain file name: %s", cont)
}
switch traceback {
case tracebackNone:
if strings.Count(cont, "goroutine ") > 0 {
t.Fatalf("unexpected stack trace:\n%s", cont)
defer capture()()

traceback = level
Fatalf(context.Background(), "cinap")
cont := contents()
if !strings.Contains(cont, " cinap") {
t.Fatalf("panic output does not contain cinap:\n%s", cont)
}
case tracebackSingle:
if strings.Count(cont, "goroutine ") != 1 {
t.Fatalf("stack trace contains too many goroutines: %s", cont)
if !strings.Contains(cont, "clog_test") {
t.Fatalf("stack trace does not contain file name: %s", cont)
}
case tracebackAll:
if strings.Count(cont, "goroutine ") < 2 {
t.Fatalf("stack trace contains less than two goroutines: %s", cont)
switch traceback {
case tracebackNone:
if strings.Count(cont, "goroutine ") > 0 {
t.Fatalf("unexpected stack trace:\n%s", cont)
}
case tracebackSingle:
if strings.Count(cont, "goroutine ") != 1 {
t.Fatalf("stack trace contains too many goroutines: %s", cont)
}
case tracebackAll:
if strings.Count(cont, "goroutine ") < 2 {
t.Fatalf("stack trace contains less than two goroutines: %s", cont)
}
}
}
})
}
}

Expand Down Expand Up @@ -773,10 +775,9 @@ func BenchmarkHeader(b *testing.B) {
File: "file.go",
Line: 100,
}
var f formatCrdbV1
for i := 0; i < b.N; i++ {
buf := f.formatEntry(entry, nil /* stacks */)
putBuffer(buf)
var w bytes.Buffer
_ = FormatLegacyEntry(entry, &w)
}
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/util/log/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package log

import (
"context"
"encoding/json"

"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
Expand All @@ -29,19 +28,20 @@ func StructuredEvent(ctx context.Context, event eventpb.EventPayload) {
if len(common.EventType) == 0 {
common.EventType = eventpb.GetEventTypeName(event)
}
// TODO(knz): Avoid marking all the JSON payload as redactable. Do
// redaction per-field.
b, err := json.Marshal(event)
if err != nil {
Fatalf(ctx, "unexpected JSON encoding error: %+v", err)
}

// TODO(knz): Avoid escaping the JSON format when emitting the payload
// to an external sink.
entry := makeStructuredEntry(ctx,
severity.INFO,
event.LoggingChannel(),
// Note: we use depth 0 intentionally here, so that structured
// events can be reliably detected (their source filename will
// always be log/event_log.go).
0, /* depth */
event)

if sp, el, ok := getSpanOrEventLog(ctx); ok {
eventInternal(sp, el, (entry.sev >= severity.ERROR), entry.convertToLegacy())
}

// Note: we use depth 0 intentionally here, so that structured
// events can be reliably detected (their source filename will
// always be log/event_log.go).
// TODO(knz): Consider another way to mark structured events.
logfDepth(ctx, 0, severity.INFO, event.LoggingChannel(), "Structured event: %s", string(b))
logger := logging.getLogger(entry.ch)
logger.outputLogEntry(entry)
}
7 changes: 4 additions & 3 deletions pkg/util/log/exit_override.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ func (l *loggerT) exitLocked(err error, code exit.Code) {
// This assumes l.outputMu is held, but l.fileSink.mu is not held.
func (l *loggerT) reportErrorEverywhereLocked(ctx context.Context, err error) {
// Make a valid log entry for this error.
entry := MakeEntry(
entry := makeUnstructuredEntry(
ctx, severity.ERROR, channel.OPS,
2 /* depth */, true, /* redactable */
2, /* depth */
true, /* redactable */
"logging error: %v", err)

// Either stderr or our log file is broken. Try writing the error to both
Expand All @@ -96,7 +97,7 @@ func (l *loggerT) reportErrorEverywhereLocked(ctx context.Context, err error) {
for _, s := range l.sinkInfos {
sink := s.sink
if logpb.Severity_ERROR >= s.threshold && sink.active() {
buf := s.formatter.formatEntry(entry, nil /*stack*/)
buf := s.formatter.formatEntry(entry)
sink.emergencyOutput(buf.Bytes())
putBuffer(buf)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/util/log/file_log_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ func TestSecondaryGC(t *testing.T) {

testLogGC(t, logger,
func(ctx context.Context, format string, args ...interface{}) {
entry := MakeEntry(ctx, severity.INFO, channel.DEV, 1, si.redactable,
entry := makeUnstructuredEntry(ctx, severity.INFO, channel.DEV, 1,
true, /* redactable */
format, /* nolint:fmtsafe */
args...)
logger.outputLogEntry(entry)
Expand Down
5 changes: 4 additions & 1 deletion pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {

// Force a log entry. This does two things: it forces the creation
// of a file and it also introduces a timestamp marker.
entry := MakeEntry(secLoggersCtx, severity.INFO, channel.DEV, 0, false,
entry := makeUnstructuredEntry(secLoggersCtx, severity.INFO, channel.DEV, 0,
// Note: we need this entry to be marked as non-redactable since
// it's going to be followed by junk printed by the go runtime.
false, /* redactable */
"stderr capture started")
secLogger.outputLogEntry(entry)

Expand Down
Loading

0 comments on commit a74b36a

Please sign in to comment.