diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md
index a3649de33ae0..6f85194c705f 100644
--- a/docs/generated/logsinks.md
+++ b/docs/generated/logsinks.md
@@ -6,6 +6,8 @@ The supported log output sink types are documented below.
- [output to files](#sink-output-to-files)
+- [output to Fluentd-compatible log collectors](#sink-output-to-fluentd-compatible-log-collectors)
+
- [standard error stream](#sink-standard-error-stream)
@@ -91,6 +93,88 @@ Configuration options shared across all sink types:
+
+
+## Sink type: output to Fluentd-compatible log collectors
+
+
+This sink type causes logging data to be sent over the network, to
+a log collector that can ingest log data in a
+[Fluentd](https://www.fluentd.org)-compatible protocol.
+
+Note that TLS is not supported yet: the connection to the log
+collector is neither authenticated nor encrypted. Given that
+logging events may contain sensitive information, care should be
+taken to keep the log collector and the CockroachDB node close
+together on a private network, or connect them using a secure
+VPN. TLS support may be added at a later date.
+
+At the time of this writing, a Fluent sink buffers at most one log
+entry and retries sending the event at most one time if a network
+error is encountered. This is just sufficient to tolerate a restart
+of the Fluentd collector after a configuration change under light
+logging activity. If the server is unavailable for too long, or if
+more than one error is encountered, an error is reported to the
+process' standard error output with a copy of the logging event and
+the logging event is dropped.
+
+The configuration key under the `sinks` key in the YAML
+configuration is `fluent-servers`. Example configuration:
+
+ sinks:
+ fluent-servers: # fluent configurations start here
+ health: # defines one sink called "health"
+ channels: HEALTH
+ address: 127.0.0.1:5170
+
+A cascading defaults mechanism is available for configurations:
+every new server sink configured automatically inherits the
+configurations set in the `fluent-defaults` section.
+
+For example:
+
+ fluent-defaults:
+ redactable: false # default: disable redaction markers
+ sinks:
+ fluent-servers:
+ health:
+ channels: HEALTH
+ # This sink has redactable set to false,
+ # as the setting is inherited from fluent-defaults
+ # unless overridden here.
+
+The default output format for Fluent sinks is
+`json-fluent-compact`. The `fluent` variants of the JSON formats
+include a `tag` field as required by the Fluentd protocol, which
+the non-`fluent` JSON format variants do not include.
+
+Users are invited to peruse the `check-log-config` tool to
+verify the effect of defaults inheritance.
+
+
+
+Type-specific configuration options:
+
+| Field | Description |
+|--|--|
+| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
+| `net` | the protocol for the fluent server. Can be "tcp", "udp", "tcp4", etc. |
+| `address` | the network address of the fluent server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. |
+
+
+Configuration options shared across all sink types:
+
+| Field | Description |
+|--|--|
+| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. |
+| `format` | the entry format to use. |
+| `redact` | whether to strip sensitive information before log events are emitted to this sink. |
+| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. |
+| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. |
+| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. |
+
+
+
## Sink type: standard error stream
diff --git a/pkg/cli/exit/codes.go b/pkg/cli/exit/codes.go
index 95dcd6d6c2fd..ac41c99f8626 100644
--- a/pkg/cli/exit/codes.go
+++ b/pkg/cli/exit/codes.go
@@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} }
// in the logging system.
func TimeoutAfterFatalError() Code { return Code{8} }
+// LoggingNetCollectorUnavailable (9) indicates that an error occurred
+// during a logging operation to a network log collector (e.g. Fluentd).
+func LoggingNetCollectorUnavailable() Code { return Code{9} }
+
// Codes that are specific to client commands follow. It's possible
// for codes to be reused across separate client or server commands.
// Command-specific exit codes should be allocated down from 125.
diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go
index 1fe3e09e3f0f..45f0c9003339 100644
--- a/pkg/cli/log_flags_test.go
+++ b/pkg/cli/log_flags_test.go
@@ -35,12 +35,11 @@ func TestSetupLogging(t *testing.T) {
reSimplify := regexp.MustCompile(`(?ms:^\s*(auditable: false|redact: false|exit-on-error: true|max-group-size: 100MiB)\n)`)
const defaultFluentConfig = `fluent-defaults: {` +
- `buffered-writes: true, ` +
`filter: INFO, ` +
`format: json-fluent-compact, ` +
`redactable: true, ` +
`exit-on-error: false` +
- `}, `
+ `}`
stdFileDefaultsRe := regexp.MustCompile(
`file-defaults: \{dir: (?P[^,]+), max-file-size: 10MiB, buffered-writes: true, filter: INFO, format: crdb-v2, redactable: true\}`)
fileDefaultsNoMaxSizeRe := regexp.MustCompile(
@@ -106,7 +105,7 @@ func TestSetupLogging(t *testing.T) {
actual = reWhitespace2.ReplaceAllString(actual, "{")
// Shorten the configuration for legibility during reviews of test changes.
- actual = strings.ReplaceAll(actual, defaultFluentConfig, "")
+ actual = strings.ReplaceAll(actual, defaultFluentConfig, "")
actual = stdFileDefaultsRe.ReplaceAllString(actual, "")
actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "")
actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "")
diff --git a/pkg/cli/testdata/logflags b/pkg/cli/testdata/logflags
index c6402b426349..d84ee4b68025 100644
--- a/pkg/cli/testdata/logflags
+++ b/pkg/cli/testdata/logflags
@@ -13,6 +13,7 @@ run
start
----
config: {)>,
+,
sinks: {file-groups: {default: )>,
+,
sinks: {file-groups: {default: ,
+,
sinks: {}}
run
init
----
config: {,
+,
sinks: {}}
@@ -74,6 +78,7 @@ run
bank
----
config: {,
+,
sinks: {}}
@@ -83,6 +88,7 @@ run
demo
----
config: {,
+,
sinks: {}}
@@ -97,6 +103,7 @@ start
--store=type=mem,size=3g
----
config: {,
+,
sinks: {}}
@@ -108,6 +115,7 @@ start
--store=path=/pathB
----
config: {,
+,
sinks: {file-groups: {default: ,
+,
sinks: {file-groups: {default: ,
+,
sinks: {file-groups: {default: ,
+,
sinks: {file-groups: {default: )>,
+,
sinks: {file-groups: {default: )>,
+,
sinks: {file-groups: {default: ,
+,
sinks: {}}
@@ -276,6 +290,7 @@ start
--log-dir=/mypath
----
config: {,
+,
sinks: {file-groups: {default: ,
+,
sinks: {file-groups: {default: ,
+,
sinks: {file-groups: {default: {channels: all,
dir: /mypath,
buffered-writes: true,
@@ -337,6 +354,7 @@ start
--logtostderr=INFO
----
config: {)>,
+,
sinks: {file-groups: {default: )>,
+,
sinks: {file-groups: {default: ,
+,
sinks: {}}
# Default when no severity is specified is WARNING.
@@ -387,6 +407,7 @@ init
--logtostderr
----
config: {,
+,
sinks: {}}
diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel
index d31ce2e40151..5abe77344c67 100644
--- a/pkg/util/log/BUILD.bazel
+++ b/pkg/util/log/BUILD.bazel
@@ -15,6 +15,7 @@ go_library(
"file_log_gc.go",
"file_sync_buffer.go",
"flags.go",
+ "fluent_client.go",
"format_crdb_v1.go",
"format_crdb_v2.go",
"format_json.go",
@@ -127,6 +128,7 @@ go_test(
"file_log_gc_test.go",
"file_test.go",
"flags_test.go",
+ "fluent_client_test.go",
"format_crdb_v2_test.go",
"format_json_test.go",
"main_test.go",
diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go
index b8935dd05974..26d3e0d15856 100644
--- a/pkg/util/log/flags.go
+++ b/pkg/util/log/flags.go
@@ -64,6 +64,7 @@ func init() {
// we cannot keep redaction markers there.
*defaultConfig.Sinks.Stderr.Redactable = false
// Remove all sinks other than stderr.
+ defaultConfig.Sinks.FluentServers = nil
defaultConfig.Sinks.FileGroups = nil
if _, err := ApplyConfig(defaultConfig); err != nil {
@@ -279,6 +280,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}
+ // Create the fluent sinks.
+ for _, fc := range config.Sinks.FluentServers {
+ if fc.Filter == severity.NONE {
+ continue
+ }
+ fluentSinkInfo, err := newFluentSinkInfo(*fc)
+ if err != nil {
+ cleanupFn()
+ return nil, err
+ }
+ sinkInfos = append(sinkInfos, fluentSinkInfo)
+ allSinkInfos.put(fluentSinkInfo)
+
+ // Connect the channels for this sink.
+ for _, ch := range fc.Channels.Channels {
+ l := chans[ch]
+ l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
+ }
+ }
+
logging.setChannelLoggers(chans, &stderrSinkInfo)
setActive()
@@ -305,6 +326,18 @@ func newFileSinkInfo(
return info, fileSink, nil
}
+// newFluentSinkInfo builds the sinkInfo for one configured Fluent server:
+// the common sink settings are applied first, then the network sink is attached.
+func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {
+ si := new(sinkInfo)
+ if err := si.applyConfig(c.CommonSinkConfig); err != nil {
+ return nil, err
+ }
+ // newFluentSink only records the target; no connection is opened here.
+ si.sink = newFluentSink(c.Net, c.Address)
+ return si, nil
+}
+
// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
@@ -438,6 +471,30 @@ func DescribeAppliedConfig() string {
return nil
})
+ // Describe the fluent sinks.
+ config.Sinks.FluentServers = make(map[string]*logconfig.FluentSinkConfig)
+ sIdx := 1
+ _ = allSinkInfos.iter(func(l *sinkInfo) error {
+ fluentSink, ok := l.sink.(*fluentSink)
+ if !ok {
+ return nil
+ }
+
+ fc := &logconfig.FluentSinkConfig{}
+ fc.CommonSinkConfig = l.describeAppliedConfig()
+ fc.Net = fluentSink.network
+ fc.Address = fluentSink.addr
+
+ // Describe the connections to this fluent sink.
+ for ch, logger := range chans {
+ describeConnections(logger, ch, l, &fc.Channels)
+ }
+ skey := fmt.Sprintf("s%d", sIdx)
+ sIdx++
+ config.Sinks.FluentServers[skey] = fc
+ return nil
+ })
+
// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
diff --git a/pkg/util/log/fluent_client.go b/pkg/util/log/fluent_client.go
new file mode 100644
index 000000000000..5a681dcaf6ce
--- /dev/null
+++ b/pkg/util/log/fluent_client.go
@@ -0,0 +1,131 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package log
+
+import (
+ "fmt"
+ "net"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/cli/exit"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/cockroachdb/errors"
+)
+
+// fluentSink is the log sink that writes to a Fluentd-compatible network collector.
+type fluentSink struct {
+ // network and addr are the two arguments passed to net.DialTimeout.
+ network string // e.g. "tcp"
+ addr string // host:port of the collector
+
+ // good indicates that the connection can be used; a failed write clears it.
+ good bool
+ conn net.Conn // current connection; close() resets it to nil
+}
+
+const fluentDialTimeout = 5 * time.Second // upper bound for one dial attempt in ensureConn
+const fluentWriteTimeout = time.Second // write deadline set before each write in tryWrite
+
+func newFluentSink(network, addr string) *fluentSink {
+ // The sink starts disconnected (good is false, conn is nil); the
+ // connection is only dialed later, by ensureConn.
+ sink := new(fluentSink)
+ sink.network, sink.addr = network, addr
+ return sink
+}
+
+func (l *fluentSink) String() string { // rendered as "fluent:<network>://<addr>"
+ return "fluent:" + l.network + "://" + l.addr
+}
+
+// active implements the logSink interface. A fluent sink is always active.
+func (l *fluentSink) active() bool { return true }
+
+// attachHints implements the logSink interface. No hints are added: stacks is returned as is.
+func (l *fluentSink) attachHints(stacks []byte) []byte {
+ return stacks
+}
+
+// exitCode implements the logSink interface. It always reports exit code 9 (see pkg/cli/exit).
+func (l *fluentSink) exitCode() exit.Code {
+ return exit.LoggingNetCollectorUnavailable()
+}
+
+// output implements the logSink interface; extraSync is unused. At most two write attempts are made.
+func (l *fluentSink) output(extraSync bool, b []byte) error {
+ // First attempt on the current connection, if any. Its error is dropped because l.good records the outcome.
+ _ = l.tryWrite(b)
+ if l.good {
+ return nil
+ }
+
+ if err := l.ensureConn(b); err != nil { // (re)dial; give up if the collector cannot be reached
+ return err
+ }
+ return l.tryWrite(b) // second and final attempt
+}
+
+// emergencyOutput implements the logSink interface. It follows the same
+// write / redial / write sequence as output, but discards every error.
+func (l *fluentSink) emergencyOutput(b []byte) {
+ if _ = l.tryWrite(b); !l.good {
+ _ = l.ensureConn(b)
+ _ = l.tryWrite(b)
+ }
+}
+
+func (l *fluentSink) close() { // marks the sink unusable and releases the connection, if any
+ l.good = false
+ if l.conn != nil {
+ if err := l.conn.Close(); err != nil {
+ fmt.Fprintf(OrigStderr, "error closing network logger: %v\n", err) // reported on stderr only, not returned
+ }
+ l.conn = nil // forget the connection even if Close failed
+ }
+}
+
+func (l *fluentSink) ensureConn(b []byte) error { // b is only echoed to stderr if dialing fails
+ if l.good {
+ return nil
+ }
+ l.close() // release any previous connection before redialing
+ var err error
+ l.conn, err = net.DialTimeout(l.network, l.addr, fluentDialTimeout)
+ if err != nil {
+ fmt.Fprintf(OrigStderr, "%s: error dialing network logger: %v\n%s", l, err, b)
+ return err
+ }
+ fmt.Fprintf(OrigStderr, "%s: connection to network logger resumed\n", l) // also printed for the very first connection
+ l.good = true
+ return nil
+}
+
+var errNoConn = errors.New("no connection opened")
+// tryWrite sends b once; any failure, short writes included, is returned, echoed to stderr and clears l.good.
+func (l *fluentSink) tryWrite(b []byte) error {
+ if !l.good {
+ return errNoConn
+ }
+ if err := l.conn.SetWriteDeadline(timeutil.Now().Add(fluentWriteTimeout)); err != nil {
+ // An error here is suggestive of a bug in the Go runtime.
+ fmt.Fprintf(OrigStderr, "%s: set write deadline error: %v\n%s",
+ l, err, b)
+ l.good = false
+ return err
+ }
+ n, err := l.conn.Write(b)
+ if err == nil && n < len(b) {
+ err = errors.Newf("short write (%d/%d)", n, len(b)) // a partial write must not be reported as success
+ }
+ if l.good = err == nil; !l.good {
+ fmt.Fprintf(OrigStderr, "%s: logging error: %v\n%s", l, err, b)
+ }
+ return err
+}
diff --git a/pkg/util/log/fluent_client_test.go b/pkg/util/log/fluent_client_test.go
new file mode 100644
index 000000000000..7f66f2e624ea
--- /dev/null
+++ b/pkg/util/log/fluent_client_test.go
@@ -0,0 +1,163 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package log
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "net"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+ "github.com/cockroachdb/cockroach/pkg/util/log/channel"
+ "github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
+ "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/stretchr/testify/require"
+)
+
+func TestFluentClient(t *testing.T) {
+ defer leaktest.AfterTest(t)() // AfterTest returns the checker; the trailing () makes defer run it at test exit
+ sc := Scope(t)
+ defer sc.Close(t)
+
+ serverAddr, cleanup, fluentData := servePseudoFluent(t)
+ defer cleanup()
+
+ t.Logf("addr: %v", serverAddr)
+
+ // Set up a logging configuration with the server we've just set up
+ // as target for the OPS channel.
+ cfg := logconfig.DefaultConfig()
+ cfg.Sinks.FluentServers = map[string]*logconfig.FluentSinkConfig{
+ "ops": {
+ Address: serverAddr,
+ Channels: logconfig.ChannelList{Channels: []Channel{channel.OPS}}},
+ }
+ // Derive a full config using the same directory as the
+ // TestLogScope.
+ require.NoError(t, cfg.Validate(&sc.logDir))
+
+ // Apply the configuration.
+ TestingResetActive()
+ cleanup, err := ApplyConfig(cfg) // reuses the variable; the server cleanup was already captured by the defer above
+ require.NoError(t, err)
+ defer cleanup()
+
+ // Send a log event on the OPS channel. This call's line number in the file is pinned as "l":58 below.
+ Ops.Infof(context.Background(), "hello world")
+
+ // Check that the event was indeed sent via the Fluent sink.
+ var ev []byte
+ select {
+ case <-time.After(time.Second):
+ t.Fatal("timeout")
+ case ev = <-fluentData:
+ }
+
+ var info map[string]interface{}
+ if err := json.Unmarshal(ev, &info); err != nil {
+ t.Fatalf("unable to decode json: %q: %v", ev, err)
+ }
+ // Erase non-deterministic fields.
+ info["t"] = "XXX"
+ info["g"] = 222
+ msg, err := json.Marshal(info)
+ require.NoError(t, err)
+
+ const expected = `{"c":1,"f":"util/log/fluent_client_test.go","g":222,"l":58,"message":"hello world","n":1,"r":1,"s":1,"sev":"I","t":"XXX","tag":"log_test.ops"}`
+ require.Equal(t, expected, string(msg))
+}
+
+// servePseudoFluent starts a real TCP listener (no address given to
+// net.ListenTCP), reads newline-terminated data from each client and
+// reports it over the returned channel. cleanup stops all goroutines.
+func servePseudoFluent(t *testing.T) (serverAddr string, cleanup func(), fluentData chan []byte) {
+ l, err := net.ListenTCP("tcp", nil)
+ require.NoError(t, err)
+
+ fluentData = make(chan []byte, 1)
+
+ serverCtx, serverCancel := context.WithCancel(context.Background())
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ // When canceled at the end of the test, we need to exit the loop.
+ select {
+ case <-serverCtx.Done():
+ return
+ default:
+ }
+
+ // Take one client connection. Accept blocks until a client dials or the listener is closed.
+ conn, err := l.Accept()
+ if err != nil {
+ t.Logf("accept error: %v", err)
+ break
+ }
+ t.Logf("got client: %v", conn.RemoteAddr())
+
+ // Serve the connection in its own goroutine, tracked by wg so cleanup can wait for it.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer func() {
+ if err := conn.Close(); err != nil {
+ t.Logf("close error: %v", err)
+ }
+ }()
+
+ buf := bufio.NewReader(conn)
+ for {
+ // When the test finishes, the other side has a connection
+ // open. Use the context cancellation and a read timeout
+ // (below) to detect this.
+ select {
+ case <-serverCtx.Done():
+ return
+ default:
+ }
+
+ t.Logf("client %v: waiting for data", conn.RemoteAddr())
+ if err := conn.SetReadDeadline(timeutil.Now().Add(100 * time.Millisecond)); err != nil {
+ t.Logf("set read deadline: %v", err)
+ return
+ }
+
+ // Read one line of data and report it over the channel.
+ str, err := buf.ReadBytes('\n')
+ if err != nil {
+ t.Logf("read error: %v", err) // NOTE(review): not fatal; the loop also retries after EOF until serverCtx is canceled
+ }
+ if len(str) > 0 {
+ t.Logf("received: %q", string(str))
+ fluentData <- str // NOTE(review): blocks once the 1-slot buffer is full and nobody receives
+ }
+ }
+ }()
+ }
+ }()
+ cleanup = func() {
+ // Cancel the context. This prevents further progress in the
+ // acceptor loop above and in the per-connection read loops.
+ serverCancel()
+ // Close the listen socket. This breaks any currently running call
+ // to Accept().
+ require.NoError(t, l.Close())
+ // Wait until all servers have stopped running.
+ wg.Wait()
+ }
+ return l.Addr().String(), cleanup, fluentData
+}
diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go
index b17abde80267..2305f5545a1e 100644
--- a/pkg/util/log/logconfig/config.go
+++ b/pkg/util/log/logconfig/config.go
@@ -30,6 +30,10 @@ const DefaultFileFormat = `crdb-v2`
// when not specified in a configuration.
const DefaultStderrFormat = `crdb-v2-tty`
+// DefaultFluentFormat is the entry format for fluent sinks
+// when not specified in a configuration.
+const DefaultFluentFormat = `json-fluent-compact`
+
// DefaultConfig returns a suitable default configuration when logging
// is meant to primarily go to files.
func DefaultConfig() (c Config) {
@@ -44,6 +48,11 @@ file-defaults:
max-group-size: 100mib
exit-on-error: true
buffered-writes: true
+fluent-defaults:
+ filter: INFO
+ format: ` + DefaultFluentFormat + `
+ redactable: true
+ exit-on-error: false
sinks:
stderr:
filter: NONE
@@ -87,6 +96,11 @@ type Config struct {
// configuration value.
FileDefaults FileDefaults `yaml:"file-defaults,omitempty"`
+ // FluentDefaults represents the default configuration for fluent sinks,
+ // inherited when a specific fluent sink config does not provide a
+ // configuration value.
+ FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"`
+
// Sinks represents the sink configurations.
Sinks SinkConfig `yaml:",omitempty"`
@@ -147,12 +161,15 @@ type CommonSinkConfig struct {
type SinkConfig struct {
// FileGroups represents the list of configured file sinks.
FileGroups map[string]*FileSinkConfig `yaml:"file-groups,omitempty"`
+ // FluentServers represents the list of configured fluent sinks.
+ FluentServers map[string]*FluentSinkConfig `yaml:"fluent-servers,omitempty"`
// Stderr represents the configuration for the stderr sink.
Stderr StderrSinkConfig `yaml:",omitempty"`
- // sortedFileGroupNames is used internally to
+ // sortedFileGroupNames and sortedServerNames are used internally to
// make the Export() function deterministic.
sortedFileGroupNames []string
+ sortedServerNames []string
}
// StderrSinkConfig represents the configuration for the stderr sink.
@@ -209,6 +226,94 @@ type StderrSinkConfig struct {
CommonSinkConfig `yaml:",inline"`
}
+// FluentDefaults represent configuration defaults for fluent sinks.
+type FluentDefaults struct {
+ CommonSinkConfig `yaml:",inline"`
+}
+
+// FluentSinkConfig represents the configuration for one fluentd sink.
+//
+// User-facing documentation follows.
+// TITLE: output to Fluentd-compatible log collectors
+//
+// This sink type causes logging data to be sent over the network, to
+// a log collector that can ingest log data in a
+// [Fluentd](https://www.fluentd.org)-compatible protocol.
+//
+// Note that TLS is not supported yet: the connection to the log
+// collector is neither authenticated nor encrypted. Given that
+// logging events may contain sensitive information, care should be
+// taken to keep the log collector and the CockroachDB node close
+// together on a private network, or connect them using a secure
+// VPN. TLS support may be added at a later date.
+//
+// At the time of this writing, a Fluent sink buffers at most one log
+// entry and retries sending the event at most one time if a network
+// error is encountered. This is just sufficient to tolerate a restart
+// of the Fluentd collector after a configuration change under light
+// logging activity. If the server is unavailable for too long, or if
+// more than one error is encountered, an error is reported to the
+// process' standard error output with a copy of the logging event and
+// the logging event is dropped.
+//
+// The configuration key under the `sinks` key in the YAML
+// configuration is `fluent-servers`. Example configuration:
+//
+// sinks:
+// fluent-servers: # fluent configurations start here
+// health: # defines one sink called "health"
+// channels: HEALTH
+// address: 127.0.0.1:5170
+//
+// A cascading defaults mechanism is available for configurations:
+// every new server sink configured automatically inherits the
+// configurations set in the `fluent-defaults` section.
+//
+// For example:
+//
+// fluent-defaults:
+// redactable: false # default: disable redaction markers
+// sinks:
+// fluent-servers:
+// health:
+// channels: HEALTH
+// # This sink has redactable set to false,
+// # as the setting is inherited from fluent-defaults
+// # unless overridden here.
+//
+// The default output format for Fluent sinks is
+// `json-fluent-compact`. The `fluent` variants of the JSON formats
+// include a `tag` field as required by the Fluentd protocol, which
+// the non-`fluent` JSON format variants do not include.
+//
+// Users are invited to peruse the `check-log-config` tool to
+// verify the effect of defaults inheritance.
+//
+type FluentSinkConfig struct {
+ // Channels is the list of logging channels that use this sink.
+ Channels ChannelList `yaml:",omitempty,flow"`
+
+ // Net is the protocol for the fluent server. Can be "tcp", "udp",
+ // "tcp4", etc.
+ Net string `yaml:",omitempty"`
+
+ // Address is the network address of the fluent server. The
+ // host/address and port parts are separated with a colon. IPv6
+ // numeric addresses should be included within square brackets,
+ // e.g.: [::1]:1234.
+ Address string `yaml:""`
+
+ // CommonSinkConfig is the configuration common to all sinks. Note
+ // that although the idiom in Go is to place embedded fields at the
+ // beginning of a struct, we purposefully deviate from the idiom
+ // here to ensure that "general" options appear after the
+ // sink-specific options in YAML config dumps.
+ CommonSinkConfig `yaml:",inline"`
+
+ // serverName is populated/used during validation.
+ serverName string
+}
+
// FileDefaults represent configuration defaults for file sinks.
type FileDefaults struct {
// Dir stores the default output directory for file sinks.
diff --git a/pkg/util/log/logconfig/export.go b/pkg/util/log/logconfig/export.go
index a2c0f4b30946..3ec48c720c1a 100644
--- a/pkg/util/log/logconfig/export.go
+++ b/pkg/util/log/logconfig/export.go
@@ -171,6 +171,33 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
links = append(links, "stray --> stderr")
}
+ // Collect the network servers.
+ //
+ // servers maps each server to its box declaration.
+ servers := map[string]string{}
+ for _, fn := range c.Sinks.sortedServerNames {
+ fc := c.Sinks.FluentServers[fn]
+ if fc.Filter == logpb.Severity_NONE {
+ continue
+ }
+ skey := fmt.Sprintf("s__%s", fc.serverName)
+ target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig)
+ hasLink := false
+ for _, ch := range fc.Channels.Channels {
+ if !chanSel.HasChannel(ch) {
+ continue
+ }
+ hasLink = true
+ links = append(links, fmt.Sprintf("%s --> %s", ch, target))
+ }
+ if hasLink {
+ processing = append(processing, thisprocs...)
+ links = append(links, thislinks...)
+ servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"",
+ skey, fc.Net, fc.Address)
+ }
+ }
+
// Export the stderr redirects.
if c.Sinks.Stderr.Filter != logpb.Severity_NONE {
target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig)
@@ -210,6 +237,15 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) {
buf.WriteString("}\n")
}
+ // Represent the network servers, if any.
+ if len(c.Sinks.sortedServerNames) > 0 {
+ buf.WriteString("cloud network {\n")
+ for _, s := range c.Sinks.sortedServerNames {
+ fmt.Fprintf(&buf, " %s\n", servers[s])
+ }
+ buf.WriteString("}\n")
+ }
+
// Export the relationships.
for _, l := range links {
fmt.Fprintf(&buf, "%s\n", l)
diff --git a/pkg/util/log/logconfig/testdata/export b/pkg/util/log/logconfig/testdata/export
index 65ea232a02fe..270192671ac6 100644
--- a/pkg/util/log/logconfig/testdata/export
+++ b/pkg/util/log/logconfig/testdata/export
@@ -55,6 +55,11 @@ sinks:
channels: ALL
stderr:
filter: WARNING
+ fluent-servers:
+ local:
+ channels: SESSIONS
+ redactable: false
+ address: localhost:5170
----
@startuml
left to right direction
@@ -66,22 +71,30 @@ cloud stray as "stray\nerrors"
queue stderr
card p__1 as "redact"
card p__2 as "format:crdb-v2"
-card p__3 as "format:crdb-v2-tty"
-card p__4 as "filter:W"
+card p__3 as "strip"
+card p__4 as "format:json-fluent-compact"
+card p__5 as "format:crdb-v2-tty"
+card p__6 as "filter:W"
artifact files {
folder "/default-dir" {
file f1 as "cockroach-everything.log"
file stderrfile as "cockroach-stderr.log"
}
}
+cloud network {
+ queue s__local as "fluent: tcp:localhost:5170"
+}
DEV --> p__2
SESSIONS --> p__2
p__1 --> f1
p__2 --> p__1
stray --> stderrfile
-DEV --> p__4
SESSIONS --> p__4
-p__3 --> stderr
+p__3 ..> s__local
p__4 --> p__3
+DEV --> p__6
+SESSIONS --> p__6
+p__5 --> stderr
+p__6 --> p__5
@enduml
-# http://www.plantuml.com/plantuml/uml/R96_JWCn3CPtFuML2OO3zS_K0NLe1nOOJYeBKXKIfur8pyNnLQfGtnrTxig1O8hjxnV7_hBCQaNRGu94GL4P9UxsYZuAEOtSWEF3aHjg53EtuYZZ9zpSucgzwOvwNTUFpqyrkCIjnwnYpsWpcXAzDYJ2aWrSuAEbbZ2h9n5mLZmUjzj9OOMyTMg-gzDI3Im7gmid_gqwJKTrzezQgPv7OjuJCId9uiM05Ot1EiKGKvaV0oTFWkRUKx1jqif7CPrG40pzN8xTkx1r-ufE96VTnsPtbtXdhbo_JGb_yxqmi1Uum6gzmQfwADl1rRInKgpeiZ21Oi2WJA1tj4l7ntvsc__fDeTYqdW5YYM3EeCbDRxxyAy0003__m00
+# http://www.plantuml.com/plantuml/uml/R58zRzim4DtlLnmqjOFSEebTG4EG8Hwwj8E1T2bWiEJHPaFprECfXL7ulnUav2X1Cebyx_4-tjDTLYCw72D4yehAA65_K7H1o6hW19QFFITAYfa7iPJn7tpuYFURX_BPRhRRhz-_RS567Xnc5ND2ax6ffv-9H5Xo0sVuCz10cDMH25WZ3llTRbcrGivORPxHgufwbgFHpehxrJvTpUprfNxeP_3cvPFVcLFhuq19spB-g-AhTugtggTPiHuL8If9zwC18ngyiOe-nBe-UewE19jFZhmPehOkI5E8Aa4_hcNPFWeRUsZfYUIaXv3sYyZxvg8RpQZ7r_gHcBHdE4_k9jA_B8-bqUJcRXVPcZWEN1VkK6tVLVJ0MRlLyildOl_zvW7RzhRQ2vVKPgHcKMv-2JM1YLd266cvpkE-AN03DPZ5ulPv9gYfJFpro_xhDy_NK8EPcq2DOM9NS4V9bN_qVm000F__
diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate
index 61acd0cccb21..5bba86a15907 100644
--- a/pkg/util/log/logconfig/testdata/validate
+++ b/pkg/util/log/logconfig/testdata/validate
@@ -12,6 +12,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
default:
@@ -55,6 +62,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
custom:
@@ -100,6 +114,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
custom:
@@ -146,6 +167,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
custom:
@@ -171,6 +199,66 @@ capture-stray-errors:
dir: /default-dir
max-group-size: 100MiB
+# Check that fluent default network is filled.
+yaml
+sinks:
+ fluent-servers:
+ custom:
+ address: "127.0.0.1:5170"
+ channels: DEV
+----
+file-defaults:
+ dir: /default-dir
+ max-file-size: 10MiB
+ max-group-size: 100MiB
+ buffered-writes: true
+ filter: INFO
+ format: crdb-v2
+ redact: false
+ redactable: true
+ exit-on-error: true
+ auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
+sinks:
+ file-groups:
+ default:
+ channels: all
+ dir: /default-dir
+ max-file-size: 10MiB
+ max-group-size: 100MiB
+ buffered-writes: true
+ filter: INFO
+ format: crdb-v2
+ redact: false
+ redactable: true
+ exit-on-error: true
+ fluent-servers:
+ custom:
+ channels: [DEV]
+ net: tcp
+ address: 127.0.0.1:5170
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ stderr:
+ channels: all
+ filter: NONE
+ format: crdb-v2-tty
+ redact: false
+ redactable: true
+ exit-on-error: true
+capture-stray-errors:
+ enable: true
+ dir: /default-dir
+ max-group-size: 100MiB
# Check that it's possible to capture all channels.
yaml
@@ -190,6 +278,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
custom:
@@ -234,6 +329,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
custom:
@@ -259,6 +361,67 @@ capture-stray-errors:
dir: /default-dir
max-group-size: 100MiB
+# Check that "auditable" is transformed into other fluent flags.
+yaml
+sinks:
+ fluent-servers:
+ custom:
+ channels: DEV
+ address: localhost:5170
+ auditable: true
+----
+file-defaults:
+ dir: /default-dir
+ max-file-size: 10MiB
+ max-group-size: 100MiB
+ buffered-writes: true
+ filter: INFO
+ format: crdb-v2
+ redact: false
+ redactable: true
+ exit-on-error: true
+ auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
+sinks:
+ file-groups:
+ default:
+ channels: all
+ dir: /default-dir
+ max-file-size: 10MiB
+ max-group-size: 100MiB
+ buffered-writes: true
+ filter: INFO
+ format: crdb-v2
+ redact: false
+ redactable: true
+ exit-on-error: true
+ fluent-servers:
+ custom:
+ channels: [DEV]
+ net: tcp
+ address: localhost:5170
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: true
+ stderr:
+ channels: all
+ filter: NONE
+ format: crdb-v2-tty
+ redact: false
+ redactable: true
+ exit-on-error: true
+capture-stray-errors:
+ enable: true
+ dir: /default-dir
+ max-group-size: 100MiB
# Check that "auditable" is transformed into other stderr
yaml
@@ -279,6 +442,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
file-groups:
default:
@@ -319,6 +489,13 @@ file-defaults:
redactable: true
exit-on-error: true
auditable: false
+fluent-defaults:
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ auditable: false
sinks:
stderr:
channels: all
@@ -332,6 +509,25 @@ capture-stray-errors:
dir: /default-dir
max-group-size: 100MiB
+# Check that missing addr is reported.
+yaml
+sinks:
+ fluent-servers:
+ custom:
+----
+ERROR: fluent server "custom": address cannot be empty
+fluent server "custom": no channel selected
+
+# Check that invalid proto is rejected.
+yaml
+sinks:
+ fluent-servers:
+ custom:
+ address: 'abc'
+ net: 'unknown'
+----
+ERROR: fluent server "custom": unknown protocol: "unknown"
+fluent server "custom": no channel selected
# Check that empty dir is rejected.
yaml
diff --git a/pkg/util/log/logconfig/testdata/yaml b/pkg/util/log/logconfig/testdata/yaml
index 05ef9e0030c3..53050ccdd485 100644
--- a/pkg/util/log/logconfig/testdata/yaml
+++ b/pkg/util/log/logconfig/testdata/yaml
@@ -142,6 +142,13 @@ sinks:
filter: INFO
redact: true
+ fluent-servers:
+ default:
+ address: 127.0.0.1:5170
+ other:
+ net: udp
+ address: 127.0.0.1:5111
+
stderr:
filter: WARNING
redact: false
@@ -166,6 +173,12 @@ sinks:
perf:
filter: INFO
redact: true
+ fluent-servers:
+ default:
+ address: 127.0.0.1:5170
+ other:
+ net: udp
+ address: 127.0.0.1:5111
stderr:
filter: WARNING
redact: false
diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go
index 590bdc47f9e4..3acdc4866b40 100644
--- a/pkg/util/log/logconfig/validate.go
+++ b/pkg/util/log/logconfig/validate.go
@@ -46,10 +46,16 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
if c.FileDefaults.Filter == logpb.Severity_UNKNOWN {
c.FileDefaults.Filter = logpb.Severity_INFO
}
+ if c.FluentDefaults.Filter == logpb.Severity_UNKNOWN {
+ c.FluentDefaults.Filter = logpb.Severity_INFO
+ }
// Sinks are not auditable by default.
if c.FileDefaults.Auditable == nil {
c.FileDefaults.Auditable = &bf
}
+ if c.FluentDefaults.Auditable == nil {
+ c.FluentDefaults.Auditable = &bf
+ }
// File sinks are buffered by default.
if c.FileDefaults.BufferedWrites == nil {
c.FileDefaults.BufferedWrites = &bt
@@ -59,18 +65,31 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
s := DefaultFileFormat
c.FileDefaults.Format = &s
}
+ if c.FluentDefaults.Format == nil {
+ s := DefaultFluentFormat
+ c.FluentDefaults.Format = &s
+ }
// No redaction markers -> default keep them.
if c.FileDefaults.Redactable == nil {
c.FileDefaults.Redactable = &bt
}
+ if c.FluentDefaults.Redactable == nil {
+ c.FluentDefaults.Redactable = &bt
+ }
// No redaction specification -> default false.
if c.FileDefaults.Redact == nil {
c.FileDefaults.Redact = &bf
}
- // No criticality -> default true for files.
+ if c.FluentDefaults.Redact == nil {
+ c.FluentDefaults.Redact = &bf
+ }
+ // No criticality -> default true for files, false for fluent.
if c.FileDefaults.Criticality == nil {
c.FileDefaults.Criticality = &bt
}
+ if c.FluentDefaults.Criticality == nil {
+ c.FluentDefaults.Criticality = &bf
+ }
// Validate and fill in defaults for file sinks.
for prefix, fc := range c.Sinks.FileGroups {
@@ -84,6 +103,18 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
}
}
+ // Validate and fill in defaults for fluent sinks.
+ for serverName, fc := range c.Sinks.FluentServers {
+ if fc == nil {
+ fc = &FluentSinkConfig{}
+ c.Sinks.FluentServers[serverName] = fc
+ }
+ fc.serverName = serverName
+ if err := c.validateFluentSinkConfig(fc); err != nil {
+ fmt.Fprintf(&errBuf, "fluent server %q: %v\n", serverName, err)
+ }
+ }
+
// Defaults for stderr.
c.inheritCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, &c.FileDefaults.CommonSinkConfig)
if c.Sinks.Stderr.Filter == logpb.Severity_UNKNOWN {
@@ -103,6 +134,8 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
// fileSinks maps channels to files.
fileSinks := make(map[logpb.Channel]*FileSinkConfig)
+ // fluentSinks maps channels to fluent servers.
+ fluentSinks := make(map[logpb.Channel]*FluentSinkConfig)
// Check that no channel is listed by more than one file sink,
// and every file has at least one channel.
@@ -125,6 +158,23 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
}
}
+ // Check that no channel is listed by more than one fluent sink, and
+ // every sink has at least one channel.
+ for _, fc := range c.Sinks.FluentServers {
+ if len(fc.Channels.Channels) == 0 {
+ fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", fc.serverName)
+ }
+ fc.Channels.Sort()
+ for _, ch := range fc.Channels.Channels {
+ if prev := fluentSinks[ch]; prev != nil {
+ fmt.Fprintf(&errBuf, "fluent server %q: channel %s already captured by server %q\n",
+ fc.serverName, ch, prev.serverName)
+ } else {
+ fluentSinks[ch] = fc
+ }
+ }
+ }
+
// If capture-stray-errors was enabled, then perform some additional
// validation on it.
if c.CaptureFd2.Enable {
@@ -194,10 +244,25 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) {
}
}
+ // serverNames collects the names of the retained servers, so that
+ // they can be stored sorted in c.Sinks.sortedServerNames below.
+ serverNames := make([]string, 0, len(c.Sinks.FluentServers))
+ // Elide all the fluent sinks with severity set to NONE. Also
+ // collect the remaining names for sorting below.
+ for serverName, fc := range c.Sinks.FluentServers {
+ if fc.Filter == logpb.Severity_NONE {
+ delete(c.Sinks.FluentServers, serverName)
+ } else {
+ serverNames = append(serverNames, serverName)
+ }
+ }
+
// Remember the sorted names, so we get deterministic output in
// export.
sort.Strings(fileGroupNames)
c.Sinks.sortedFileGroupNames = fileGroupNames
+ sort.Strings(serverNames)
+ c.Sinks.sortedServerNames = serverNames
return nil
}
@@ -270,6 +335,34 @@ func (c *Config) validateFileSinkConfig(fc *FileSinkConfig, defaultLogDir *strin
return nil
}
+func (c *Config) validateFluentSinkConfig(fc *FluentSinkConfig) error {
+ c.inheritCommonDefaults(&fc.CommonSinkConfig, &c.FluentDefaults.CommonSinkConfig)
+
+ fc.Net = strings.ToLower(strings.TrimSpace(fc.Net))
+ switch fc.Net {
+ case "tcp", "tcp4", "tcp6":
+ case "udp", "udp4", "udp6":
+ case "unix":
+ case "":
+ fc.Net = "tcp"
+ default:
+ return errors.Newf("unknown protocol: %q", fc.Net)
+ }
+ fc.Address = strings.TrimSpace(fc.Address)
+ if fc.Address == "" {
+ return errors.New("address cannot be empty")
+ }
+
+ // An auditable sink is forced to be critical; the flag is then cleared.
+ if *fc.Auditable {
+ bt := true
+ fc.Criticality = &bt
+ }
+ fc.Auditable = nil
+
+ return nil
+}
+
func normalizeDir(dir **string) error {
if *dir == nil {
return nil
diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go
index 29143e3522af..7a2e5bdd486c 100644
--- a/pkg/util/log/sinks.go
+++ b/pkg/util/log/sinks.go
@@ -49,3 +49,4 @@ type logSink interface {
var _ logSink = (*stderrSink)(nil)
var _ logSink = (*fileSink)(nil)
+var _ logSink = (*fluentSink)(nil)
diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config
index 692206afcd19..3cddc8e669ac 100644
--- a/pkg/util/log/testdata/config
+++ b/pkg/util/log/testdata/config
@@ -27,6 +27,46 @@ capture-stray-errors:
max-group-size: 100MiB
+# Test the default config with a fluent server.
+yaml
+sinks:
+ fluent-servers: {local: {channels: SESSIONS, address: localhost:5170}}
+----
+sinks:
+ file-groups:
+ default:
+ channels: all
+ dir: TMPDIR
+ max-file-size: 10MiB
+ max-group-size: 100MiB
+ buffered-writes: true
+ filter: INFO
+ format: crdb-v2
+ redact: false
+ redactable: true
+ exit-on-error: true
+ fluent-servers:
+ s1:
+ channels: [SESSIONS]
+ net: tcp
+ address: localhost:5170
+ filter: INFO
+ format: json-fluent-compact
+ redact: false
+ redactable: true
+ exit-on-error: false
+ stderr:
+ channels: all
+ filter: NONE
+ format: crdb-v2-tty
+ redact: false
+ redactable: true
+ exit-on-error: true
+capture-stray-errors:
+ enable: true
+ dir: TMPDIR
+ max-group-size: 100MiB
+
# Test the default config with a catch-all auditable file.
yaml