From 8be8904d9a115c12e6b6668d0b3a46f047bd766e Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 26 Nov 2020 12:19:32 +0100 Subject: [PATCH] util/log: new experimental integration with Fluentd Release note (cli change): It is now possible to redirect logging to [Fluentd](https://www.fluentd.org)-compatible network collectors. See the documentation for details. This is an alpha-quality feature. --- pkg/cli/exit/codes.go | 4 + pkg/util/log/BUILD.bazel | 1 + pkg/util/log/flags.go | 57 +++++++ pkg/util/log/fluent_client.go | 122 ++++++++++++++ pkg/util/log/logconfig/config.go | 44 ++++- pkg/util/log/logconfig/export.go | 36 +++++ pkg/util/log/logconfig/testdata/export | 23 ++- pkg/util/log/logconfig/testdata/validate | 194 +++++++++++++++++++++++ pkg/util/log/logconfig/testdata/yaml | 13 ++ pkg/util/log/logconfig/validate.go | 95 ++++++++++- pkg/util/log/sinks.go | 1 + pkg/util/log/testdata/config | 40 +++++ 12 files changed, 623 insertions(+), 7 deletions(-) create mode 100644 pkg/util/log/fluent_client.go diff --git a/pkg/cli/exit/codes.go b/pkg/cli/exit/codes.go index 95dcd6d6c2fd..ac41c99f8626 100644 --- a/pkg/cli/exit/codes.go +++ b/pkg/cli/exit/codes.go @@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} } // in the logging system. func TimeoutAfterFatalError() Code { return Code{8} } +// LoggingNetCollectorUnavailable (9) indicates that an error occurred +// during a logging operation to a network collector. +func LoggingNetCollectorUnavailable() Code { return Code{9} } + // Codes that are specific to client commands follow. It's possible // for codes to be reused across separate client or server commands. // Command-specific exit codes should be allocated down from 125. 
diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 015427371e49..1cb42846cefe 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "file_log_gc.go", "file_sync_buffer.go", "flags.go", + "fluent_client.go", "format_crdb_v1.go", "format_json.go", "formats.go", diff --git a/pkg/util/log/flags.go b/pkg/util/log/flags.go index 24b80f906114..48de755f8453 100644 --- a/pkg/util/log/flags.go +++ b/pkg/util/log/flags.go @@ -64,6 +64,7 @@ func init() { // we cannot keep redaction markers there. *defaultConfig.Sinks.Stderr.Redactable = false // Remove all sinks other than stderr. + defaultConfig.Sinks.FluentServers = nil defaultConfig.Sinks.FileGroups = nil if _, err := ApplyConfig(defaultConfig); err != nil { @@ -279,6 +280,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) { } } + // Create the fluent sinks. + for _, fc := range config.Sinks.FluentServers { + if fc.Filter == severity.NONE { + continue + } + fluentSinkInfo, err := newFluentSinkInfo(*fc) + if err != nil { + cleanupFn() + return nil, err + } + sinkInfos = append(sinkInfos, fluentSinkInfo) + allSinkInfos.put(fluentSinkInfo) + + // Connect the channels for this sink. + for _, ch := range fc.Channels.Channels { + l := chans[ch] + l.sinkInfos = append(l.sinkInfos, fluentSinkInfo) + } + } + logging.setChannelLoggers(chans, &stderrSinkInfo) setActive() @@ -303,6 +324,18 @@ func newFileSinkInfo(fileNamePrefix string, c logconfig.FileConfig) (*sinkInfo, return info, fileSink, nil } +// newFluentSinkInfo creates a new fluentSink and its accompanying sinkInfo +// from the provided configuration. +func newFluentSinkInfo(c logconfig.FluentConfig) (*sinkInfo, error) { + info := &sinkInfo{} + if err := info.applyConfig(c.CommonSinkConfig); err != nil { + return nil, err + } + fluentSink := newFluentSink(c.Net, c.Address) + info.sink = fluentSink + return info, nil +} + // applyConfig applies a common sink configuration to a sinkInfo. 
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error { l.threshold = c.Filter @@ -426,6 +459,30 @@ func DescribeAppliedConfig() string { return nil }) + // Describe the fluent sinks. + config.Sinks.FluentServers = make(map[string]*logconfig.FluentConfig) + sIdx := 1 + _ = allSinkInfos.iter(func(l *sinkInfo) error { + fluentSink, ok := l.sink.(*fluentSink) + if !ok { + return nil + } + + fc := &logconfig.FluentConfig{} + fc.CommonSinkConfig = l.describeAppliedConfig() + fc.Net = fluentSink.network + fc.Address = fluentSink.addr + + // Describe the connections to this fluent sink. + for ch, logger := range chans { + describeConnections(logger, ch, l, &fc.Channels) + } + skey := fmt.Sprintf("s%d", sIdx) + sIdx++ + config.Sinks.FluentServers[skey] = fc + return nil + }) + // Note: we cannot return 'config' directly, because this captures // certain variables from the loggers by reference and thus could be // invalidated by concurrent uses of ApplyConfig(). diff --git a/pkg/util/log/fluent_client.go b/pkg/util/log/fluent_client.go new file mode 100644 index 000000000000..5bcfd3cd4b89 --- /dev/null +++ b/pkg/util/log/fluent_client.go @@ -0,0 +1,122 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package log + +import ( + "fmt" + "net" + + "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/errors" +) + +// fluentSink represents a Fluentd-compatible network collector. +type fluentSink struct { + // The network address of the fluentd collector. + network string + addr string + + // good indicates that the connection can be used. 
+ good bool + conn net.Conn +} + +func newFluentSink(network, addr string) *fluentSink { + f := &fluentSink{ + addr: addr, + network: network, + } + return f +} + +func (l *fluentSink) String() string { + return fmt.Sprintf("fluent:%s://%s", l.network, l.addr) +} + +// active implements the logSink interface. +func (l *fluentSink) active() bool { return true } + +// attachHints implements the logSink interface. +func (l *fluentSink) attachHints(stacks []byte) []byte { + return stacks +} + +// exitCode implements the logSink interface. +func (l *fluentSink) exitCode() exit.Code { + return exit.LoggingNetCollectorUnavailable() +} + +// output implements the logSink interface. +func (l *fluentSink) output(extraSync bool, b []byte) error { + // Try to write and reconnect immediately if the first write fails. + // + // TODO(knz): Add some net socket write deadlines here. + _ = l.tryWrite(b) + if l.good { + return nil + } + + if err := l.ensureConn(b); err != nil { + return err + } + return l.tryWrite(b) +} + +// emergencyOutput implements the logSink interface. +func (l *fluentSink) emergencyOutput(b []byte) { + // TODO(knz): Add some net socket write deadlines here. 
+ _ = l.tryWrite(b) + if !l.good { + _ = l.ensureConn(b) + _ = l.tryWrite(b) + } +} + +func (l *fluentSink) close() { + l.good = false + if l.conn != nil { + if err := l.conn.Close(); err != nil { + fmt.Fprintf(OrigStderr, "error closing network logger: %v\n", err) + } + l.conn = nil + } +} + +func (l *fluentSink) ensureConn(b []byte) error { + if l.good { + return nil + } + l.close() + var err error + l.conn, err = net.Dial(l.network, l.addr) + if err != nil { + fmt.Fprintf(OrigStderr, "%s: error dialing network logger: %v\n%s", l, err, b) + return err + } + fmt.Fprintf(OrigStderr, "%s: connection to network logger resumed\n", l) + l.good = true + return nil +} + +var errNoConn = errors.New("no connection opened") + +func (l *fluentSink) tryWrite(b []byte) error { + if !l.good { + return errNoConn + } + n, err := l.conn.Write(b) + if err != nil || n < len(b) { + fmt.Fprintf(OrigStderr, "%s: logging error: %v or short write (%d/%d)\n%s", + l, err, n, len(b), b) + l.good = false + } + return err +} diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index 060cb5cc3825..b328ee73e909 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -30,6 +30,10 @@ const DefaultFileFormat = `crdb-v1` // when not specified in a configuration. const DefaultStderrFormat = `crdb-v1-tty` +// DefaultFluentFormat is the entry format for fluent sinks +// when not specified in a configuration. +const DefaultFluentFormat = `json-fluent-compact` + // DefaultConfig returns a suitable default configuration when logging // is meant to primarily go to files. func DefaultConfig() (c Config) { @@ -43,6 +47,11 @@ file-defaults: max-file-size: 10mib max-group-size: 100mib exit-on-error: true +fluent-defaults: + filter: INFO + format: ` + DefaultFluentFormat + ` + redactable: true + exit-on-error: false sinks: stderr: filter: NONE @@ -86,6 +95,11 @@ type Config struct { // configuration value. 
FileDefaults FileDefaults `yaml:"file-defaults,omitempty"` + // FluentDefaults represents the default configuration for fluent sinks, + // inherited when a specific fluent sink config does not provide a + // configuration value. + FluentDefaults FluentDefaults `yaml:"fluent-defaults,omitempty"` + // Sinks represents the sink configurations. Sinks SinkConfig `yaml:",omitempty"` @@ -115,12 +129,15 @@ type CaptureFd2Config struct { type SinkConfig struct { // FileGroups represents the list of configured file sinks. FileGroups map[string]*FileConfig `yaml:"file-groups,omitempty"` + // FluentServers represents the list of configured fluent sinks. + FluentServers map[string]*FluentConfig `yaml:"fluent-servers,omitempty"` // Stderr represents the configuration for the stderr sink. Stderr StderrConfig `yaml:",omitempty"` - // sortedFileGroupNames is used internally to + // sortedFileGroupNames and sortedServerNames are used internally to // make the Export() function deterministic. sortedFileGroupNames []string + sortedServerNames []string } // StderrConfig represents the configuration for the stderr sink. @@ -171,6 +188,26 @@ type CommonSinkConfig struct { Auditable *bool `yaml:",omitempty"` } +// FluentConfig represents the configuration for one fluentd sink. +type FluentConfig struct { + // Channels is the list of logging channels that use this sink. + Channels ChannelList `yaml:",omitempty"` + + // Net is the protocol for the fluent server. Can be "tcp", "udp", + // "tcp4", etc. + Net string `yaml:",omitempty"` + // Address is the network address of the fluent server. The + // host/address and port parts are separated with a colon. IPv6 + // numeric addresses should be included within square brackets, + // e.g.: [::1]:1234. + Address string + + CommonSinkConfig `yaml:",inline"` + + // serverName is this sink's key in FluentServers; populated by Validate(). + serverName string +} + // FileDefaults represent configuration defaults for file sinks. 
type FileDefaults struct { // Dir stores the default output directory for file sinks. @@ -200,6 +237,11 @@ type FileDefaults struct { CommonSinkConfig `yaml:",inline"` } +// FluentDefaults represent configuration defaults for fluent sinks. +type FluentDefaults struct { + CommonSinkConfig `yaml:",inline"` +} + // FileConfig represents the configuration for one file sink. type FileConfig struct { // Channels is the list of logging channels that use this sink. diff --git a/pkg/util/log/logconfig/export.go b/pkg/util/log/logconfig/export.go index ab82d1dca0f3..03eb5674436b 100644 --- a/pkg/util/log/logconfig/export.go +++ b/pkg/util/log/logconfig/export.go @@ -171,6 +171,33 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { links = append(links, "stray --> stderr") } + // Collect the network servers. + // + // servers maps each server to its box declaration. + servers := map[string]string{} + for _, fn := range c.Sinks.sortedServerNames { + fc := c.Sinks.FluentServers[fn] + if fc.Filter == logpb.Severity_NONE { + continue + } + skey := fmt.Sprintf("s__%s", fc.serverName) + target, thisprocs, thislinks := process(skey, fc.CommonSinkConfig) + hasLink := false + for _, ch := range fc.Channels.Channels { + if !chanSel.HasChannel(ch) { + continue + } + hasLink = true + links = append(links, fmt.Sprintf("%s --> %s", ch, target)) + } + if hasLink { + processing = append(processing, thisprocs...) + links = append(links, thislinks...) + servers[fc.serverName] = fmt.Sprintf("queue %s as \"fluent: %s:%s\"", + skey, fc.Net, fc.Address) + } + } + // Export the stderr redirects. if c.Sinks.Stderr.Filter != logpb.Severity_NONE { target, thisprocs, thislinks := process("stderr", c.Sinks.Stderr.CommonSinkConfig) @@ -210,6 +237,15 @@ func (c *Config) Export(onlyChans ChannelList) (string, string) { buf.WriteString("}\n") } + // Represent the network servers, if any. 
+ if len(c.Sinks.sortedServerNames) > 0 { + buf.WriteString("cloud network {\n") + for _, s := range c.Sinks.sortedServerNames { + fmt.Fprintf(&buf, " %s\n", servers[s]) + } + buf.WriteString("}\n") + } + // Export the relationships. for _, l := range links { fmt.Fprintf(&buf, "%s\n", l) diff --git a/pkg/util/log/logconfig/testdata/export b/pkg/util/log/logconfig/testdata/export index ffdb22894a02..d8cdf27312b9 100644 --- a/pkg/util/log/logconfig/testdata/export +++ b/pkg/util/log/logconfig/testdata/export @@ -53,6 +53,11 @@ sinks: channels: ALL stderr: filter: WARNING + fluent-servers: + local: + channels: SESSIONS + redactable: false + address: localhost:5170 ---- @startuml left to right direction @@ -65,23 +70,31 @@ queue stderr card sync2 as "sync" card p__1 as "redact" card p__2 as "format:crdb-v1" -card p__3 as "format:crdb-v1-tty" -card p__4 as "filter:W" +card p__3 as "strip" +card p__4 as "format:json-fluent-compact" +card p__5 as "format:crdb-v1-tty" +card p__6 as "filter:W" artifact files { folder "/default-dir" { file f1 as "cockroach-everything.log" file stderrfile as "cockroach-stderr.log" } } +cloud network { + queue s__local as "fluent: tcp:localhost:5170" +} DEV --> p__2 SESSIONS --> p__2 p__1 --> sync2 p__2 --> p__1 sync2 --> f1 stray --> stderrfile -DEV --> p__4 SESSIONS --> p__4 -p__3 --> stderr +p__3 ..> s__local p__4 --> p__3 +DEV --> p__6 +SESSIONS --> p__6 +p__5 --> stderr +p__6 --> p__5 @enduml -# http://www.plantuml.com/plantuml/uml/R98nJ_Cm48Rt-nKdJzyt11JQgGFgq0uiC4Gg2r9bx7DhuThbVAaKeVvt5Bib89XYl-yJ9_V8oooQfJy42EG49I7xtLxGUYOZFaKmwN1CaQ9WJZqRolW1__xZQhqP7zswwnwU7Zim8VKMix0UK6TKPVKIYJbnLd26zvvwmYoMcC5ejfY7QEugF4IZQdZSRjkICLbjP4ehwH8Vj2mCszVcr4xjx8-s4HacObu97uHuyQn0itYdZQ3peGo5BWLBZEhMajDzaCPwLcDH47JrlqmoRvoqsJTq8Xvax-Fk9gITkd9rnBByoTVYmfxX3Alr1flclam7LvDJKbICko8AYeDBsKALDsvT2rLxGRy-_ltq-Q_Jvr2aJQz0KNHfPx2aQCTRyHa00F__ +# 
http://www.plantuml.com/plantuml/uml/R98nRzim48Nt-nK7JUqWjsfY5z0GP8Y7BkrW85qA62nvjDdGf7eyfH0A__U2f1mbS2UHxpsIT_VfFebY6KyUF5b1YSZkS1GqZacBYm5qF0qnK11CSMHD2V_2k_Vui7tCdzrsj_loxUiEj8-ZmIIi9bG9cxBw4OWvSchW3Bz76WcJ6686hTXWceB-LCDJq4qLX_s-AngJKLeMjIPjv9EINhFvsJvtYtjpUTGDYtZx-iYl54Dh_KX1sjpJcylN_xcy5PcMnAOcd1VY_di3YiLPfGMjysKcQACtnDXyC6JLwAKrZfjib0ZQsfQE-ecZqiUMdeadERfmMFbuQ2wvEg6oV9klnfmzmtaUUI3v4_afFpIFUB_tKIjV2ouDzoXww8jwZ4dwTVVvOsRoi7t4jhqhuuKBoaKfBFAka882OJOxgFJordPGkPVeI_bN5zv20RLQtRtK28NIxD-yhcTpTNm31TJo21Giixk6Umec_yZ_0W00__y0 diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 870a8d45683b..e53b81cccb9f 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -11,6 +11,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: default: @@ -53,6 +60,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: custom: @@ -97,6 +111,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: custom: @@ -142,6 +163,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: custom: @@ -167,6 +195,65 @@ capture-stray-errors: dir: /default-dir max-group-size: 100MiB +# Check that fluent default network is filled. 
+yaml +sinks: + fluent-servers: + custom: + address: "127.0.0.1:5170" + channels: DEV +---- +file-defaults: + dir: /default-dir + max-file-size: 10MiB + max-group-size: 100MiB + filter: INFO + format: crdb-v1 + redact: false + redactable: true + exit-on-error: true + auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false +sinks: + file-groups: + default: + channels: all + dir: /default-dir + max-file-size: 10MiB + max-group-size: 100MiB + sync-writes: false + filter: INFO + format: crdb-v1 + redact: false + redactable: true + exit-on-error: true + fluent-servers: + custom: + channels: DEV + net: tcp + address: 127.0.0.1:5170 + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + stderr: + channels: all + filter: NONE + format: crdb-v1-tty + redact: false + redactable: true + exit-on-error: true +capture-stray-errors: + enable: true + dir: /default-dir + max-group-size: 100MiB # Check that it's possible to capture all channels. yaml @@ -185,6 +272,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: custom: @@ -228,6 +322,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: custom: @@ -253,6 +354,66 @@ capture-stray-errors: dir: /default-dir max-group-size: 100MiB +# Check that "auditable" is transformed into other fluent flags. 
+yaml +sinks: + fluent-servers: + custom: + channels: DEV + address: localhost:5170 + auditable: true +---- +file-defaults: + dir: /default-dir + max-file-size: 10MiB + max-group-size: 100MiB + filter: INFO + format: crdb-v1 + redact: false + redactable: true + exit-on-error: true + auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false +sinks: + file-groups: + default: + channels: all + dir: /default-dir + max-file-size: 10MiB + max-group-size: 100MiB + sync-writes: false + filter: INFO + format: crdb-v1 + redact: false + redactable: true + exit-on-error: true + fluent-servers: + custom: + channels: DEV + net: tcp + address: localhost:5170 + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: true + stderr: + channels: all + filter: NONE + format: crdb-v1-tty + redact: false + redactable: true + exit-on-error: true +capture-stray-errors: + enable: true + dir: /default-dir + max-group-size: 100MiB # Check that "auditable" is transformed into other stderr yaml @@ -272,6 +433,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: file-groups: default: @@ -311,6 +479,13 @@ file-defaults: redactable: true exit-on-error: true auditable: false +fluent-defaults: + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + auditable: false sinks: stderr: channels: all @@ -324,6 +499,25 @@ capture-stray-errors: dir: /default-dir max-group-size: 100MiB +# Check that missing addr is reported. +yaml +sinks: + fluent-servers: + custom: +---- +ERROR: fluent server "custom": address cannot be empty +fluent server "custom": no channel selected + +# Check that invalid proto is rejected. 
+yaml +sinks: + fluent-servers: + custom: + address: 'abc' + net: 'unknown' +---- +ERROR: fluent server "custom": unknown protocol: "unknown" +fluent server "custom": no channel selected # Check that empty dir is rejected. yaml diff --git a/pkg/util/log/logconfig/testdata/yaml b/pkg/util/log/logconfig/testdata/yaml index fe159add810a..0295cdd05b75 100644 --- a/pkg/util/log/logconfig/testdata/yaml +++ b/pkg/util/log/logconfig/testdata/yaml @@ -83,6 +83,13 @@ sinks: filter: INFO redact: true + fluent-servers: + default: + address: 127.0.0.1:5170 + other: + net: udp + address: 127.0.0.1:5111 + stderr: filter: WARNING redact: false @@ -107,6 +114,12 @@ sinks: perf: filter: INFO redact: true + fluent-servers: + default: + address: 127.0.0.1:5170 + other: + net: udp + address: 127.0.0.1:5111 stderr: filter: WARNING redact: false diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 9cd2b1982025..9733b7ecadf3 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -46,27 +46,46 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { if c.FileDefaults.Filter == logpb.Severity_UNKNOWN { c.FileDefaults.Filter = logpb.Severity_INFO } + if c.FluentDefaults.Filter == logpb.Severity_UNKNOWN { + c.FluentDefaults.Filter = logpb.Severity_INFO + } // Sinks are not auditable by default. if c.FileDefaults.Auditable == nil { c.FileDefaults.Auditable = &bf } + if c.FluentDefaults.Auditable == nil { + c.FluentDefaults.Auditable = &bf + } // No format -> populate defaults. if c.FileDefaults.Format == nil { s := DefaultFileFormat c.FileDefaults.Format = &s } + if c.FluentDefaults.Format == nil { + s := DefaultFluentFormat + c.FluentDefaults.Format = &s + } // No redaction markers -> default keep them. if c.FileDefaults.Redactable == nil { c.FileDefaults.Redactable = &bt } + if c.FluentDefaults.Redactable == nil { + c.FluentDefaults.Redactable = &bt + } // No redaction specification -> default false. 
if c.FileDefaults.Redact == nil { c.FileDefaults.Redact = &bf } - // No criticality -> default true for files. + if c.FluentDefaults.Redact == nil { + c.FluentDefaults.Redact = &bf + } + // No criticality -> default true for files, false for fluent. if c.FileDefaults.Criticality == nil { c.FileDefaults.Criticality = &bt } + if c.FluentDefaults.Criticality == nil { + c.FluentDefaults.Criticality = &bf + } // Validate and fill in defaults for file sinks. for prefix, fc := range c.Sinks.FileGroups { @@ -80,6 +99,18 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } } + // Validate and fill in defaults for fluent sinks. + for serverName, fc := range c.Sinks.FluentServers { + if fc == nil { + fc = &FluentConfig{} + c.Sinks.FluentServers[serverName] = fc + } + fc.serverName = serverName + if err := c.validateFluentConfig(fc); err != nil { + fmt.Fprintf(&errBuf, "fluent server %q: %v\n", serverName, err) + } + } + // Defaults for stderr. c.inheritCommonDefaults(&c.Sinks.Stderr.CommonSinkConfig, &c.FileDefaults.CommonSinkConfig) if c.Sinks.Stderr.Filter == logpb.Severity_UNKNOWN { @@ -99,6 +130,8 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { // fileSinks maps channels to files. fileSinks := make(map[logpb.Channel]*FileConfig) + // fluentSinks maps channels to fluent servers. + fluentSinks := make(map[logpb.Channel]*FluentConfig) // Check that no channel is listed by more than one file sink, // and every file has at least one channel. @@ -121,6 +154,23 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } } + // Check that no channel is listed by more than one fluent sink, and + // every sink has at least one channel. 
+ for _, fc := range c.Sinks.FluentServers { + if len(fc.Channels.Channels) == 0 { + fmt.Fprintf(&errBuf, "fluent server %q: no channel selected\n", fc.serverName) + } + fc.Channels.Sort() + for _, ch := range fc.Channels.Channels { + if prev := fluentSinks[ch]; prev != nil { + fmt.Fprintf(&errBuf, "fluent server %q: channel %s already captured by server %q\n", + fc.serverName, ch, prev.serverName) + } else { + fluentSinks[ch] = fc + } + } + } + // If capture-stray-errors was enabled, then perform some additional // validation on it. if c.CaptureFd2.Enable { @@ -190,10 +240,25 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { } } + // serverNames collects the names of the servers. We need this to + // store them sorted in c.Sinks.sortedServerNames later. + serverNames := make([]string, 0, len(c.Sinks.FluentServers)) + // Elide all the fluent sinks with severity set to NONE. + // Also collect the remaining names for sorting below. + for serverName, fc := range c.Sinks.FluentServers { + if fc.Filter == logpb.Severity_NONE { + delete(c.Sinks.FluentServers, serverName) + } else { + serverNames = append(serverNames, serverName) + } + } + // Remember the sorted names, so we get deterministic output in // export. 
sort.Strings(fileGroupNames) c.Sinks.sortedFileGroupNames = fileGroupNames + sort.Strings(serverNames) + c.Sinks.sortedServerNames = serverNames return nil } @@ -219,6 +284,34 @@ func (c *Config) inheritCommonDefaults(fc, defaults *CommonSinkConfig) { } } +func (c *Config) validateFluentConfig(fc *FluentConfig) error { + c.inheritCommonDefaults(&fc.CommonSinkConfig, &c.FluentDefaults.CommonSinkConfig) + + fc.Net = strings.ToLower(strings.TrimSpace(fc.Net)) + switch fc.Net { + case "tcp", "tcp4", "tcp6": + case "udp", "udp4", "udp6": + case "unix": + case "": + fc.Net = "tcp" + default: + return errors.Newf("unknown protocol: %q", fc.Net) + } + fc.Address = strings.TrimSpace(fc.Address) + if fc.Address == "" { + return errors.New("address cannot be empty") + } + + // Apply the auditable flag if set. + if *fc.Auditable { + bt := true + fc.Criticality = &bt + } + fc.Auditable = nil + + return nil +} + func (c *Config) validateFileConfig(fc *FileConfig, defaultLogDir *string) error { c.inheritCommonDefaults(&fc.CommonSinkConfig, &c.FileDefaults.CommonSinkConfig) diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index 29143e3522af..7a2e5bdd486c 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -49,3 +49,4 @@ type logSink interface { var _ logSink = (*stderrSink)(nil) var _ logSink = (*fileSink)(nil) +var _ logSink = (*fluentSink)(nil) diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config index f837102bc6cc..2284789d53c8 100644 --- a/pkg/util/log/testdata/config +++ b/pkg/util/log/testdata/config @@ -27,6 +27,46 @@ capture-stray-errors: max-group-size: 100MiB +# Test the default config with a fluent server. 
+yaml +sinks: + fluent-servers: {local: {channels: SESSIONS, address: localhost:5170}} +---- +sinks: + file-groups: + default: + channels: all + dir: TMPDIR + max-file-size: 10MiB + max-group-size: 100MiB + sync-writes: false + filter: INFO + format: crdb-v1 + redact: false + redactable: true + exit-on-error: true + fluent-servers: + s1: + channels: SESSIONS + net: tcp + address: localhost:5170 + filter: INFO + format: json-fluent-compact + redact: false + redactable: true + exit-on-error: false + stderr: + channels: all + filter: NONE + format: crdb-v1-tty + redact: false + redactable: true + exit-on-error: true +capture-stray-errors: + enable: true + dir: TMPDIR + max-group-size: 100MiB + # Test the default config with a catch-all auditable file. yaml