Skip to content

Commit

Permalink
util/log: new experimental integration with Fluentd
Browse files Browse the repository at this point in the history
Release note (cli change): It is now possible to redirect logging to
[Fluentd](https://www.fluentd.org)-compatible network collectors. See
the documentation for details. This is an alpha-quality feature.

Release note (cli change): It is now possible to set the `format`
parameter of any log sink, including file sinks, to `json-fluent` or
`json-fluent-compact` to write entries as structured JSON.
  • Loading branch information
knz committed Nov 30, 2020
1 parent 5662523 commit a6255d5
Show file tree
Hide file tree
Showing 15 changed files with 984 additions and 11 deletions.
4 changes: 4 additions & 0 deletions pkg/cli/exit/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} }
// in the logging system.
func TimeoutAfterFatalError() Code { return Code{8} }

// LoggingNetCollectorUnavailable (9) indicates that an error occurred
// during a logging operation to a network collector.
func LoggingNetCollectorUnavailable() Code { return Code{9} }

// Codes that are specific to client commands follow. It's possible
// for codes to be reused across separate client or server commands.
// Command-specific exit codes should be allocated down from 125.
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ go_library(
"file_log_gc.go",
"file_sync_buffer.go",
"flags.go",
"fluent_client.go",
"format_crdb_v1.go",
"format_json.go",
"formats.go",
"get_stacks.go",
"intercept.go",
Expand Down Expand Up @@ -123,6 +125,7 @@ go_test(
"file_log_gc_test.go",
"file_test.go",
"flags_test.go",
"fluent_client_test.go",
"main_test.go",
"redact_test.go",
"secondary_log_test.go",
Expand Down
57 changes: 57 additions & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
// we cannot keep redaction markers there.
*defaultConfig.Sinks.Stderr.Redactable = false
// Remove all sinks other than stderr.
defaultConfig.Sinks.FluentServers = nil
defaultConfig.Sinks.FileGroups = nil

if _, err := ApplyConfig(defaultConfig); err != nil {
Expand Down Expand Up @@ -274,6 +275,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}

// Create the fluent sinks.
for _, fc := range config.Sinks.FluentServers {
if fc.Filter == severity.NONE {
continue
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fluentSinkInfo)
allSinkInfos.put(fluentSinkInfo)

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
}
}

logging.setChannelLoggers(chans, &stderrSinkInfo)
setActive()

Expand All @@ -298,6 +319,18 @@ func newFileSinkInfo(fileNamePrefix string, c logconfig.FileConfig) (*sinkInfo,
return info, fileSink, nil
}

// newFluentSinkInfo creates a new fluentSink and its accompanying sinkInfo
// from the provided configuration.
func newFluentSinkInfo(c logconfig.FluentConfig) (*sinkInfo, error) {
info := &sinkInfo{}
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
fluentSink := newFluentSink(c.Net, c.Address)
info.sink = fluentSink
return info, nil
}

// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
Expand Down Expand Up @@ -421,6 +454,30 @@ func DescribeAppliedConfig() string {
return nil
})

// Describe the fluent sinks.
config.Sinks.FluentServers = make(map[string]*logconfig.FluentConfig)
sIdx := 1
allSinkInfos.iter(func(l *sinkInfo) error {
fluentSink, ok := l.sink.(*fluentSink)
if !ok {
return nil
}

fc := &logconfig.FluentConfig{}
fc.CommonSinkConfig = l.describeAppliedConfig()
fc.Net = fluentSink.network
fc.Address = fluentSink.addr

// Describe the connections to this fluent sink.
for ch, logger := range logging.channels {
describeConnections(logger, ch, l, &fc.Channels)
}
skey := fmt.Sprintf("s%d", sIdx)
sIdx++
config.Sinks.FluentServers[skey] = fc
return nil
})

// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
Expand Down
122 changes: 122 additions & 0 deletions pkg/util/log/fluent_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"errors"
"fmt"
"net"

"github.com/cockroachdb/cockroach/pkg/cli/exit"
)

// fluentSink represents a Fluentd-compatible network collector.
type fluentSink struct {
// The network address of the fluentd collector.
network string
addr string

// good indicates that the connection can be used.
good bool
conn net.Conn
}

