Skip to content

Commit

Permalink
util/log: new experimental integration with Fluentd
Browse files Browse the repository at this point in the history
Release note (cli change): It is now possible to redirect logging to
[Fluentd](https://www.fluentd.org)-compatible network collectors. See
the reference sink documentation for details. This is an alpha-quality
feature.  Note that Fluent-enabled configuration only provide
minimal event buffering, and log entries are dropped if the
logging server becomes unavailable or network errors are
encountered. This is a known limitation and will be likely improved in
a later version.
  • Loading branch information
knz committed Feb 3, 2021
1 parent 4736b8d commit bf4c97d
Show file tree
Hide file tree
Showing 16 changed files with 968 additions and 10 deletions.
84 changes: 84 additions & 0 deletions docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The supported log output sink types are documented below.

- [output to files](#sink-output-to-files)

- [output to Fluentd-compatible log collectors](#sink-output-to-fluentd-compatible-log-collectors)

- [standard error stream](#sink-standard-error-stream)


Expand Down Expand Up @@ -91,6 +93,88 @@ Configuration options shared across all sink types:



<a name="output-to-fluentd-compatible-log-collectors">

## Sink type: output to Fluentd-compatible log collectors


This sink type causes logging data to be sent over the network, to
a log collector that can ingest log data in a
[Fluentd](https://www.fluentd.org)-compatible protocol.

Note that TLS is not supported yet: the connection to the log
collector is neither authenticated nor encrypted. Given that
logging events may contain sensitive information, care should be
taken to keep the log collector and the CockroachDB node close
together on a private network, or connect them using a secure
VPN. TLS support may be added at a later date.

At the time of this writing, a Fluent sink buffers at most one log
entry and retries sending the event at most one time if a network
error is encountered. This is just sufficient to tolerate a restart
of the Fluentd collector after a configuration change under light
logging activity. If the server is unavailable for too long, or if
more than one error is encountered, an error is reported to the
process' standard error output with a copy of the logging event and
the logging event is dropped.

The configuration key under the `sinks` key in the YAML
configuration is `fluent-servers`. Example configuration:

sinks:
fluent-servers: # fluent configurations start here
health: # defines one sink called "health"
channels: HEALTH
address: 127.0.0.1:5170

A cascading defaults mechanism is available for configurations:
every new server sink configured automatically inherits the
configurations set in the `fluent-defaults` section.

For example:

fluent-defaults:
redactable: false # default: disable redaction markers
sinks:
fluent-servers:
health:
channels: HEALTH
# This sink has redactable set to false,
# as the setting is inherited from fluent-defaults
# unless overridden here.

The default output format for Fluent sinks is
`json-fluent-compact`. The `fluent` variants of the JSON formats
include a `tag` field as required by the Fluentd protocol, which
the non-`fluent` JSON format variants do not include.

Users are invited to peruse the `check-log-config` tool to
verify the effect of defaults inheritance.



Type-specific configuration options:

| Field | Description |
|--|--|
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
| `net` | the protocol for the fluent server. Can be "tcp", "udp", "tcp4", etc. |
| `address` | the network address of the fluent server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. |


Configuration options shared across all sink types:

| Field | Description |
|--|--|
| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. |
| `format` | the entry format to use. |
| `redact` | whether to strip sensitive information before log events are emitted to this sink. |
| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. |
| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. |
| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. |



<a name="standard-error-stream">

## Sink type: standard error stream
Expand Down
4 changes: 4 additions & 0 deletions pkg/cli/exit/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} }
// in the logging system.
func TimeoutAfterFatalError() Code { return Code{8} }

// LoggingNetCollectorUnavailable (9) indicates that an error occurred
// during a logging operation to a network collector.
func LoggingNetCollectorUnavailable() Code { return Code{9} }

// Codes that are specific to client commands follow. It's possible
// for codes to be reused across separate client or server commands.
// Command-specific exit codes should be allocated down from 125.
Expand Down
5 changes: 2 additions & 3 deletions pkg/cli/log_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ func TestSetupLogging(t *testing.T) {
reSimplify := regexp.MustCompile(`(?ms:^\s*(auditable: false|redact: false|exit-on-error: true|max-group-size: 100MiB)\n)`)

const defaultFluentConfig = `fluent-defaults: {` +
`buffered-writes: true, ` +
`filter: INFO, ` +
`format: json-fluent-compact, ` +
`redactable: true, ` +
`exit-on-error: false` +
`}, `
`}`
stdFileDefaultsRe := regexp.MustCompile(
`file-defaults: \{dir: (?P<path>[^,]+), max-file-size: 10MiB, buffered-writes: true, filter: INFO, format: crdb-v2, redactable: true\}`)
fileDefaultsNoMaxSizeRe := regexp.MustCompile(
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestSetupLogging(t *testing.T) {
actual = reWhitespace2.ReplaceAllString(actual, "{")

// Shorten the configuration for legibility during reviews of test changes.
actual = strings.ReplaceAll(actual, defaultFluentConfig, "")
actual = strings.ReplaceAll(actual, defaultFluentConfig, "<fluentDefaults>")
actual = stdFileDefaultsRe.ReplaceAllString(actual, "<stdFileDefaults($path)>")
actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "<fileDefaultsNoMaxSize($path)>")
actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "<fileDefaultsNoDir>")
Expand Down
21 changes: 21 additions & 0 deletions pkg/cli/testdata/logflags
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ run
start
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -33,6 +34,7 @@ run
start-single-node
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -57,12 +59,14 @@ run
sql
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}

run
init
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}


Expand All @@ -74,6 +78,7 @@ run
bank
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -83,6 +88,7 @@ run
demo
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrCfg(NONE,false)>}}


