Skip to content

Commit

Permalink
util/log: delay the formatting of log entries
Browse files Browse the repository at this point in the history
Prior to this patch, the logging events were converted to a
`logpb.Entry` very early in the logging pipeline. This was forcing the
conversion of the logging tags to a flat string too early, and making
it hard for (e.g.) a JSON formatter to preserve the structure of
logging tags.

This patch averts this by introducing a new `logEntry` type which has
more-or-less the same structure as `logpb.Entry` but keep the logging
tags structured until the point the entry is formatted.

Release note: None
  • Loading branch information
knz committed Dec 18, 2020
1 parent ba9e26d commit 14d01f0
Show file tree
Hide file tree
Showing 20 changed files with 286 additions and 176 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug_merge_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func writeLogStream(
if _, err = w.Write(prefixBytes); err != nil {
return err
}
return log.FormatEntry(ei.Entry, w)
return log.FormatLegacyEntry(ei.Entry, w)
}

g, ctx := errgroup.WithContext(context.Background())
Expand Down
2 changes: 1 addition & 1 deletion pkg/cli/zip.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func runDebugZip(cmd *cobra.Command, args []string) (retErr error) {
// We're also going to print a warning at the end.
warnRedactLeak = true
}
if err := log.FormatEntry(e, logOut); err != nil {
if err := log.FormatLegacyEntry(e, logOut); err != nil {
return err
}
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/server/debug/logspy.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,11 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
defer func() {
if err == nil {
if dropped := atomic.LoadInt32(&countDropped); dropped > 0 {
entry := log.MakeEntry(
entry := log.MakeLegacyEntry(
ctx, severity.WARNING, channel.DEV,
0 /* depth */, false, /* redactable */
0 /* depth */, true, /* redactable */
"%d messages were dropped", log.Safe(dropped))
err = log.FormatEntry(entry, w) // modify return value
err = log.FormatLegacyEntry(entry, w) // modify return value
}
}
}()
Expand All @@ -176,9 +176,9 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
entries := make(chan logpb.Entry, logSpyChanCap)

{
entry := log.MakeEntry(
entry := log.MakeLegacyEntry(
ctx, severity.INFO, channel.DEV,
0 /* depth */, false, /* redactable */
0 /* depth */, true, /* redactable */
"intercepting logs with options %+v", opts)
entries <- entry
}
Expand Down Expand Up @@ -218,7 +218,7 @@ func (spy *logSpy) run(ctx context.Context, w io.Writer, opts logSpyOptions) (er
return

case entry := <-entries:
if err := log.FormatEntry(entry, w); err != nil {
if err := log.FormatLegacyEntry(entry, w); err != nil {
return errors.Wrapf(err, "while writing entry %v", entry)
}
count++
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/log/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func logfDepth(
}

logger := logging.getLogger(ch)
entry := MakeEntry(
entry := makeUnstructuredEntry(
ctx, sev, ch,
depth+1, true /* redactable */, format, args...)
depth+1, format, args...)
if sp, el, ok := getSpanOrEventLog(ctx); ok {
eventInternal(sp, el, (sev >= severity.ERROR), entry)
eventInternal(sp, el, (sev >= severity.ERROR), entry.convertToLegacy())
}
logger.outputLogEntry(entry)
}
Expand Down
32 changes: 17 additions & 15 deletions pkg/util/log/clog.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/cockroachdb/cockroach/pkg/cli/exit"
"github.com/cockroachdb/cockroach/pkg/util/log/channel"
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -248,36 +247,35 @@ func SetTenantIDs(tenantID string, sqlInstanceID int32) {
// outputLogEntry marshals a log entry proto into bytes, and writes
// the data to the log files. If a trace location is set, stack traces
// are added to the entry before marshaling.
func (l *loggerT) outputLogEntry(entry logpb.Entry) {
func (l *loggerT) outputLogEntry(entry logEntry) {
if f, ok := logging.interceptor.Load().(InterceptorFn); ok && f != nil {
f(entry)
f(entry.convertToLegacy())
return
}

// Mark the logger as active, so that further configuration changes
// are disabled. See IsActive() and its callers for details.
setActive()
var stacks []byte
var fatalTrigger chan struct{}
extraSync := false

if entry.Severity == severity.FATAL {
if entry.sev == severity.FATAL {
extraSync = true
logging.signalFatalCh()

switch traceback {
case tracebackSingle:
stacks = getStacks(false)
entry.stacks = getStacks(false)
case tracebackAll:
stacks = getStacks(true)
entry.stacks = getStacks(true)
}

for _, s := range l.sinkInfos {
stacks = s.sink.attachHints(stacks)
entry.stacks = s.sink.attachHints(entry.stacks)
}

// Explain to the (human) user that we would like to hear from them.
stacks = append(stacks, []byte(fatalErrorPostamble)...)
entry.stacks = append(entry.stacks, []byte(fatalErrorPostamble)...)

// We don't want to hang forever writing our final log message. If
// things are broken (for example, if the disk fills up and there
Expand All @@ -296,7 +294,7 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
logging.mu.Lock()
if logging.mu.exitOverride.f != nil {
if logging.mu.exitOverride.hideStack {
stacks = []byte("stack trace omitted via SetExitFunc()\n")
entry.stacks = []byte("stack trace omitted via SetExitFunc()\n")
}
exitFunc = logging.mu.exitOverride.f
}
Expand Down Expand Up @@ -337,17 +335,21 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
// Stopper's Stop() call (e.g. the pgwire async processing
// goroutine). These asynchronous log calls are concurrent with
// the stderrSinkInfo update in (*TestLogScope).Close().
if entry.Severity < s.threshold.Get() || !s.sink.active() {
if entry.sev < s.threshold.Get() || !s.sink.active() {
continue
}
editedEntry := maybeRedactEntry(entry, s.editor)
editedEntry := entry

// Add a counter. This is important for e.g. the SQL audit logs.
// Note: whether the counter is displayed or not depends on
// the formatter.
editedEntry.Counter = atomic.AddUint64(&s.msgCount, 1)
editedEntry.counter = atomic.AddUint64(&s.msgCount, 1)

bufs.b[i] = s.formatter.formatEntry(editedEntry, stacks)
// Process the redation spec.
editedEntry.payload = maybeRedactEntry(editedEntry.payload, s.editor)

// Format the entry for this sink.
bufs.b[i] = s.formatter.formatEntry(editedEntry)
someSinkActive = true
}

Expand Down Expand Up @@ -398,7 +400,7 @@ func (l *loggerT) outputLogEntry(entry logpb.Entry) {
}

// Flush and exit on fatal logging.
if entry.Severity == severity.FATAL {
if entry.sev == severity.FATAL {
close(fatalTrigger)
// Note: although it seems like the function is allowed to return
// below when s == severity.FATAL, this is not so, because the
Expand Down
10 changes: 4 additions & 6 deletions pkg/util/log/clog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,8 @@ func TestEntryDecoder(t *testing.T) {
Tags: tags,
Message: msg,
}
var f formatCrdbV1
buf := f.formatEntry(entry, nil /* stacks */)
defer putBuffer(buf)
var buf bytes.Buffer
FormatLegacyEntry(entry, &buf)
return buf.String()
}

Expand Down Expand Up @@ -773,10 +772,9 @@ func BenchmarkHeader(b *testing.B) {
File: "file.go",
Line: 100,
}
var f formatCrdbV1
for i := 0; i < b.N; i++ {
buf := f.formatEntry(entry, nil /* stacks */)
putBuffer(buf)
var w bytes.Buffer
FormatLegacyEntry(entry, &w)
}
}

Expand Down
28 changes: 14 additions & 14 deletions pkg/util/log/event_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ package log

import (
"context"
"encoding/json"

"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
Expand All @@ -29,19 +28,20 @@ func StructuredEvent(ctx context.Context, event eventpb.EventPayload) {
if len(common.EventType) == 0 {
common.EventType = eventpb.GetEventTypeName(event)
}
// TODO(knz): Avoid marking all the JSON payload as redactable. Do
// redaction per-field.
b, err := json.Marshal(event)
if err != nil {
Fatalf(ctx, "unexpected JSON encoding error: %+v", err)
}

// TODO(knz): Avoid escaping the JSON format when emitting the payload
// to an external sink.
entry := makeStructuredEntry(ctx,
severity.INFO,
event.LoggingChannel(),
// Note: we use depth 0 intentionally here, so that structured
// events can be reliably detected (their source filename will
// always be log/event_log.go).
0, /* depth */
event)

if sp, el, ok := getSpanOrEventLog(ctx); ok {
eventInternal(sp, el, (entry.sev >= severity.ERROR), entry.convertToLegacy())
}

// Note: we use depth 0 intentionally here, so that structured
// events can be reliably detected (their source filename will
// always be log/event_log.go).
// TODO(knz): Consider another way to mark structured events.
logfDepth(ctx, 0, severity.INFO, event.LoggingChannel(), "Structured event: %s", string(b))
logger := logging.getLogger(entry.ch)
logger.outputLogEntry(entry)
}
6 changes: 3 additions & 3 deletions pkg/util/log/exit_override.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ func (l *loggerT) exitLocked(err error, code exit.Code) {
// This assumes l.outputMu is held, but l.fileSink.mu is not held.
func (l *loggerT) reportErrorEverywhereLocked(ctx context.Context, err error) {
// Make a valid log entry for this error.
entry := MakeEntry(
entry := makeUnstructuredEntry(
ctx, severity.ERROR, channel.OPS,
2 /* depth */, true, /* redactable */
2, /* depth */
"logging error: %v", err)

// Either stderr or our log file is broken. Try writing the error to both
Expand All @@ -96,7 +96,7 @@ func (l *loggerT) reportErrorEverywhereLocked(ctx context.Context, err error) {
for _, s := range l.sinkInfos {
sink := s.sink
if logpb.Severity_ERROR >= s.threshold && sink.active() {
buf := s.formatter.formatEntry(entry, nil /*stack*/)
buf := s.formatter.formatEntry(entry)
sink.emergencyOutput(buf.Bytes())
putBuffer(buf)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log/file_log_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestSecondaryGC(t *testing.T) {

testLogGC(t, logger,
func(ctx context.Context, format string, args ...interface{}) {
entry := MakeEntry(ctx, severity.INFO, channel.DEV, 1, si.redactable,
entry := makeUnstructuredEntry(ctx, severity.INFO, channel.DEV, 1,
format, /* nolint:fmtsafe */
args...)
logger.outputLogEntry(entry)
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {

// Force a log entry. This does two things: it forces the creation
// of a file and it also introduces a timestamp marker.
entry := MakeEntry(secLoggersCtx, severity.INFO, channel.DEV, 0, false,
entry := makeUnstructuredEntry(secLoggersCtx, severity.INFO, channel.DEV, 0,
"stderr capture started")
secLogger.outputLogEntry(entry)

Expand Down
25 changes: 13 additions & 12 deletions pkg/util/log/format_crdb_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import (
"github.com/cockroachdb/ttycolor"
)

// FormatEntry writes the log entry to the specified writer.
func FormatEntry(e logpb.Entry, w io.Writer) error {
var f formatCrdbV1WithCounter
buf := f.formatEntry(e, nil)
// FormatLegacyEntry writes the legacy log entry to the specified writer.
func FormatLegacyEntry(e logpb.Entry, w io.Writer) error {
buf := formatLogEntryInternal(e, true /*showCounter*/, nil, nil)
defer putBuffer(buf)
_, err := w.Write(buf.Bytes())
return err
Expand All @@ -39,8 +38,8 @@ type formatCrdbV1 struct{}

func (formatCrdbV1) formatterName() string { return "crdb-v1" }

func (formatCrdbV1) formatEntry(entry logpb.Entry, stacks []byte) *buffer {
return formatLogEntryInternal(entry, false /*showCounter*/, nil, stacks)
func (formatCrdbV1) formatEntry(entry logEntry) *buffer {
return formatLogEntryInternal(entry.convertToLegacy(), false /*showCounter*/, nil, entry.stacks)
}

// formatCrdbV1WithCounter is the canonical log format including a
Expand All @@ -49,8 +48,8 @@ type formatCrdbV1WithCounter struct{}

func (formatCrdbV1WithCounter) formatterName() string { return "crdb-v1-count" }

func (formatCrdbV1WithCounter) formatEntry(entry logpb.Entry, stacks []byte) *buffer {
return formatLogEntryInternal(entry, true /*showCounter*/, nil, stacks)
func (formatCrdbV1WithCounter) formatEntry(entry logEntry) *buffer {
return formatLogEntryInternal(entry.convertToLegacy(), true /*showCounter*/, nil, entry.stacks)
}

// formatCrdbV1TTY is like formatCrdbV1 and includes VT color codes if
Expand All @@ -60,12 +59,12 @@ type formatCrdbV1TTY struct{}

func (formatCrdbV1TTY) formatterName() string { return "crdb-v1-tty" }

func (formatCrdbV1TTY) formatEntry(entry logpb.Entry, stacks []byte) *buffer {
func (formatCrdbV1TTY) formatEntry(entry logEntry) *buffer {
cp := ttycolor.StderrProfile
if logging.stderrSink.noColor.Get() {
cp = nil
}
return formatLogEntryInternal(entry, false /*showCounter*/, cp, stacks)
return formatLogEntryInternal(entry.convertToLegacy(), false /*showCounter*/, cp, entry.stacks)
}

// formatCrdbV1ColorWithCounter is like formatCrdbV1WithCounter and
Expand All @@ -75,14 +74,16 @@ type formatCrdbV1TTYWithCounter struct{}

func (formatCrdbV1TTYWithCounter) formatterName() string { return "crdb-v1-tty-count" }

func (formatCrdbV1TTYWithCounter) formatEntry(entry logpb.Entry, stacks []byte) *buffer {
func (formatCrdbV1TTYWithCounter) formatEntry(entry logEntry) *buffer {
cp := ttycolor.StderrProfile
if logging.stderrSink.noColor.Get() {
cp = nil
}
return formatLogEntryInternal(entry, true /*showCounter*/, cp, stacks)
return formatLogEntryInternal(entry.convertToLegacy(), true /*showCounter*/, cp, entry.stacks)
}

const severityChar = "IWEF"

// formatEntryInternal renders a log entry.
// Log lines are colorized depending on severity.
// It uses a newly allocated *buffer. The caller is responsible
Expand Down
6 changes: 2 additions & 4 deletions pkg/util/log/formats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@

package log

import "github.com/cockroachdb/cockroach/pkg/util/log/logpb"

type logFormatter interface {
formatterName() string
// formatEntry formats a logpb.Entry into a newly allocated *buffer.
// formatEntry formats a logEntry into a newly allocated *buffer.
// The caller is responsible for calling putBuffer() afterwards.
formatEntry(entry logpb.Entry, stacks []byte) *buffer
formatEntry(entry logEntry) *buffer
}

var formatters = func() map[string]logFormatter {
Expand Down
Loading

0 comments on commit 14d01f0

Please sign in to comment.