func newFluentSink(network, addr string) *fluentSink {
f := &fluentSink{
addr: addr,
network: network,
}
return f
}

func (l *fluentSink) String() string {
return fmt.Sprintf("fluent:%s://%s", l.network, l.addr)
}

// activeAtSeverity implements the logSink interface.
func (l *fluentSink) active() bool { return true }

// attachHints implements the logSink interface.
func (l *fluentSink) attachHints(stacks []byte) []byte {
return stacks
}

// exitCode implements the logSink interface.
func (l *fluentSink) exitCode() exit.Code {
return exit.LoggingNetCollectorUnavailable()
}

// output implements the logSink interface.
func (l *fluentSink) output(extraSync bool, b []byte) error {
// Try to write and reconnect immediately if the first write fails.
//
// TODO(knz): Add some net socket write deadlines here.
_ = l.tryWrite(b)
if l.good {
return nil
}

if err := l.ensureConn(b); err != nil {
return err
}
return l.tryWrite(b)
}

// emergencyOutput implements the logSink interface.
func (l *fluentSink) emergencyOutput(b []byte) {
// TODO(knz): Add some net socket write deadlines here.
_ = l.tryWrite(b)
if !l.good {
_ = l.ensureConn(b)
_ = l.tryWrite(b)
}
}

func (l *fluentSink) close() {
l.good = false
if l.conn != nil {
if err := l.conn.Close(); err != nil {
fmt.Fprintf(OrigStderr, "error closing network logger: %v\n", err)
}
l.conn = nil
}
}

func (l *fluentSink) ensureConn(b []byte) error {
if l.good {
return nil
}
l.close()
var err error
l.conn, err = net.Dial(l.network, l.addr)
if err != nil {
fmt.Fprintf(OrigStderr, "%s: error dialing network logger: %v\n%s", l, err, b)
return err
}
fmt.Fprintf(OrigStderr, "%s: connection to network logger resumed\n", l)
l.good = true
return nil
}

var errNoConn = errors.New("no connection opened")

func (l *fluentSink) tryWrite(b []byte) error {
if !l.good {
return errNoConn
}
n, err := l.conn.Write(b)
if err != nil || n < len(b) {
fmt.Fprintf(OrigStderr, "%s: logging error: %v or short write (%d/%d)\n%s",
l, err, n, len(b), b)
l.good = false
}
return err
}
54 changes: 54 additions & 0 deletions pkg/util/log/fluent_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package log

import (
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
)

func TestFluentClientEncodeEntry(t *testing.T) {
tm, err := time.Parse(MessageTimeFormat, "060102 15:04:05.654321")
if err != nil {
t.Fatal(err)
}
testCases := []struct {
entry logpb.Entry
expected string
}{
{logpb.Entry{}, `
{"tag":"log_test.dev","c":0,"t":"0.000000000","s":0,"g":0,"f":"","l":0,"n":0,"r":0,"message":""}
`},
{logpb.Entry{Time: tm.UnixNano(), Severity: severity.INFO, File: "hello", Goroutine: 876, Line: 432, Counter: 543, Redactable: true}, `
{"tag":"log_test.dev","c":0,"t":"1136214245.654321000","s":1,"sev":"I","g":876,"f":"hello","l":432,"n":543,"r":1,"message":""}
`},
// Check escaping of newlines and other special characters.
// However generally unicode characters can be preserved.
{logpb.Entry{Tags: "hai", Message: "hello\nsnowman " + `"☃"`}, `
{"tag":"log_test.dev","c":0,"t":"0.000000000","s":0,"g":0,"f":"","l":0,"n":0,"r":0,"tags":"hai","message":"hello\nsnowman \"☃\""}
`},
}

for _, tc := range testCases {
var f formatFluentJSONCompact
buf := f.formatEntry(tc.entry, nil)
exp := strings.TrimPrefix(tc.expected, "\n")
actual := buf.String()
putBuffer(buf)
if exp != actual {
t.Errorf("expected:\n%s\ngot:\n%s", exp, actual)
}
}
}
Loading

0 comments on commit a6255d5

Please sign in to comment.