Expand All @@ -97,6 +103,7 @@ start
--store=type=mem,size=3g
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -108,6 +115,7 @@ start
--store=path=/pathB
----
config: {<stdFileDefaults(/pathA/logs)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -130,6 +138,7 @@ start
--log=file-defaults: {dir: /mypath}
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -153,6 +162,7 @@ start
--log=file-defaults: {dir: /pathA/logs}
----
config: {<stdFileDefaults(/pathA/logs)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand Down Expand Up @@ -181,6 +191,7 @@ start
--log=file-defaults: {dir: /mypath}
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -202,6 +213,7 @@ start
--log=sinks: {stderr: {filter: ERROR}}
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -224,6 +236,7 @@ start
--log=capture-stray-errors: {enable: false}
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand Down Expand Up @@ -267,6 +280,7 @@ start
--log-dir=
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -276,6 +290,7 @@ start
--log-dir=/mypath
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -299,6 +314,7 @@ start
--log-dir=/pathA
----
config: {<stdFileDefaults(/pathA)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -322,6 +338,7 @@ init
--log-dir=/mypath
----
config: {<fileDefaultsNoMaxSize(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: {channels: all,
dir: /mypath,
buffered-writes: true,
Expand All @@ -337,6 +354,7 @@ start
--logtostderr=INFO
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -358,6 +376,7 @@ start
--logtostderr
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -379,6 +398,7 @@ init
--logtostderr=INFO
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}

# Default when no severity is specified is WARNING.
Expand All @@ -387,6 +407,7 @@ init
--logtostderr
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}


Expand Down
2 changes: 2 additions & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"file_log_gc.go",
"file_sync_buffer.go",
"flags.go",
"fluent_client.go",
"format_crdb_v1.go",
"format_crdb_v2.go",
"format_json.go",
Expand Down Expand Up @@ -127,6 +128,7 @@ go_test(
"file_log_gc_test.go",
"file_test.go",
"flags_test.go",
"fluent_client_test.go",
"format_crdb_v2_test.go",
"format_json_test.go",
"main_test.go",
Expand Down
57 changes: 57 additions & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
// we cannot keep redaction markers there.
*defaultConfig.Sinks.Stderr.Redactable = false
// Remove all sinks other than stderr.
defaultConfig.Sinks.FluentServers = nil
defaultConfig.Sinks.FileGroups = nil

if _, err := ApplyConfig(defaultConfig); err != nil {
Expand Down Expand Up @@ -279,6 +280,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}

// Create the fluent sinks.
for _, fc := range config.Sinks.FluentServers {
if fc.Filter == severity.NONE {
continue
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fluentSinkInfo)
allSinkInfos.put(fluentSinkInfo)

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
}
}

logging.setChannelLoggers(chans, &stderrSinkInfo)
setActive()

Expand All @@ -305,6 +326,18 @@ func newFileSinkInfo(
return info, fileSink, nil
}

// newFluentSinkInfo creates a new fluentSink and its accompanying sinkInfo
// from the provided configuration.
func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {
info := &sinkInfo{}
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
fluentSink := newFluentSink(c.Net, c.Address)
info.sink = fluentSink
return info, nil
}

// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
Expand Down Expand Up @@ -438,6 +471,30 @@ func DescribeAppliedConfig() string {
return nil
})

// Describe the fluent sinks.
config.Sinks.FluentServers = make(map[string]*logconfig.FluentSinkConfig)
sIdx := 1
_ = allSinkInfos.iter(func(l *sinkInfo) error {
fluentSink, ok := l.sink.(*fluentSink)
if !ok {
return nil
}

fc := &logconfig.FluentSinkConfig{}
fc.CommonSinkConfig = l.describeAppliedConfig()
fc.Net = fluentSink.network
fc.Address = fluentSink.addr

// Describe the connections to this fluent sink.
for ch, logger := range chans {
describeConnections(logger, ch, l, &fc.Channels)
}
skey := fmt.Sprintf("s%d", sIdx)
sIdx++
config.Sinks.FluentServers[skey] = fc
return nil
})

// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
Expand Down
Loading

0 comments on commit bf4c97d

Please sign in to comment.