Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/log: new experimental integration with Fluentd #57170

Merged
merged 1 commit into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The supported log output sink types are documented below.

- [output to files](#sink-output-to-files)

- [output to Fluentd-compatible log collectors](#sink-output-to-fluentd-compatible-log-collectors)

- [standard error stream](#sink-standard-error-stream)


Expand Down Expand Up @@ -91,6 +93,88 @@ Configuration options shared across all sink types:



<a name="output-to-fluentd-compatible-log-collectors">

## Sink type: output to Fluentd-compatible log collectors


This sink type causes logging data to be sent over the network, to
a log collector that can ingest log data in a
[Fluentd](https://www.fluentd.org)-compatible protocol.

Note that TLS is not supported yet: the connection to the log
collector is neither authenticated nor encrypted. Given that
logging events may contain sensitive information, care should be
taken to keep the log collector and the CockroachDB node close
together on a private network, or connect them using a secure
VPN. TLS support may be added at a later date.

At the time of this writing, a Fluent sink buffers at most one log
entry and retries sending the event at most one time if a network
error is encountered. This is just sufficient to tolerate a restart
of the Fluentd collector after a configuration change under light
logging activity. If the server is unavailable for too long, or if
more than one error is encountered, an error is reported to the
process' standard error output with a copy of the logging event and
the logging event is dropped.

The configuration key under the `sinks` key in the YAML
configuration is `fluent-servers`. Example configuration:

sinks:
fluent-servers: # fluent configurations start here
health: # defines one sink called "health"
channels: HEALTH
address: 127.0.0.1:5170

A cascading defaults mechanism is available for configurations:
every new server sink configured automatically inherits the
configurations set in the `fluent-defaults` section.

For example:

fluent-defaults:
redactable: false # default: disable redaction markers
sinks:
fluent-servers:
health:
channels: HEALTH
# This sink has redactable set to false,
# as the setting is inherited from fluent-defaults
# unless overridden here.

The default output format for Fluent sinks is
`json-fluent-compact`. The `fluent` variants of the JSON formats
include a `tag` field as required by the Fluentd protocol, which
the non-`fluent` JSON format variants do not include.

Users are invited to peruse the `check-log-config` tool to
verify the effect of defaults inheritance.



Type-specific configuration options:

| Field | Description |
|--|--|
| `channels` | the list of logging channels that use this sink. See the [channel selection configuration](#channel-format) section for details. |
| `net` | the protocol for the fluent server. Can be "tcp", "udp", "tcp4", etc. |
| `address` | the network address of the fluent server. The host/address and port parts are separated with a colon. IPv6 numeric addresses should be included within square brackets, e.g.: [::1]:1234. |


Configuration options shared across all sink types:

| Field | Description |
|--|--|
| `filter` | the minimum severity for log events to be emitted to this sink. This can be set to NONE to disable the sink. |
| `format` | the entry format to use. |
| `redact` | whether to strip sensitive information before log events are emitted to this sink. |
| `redactable` | whether to keep redaction markers in the sink's output. The presence of redaction markers makes it possible to strip sensitive data reliably. |
| `exit-on-error` | whether the logging system should terminate the process if an error is encountered while writing to this sink. |
| `auditable` | translated to tweaks to the other settings for this sink during validation. For example, it enables `exit-on-error` and changes the format of files from `crdb-v1` to `crdb-v1-count`. |



<a name="standard-error-stream">

## Sink type: standard error stream
Expand Down
4 changes: 4 additions & 0 deletions pkg/cli/exit/codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ func FatalError() Code { return Code{7} }
// in the logging system.
func TimeoutAfterFatalError() Code { return Code{8} }

// LoggingNetCollectorUnavailable (9) indicates that an error occurred
// during a logging operation to a network collector.
func LoggingNetCollectorUnavailable() Code { return Code{9} }

// Codes that are specific to client commands follow. It's possible
// for codes to be reused across separate client or server commands.
// Command-specific exit codes should be allocated down from 125.
Expand Down
5 changes: 2 additions & 3 deletions pkg/cli/log_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@ func TestSetupLogging(t *testing.T) {
reSimplify := regexp.MustCompile(`(?ms:^\s*(auditable: false|redact: false|exit-on-error: true|max-group-size: 100MiB)\n)`)

const defaultFluentConfig = `fluent-defaults: {` +
`buffered-writes: true, ` +
`filter: INFO, ` +
`format: json-fluent-compact, ` +
`redactable: true, ` +
`exit-on-error: false` +
`}, `
`}`
stdFileDefaultsRe := regexp.MustCompile(
`file-defaults: \{dir: (?P<path>[^,]+), max-file-size: 10MiB, buffered-writes: true, filter: INFO, format: crdb-v2, redactable: true\}`)
fileDefaultsNoMaxSizeRe := regexp.MustCompile(
Expand Down Expand Up @@ -106,7 +105,7 @@ func TestSetupLogging(t *testing.T) {
actual = reWhitespace2.ReplaceAllString(actual, "{")

// Shorten the configuration for legibility during reviews of test changes.
actual = strings.ReplaceAll(actual, defaultFluentConfig, "")
actual = strings.ReplaceAll(actual, defaultFluentConfig, "<fluentDefaults>")
actual = stdFileDefaultsRe.ReplaceAllString(actual, "<stdFileDefaults($path)>")
actual = fileDefaultsNoMaxSizeRe.ReplaceAllString(actual, "<fileDefaultsNoMaxSize($path)>")
actual = strings.ReplaceAll(actual, fileDefaultsNoDir, "<fileDefaultsNoDir>")
Expand Down
21 changes: 21 additions & 0 deletions pkg/cli/testdata/logflags
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ run
start
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -33,6 +34,7 @@ run
start-single-node
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -57,12 +59,14 @@ run
sql
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}

run
init
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}


Expand All @@ -74,6 +78,7 @@ run
bank
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -83,6 +88,7 @@ run
demo
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrCfg(NONE,false)>}}


Expand All @@ -97,6 +103,7 @@ start
--store=type=mem,size=3g
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -108,6 +115,7 @@ start
--store=path=/pathB
----
config: {<stdFileDefaults(/pathA/logs)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -130,6 +138,7 @@ start
--log=file-defaults: {dir: /mypath}
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -153,6 +162,7 @@ start
--log=file-defaults: {dir: /pathA/logs}
----
config: {<stdFileDefaults(/pathA/logs)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand Down Expand Up @@ -181,6 +191,7 @@ start
--log=file-defaults: {dir: /mypath}
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -202,6 +213,7 @@ start
--log=sinks: {stderr: {filter: ERROR}}
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -224,6 +236,7 @@ start
--log=capture-stray-errors: {enable: false}
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand Down Expand Up @@ -267,6 +280,7 @@ start
--log-dir=
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}


Expand All @@ -276,6 +290,7 @@ start
--log-dir=/mypath
----
config: {<stdFileDefaults(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -299,6 +314,7 @@ start
--log-dir=/pathA
----
config: {<stdFileDefaults(/pathA)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -322,6 +338,7 @@ init
--log-dir=/mypath
----
config: {<fileDefaultsNoMaxSize(/mypath)>,
<fluentDefaults>,
sinks: {file-groups: {default: {channels: all,
dir: /mypath,
buffered-writes: true,
Expand All @@ -337,6 +354,7 @@ start
--logtostderr=INFO
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -358,6 +376,7 @@ start
--logtostderr
----
config: {<stdFileDefaults(<defaultLogDir>)>,
<fluentDefaults>,
sinks: {file-groups: {default: <fileCfg([DEV,
OPS,
HEALTH,
Expand All @@ -379,6 +398,7 @@ init
--logtostderr=INFO
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledInfoNoRedaction>}}

# Default when no severity is specified is WARNING.
Expand All @@ -387,6 +407,7 @@ init
--logtostderr
----
config: {<fileDefaultsNoDir>,
<fluentDefaults>,
sinks: {<stderrEnabledWarningNoRedaction>}}


Expand Down
2 changes: 2 additions & 0 deletions pkg/util/log/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"file_log_gc.go",
"file_sync_buffer.go",
"flags.go",
"fluent_client.go",
"format_crdb_v1.go",
"format_crdb_v2.go",
"format_json.go",
Expand Down Expand Up @@ -127,6 +128,7 @@ go_test(
"file_log_gc_test.go",
"file_test.go",
"flags_test.go",
"fluent_client_test.go",
"format_crdb_v2_test.go",
"format_json_test.go",
"main_test.go",
Expand Down
57 changes: 57 additions & 0 deletions pkg/util/log/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func init() {
// we cannot keep redaction markers there.
*defaultConfig.Sinks.Stderr.Redactable = false
// Remove all sinks other than stderr.
defaultConfig.Sinks.FluentServers = nil
defaultConfig.Sinks.FileGroups = nil

if _, err := ApplyConfig(defaultConfig); err != nil {
Expand Down Expand Up @@ -279,6 +280,26 @@ func ApplyConfig(config logconfig.Config) (cleanupFn func(), err error) {
}
}

// Create the fluent sinks.
for _, fc := range config.Sinks.FluentServers {
if fc.Filter == severity.NONE {
continue
}
fluentSinkInfo, err := newFluentSinkInfo(*fc)
if err != nil {
cleanupFn()
return nil, err
}
sinkInfos = append(sinkInfos, fluentSinkInfo)
allSinkInfos.put(fluentSinkInfo)

// Connect the channels for this sink.
for _, ch := range fc.Channels.Channels {
l := chans[ch]
l.sinkInfos = append(l.sinkInfos, fluentSinkInfo)
}
}

logging.setChannelLoggers(chans, &stderrSinkInfo)
setActive()

Expand All @@ -305,6 +326,18 @@ func newFileSinkInfo(
return info, fileSink, nil
}

// newFluentSinkInfo creates a new fluentSink and its accompanying sinkInfo
// from the provided configuration.
func newFluentSinkInfo(c logconfig.FluentSinkConfig) (*sinkInfo, error) {
info := &sinkInfo{}
if err := info.applyConfig(c.CommonSinkConfig); err != nil {
return nil, err
}
fluentSink := newFluentSink(c.Net, c.Address)
info.sink = fluentSink
return info, nil
}

// applyConfig applies a common sink configuration to a sinkInfo.
func (l *sinkInfo) applyConfig(c logconfig.CommonSinkConfig) error {
l.threshold = c.Filter
Expand Down Expand Up @@ -438,6 +471,30 @@ func DescribeAppliedConfig() string {
return nil
})

// Describe the fluent sinks.
config.Sinks.FluentServers = make(map[string]*logconfig.FluentSinkConfig)
sIdx := 1
_ = allSinkInfos.iter(func(l *sinkInfo) error {
fluentSink, ok := l.sink.(*fluentSink)
if !ok {
return nil
}

fc := &logconfig.FluentSinkConfig{}
fc.CommonSinkConfig = l.describeAppliedConfig()
fc.Net = fluentSink.network
fc.Address = fluentSink.addr

// Describe the connections to this fluent sink.
for ch, logger := range chans {
describeConnections(logger, ch, l, &fc.Channels)
}
skey := fmt.Sprintf("s%d", sIdx)
sIdx++
config.Sinks.FluentServers[skey] = fc
return nil
})

// Note: we cannot return 'config' directly, because this captures
// certain variables from the loggers by reference and thus could be
// invalidated by concurrent uses of ApplyConfig().
Expand Down
Loading