Skip to content

Commit

Permalink
Add structured logging to logs input (backport #25299) (#25390)
Browse files Browse the repository at this point in the history
(cherry picked from commit 448afd4)

Co-authored-by: Steffen Siering <steffen.siering@elastic.co>
  • Loading branch information
mergify[bot] and Steffen Siering authored May 3, 2021
1 parent cf66870 commit 3258307
Show file tree
Hide file tree
Showing 11 changed files with 178 additions and 102 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Rename `s3` input to `aws-s3` input. {pull}23469[23469]
- Add User Agent Parser for Azure Sign In Logs Ingest Pipeline {pull}23201[23201]
- Changes filebeat httpjson input's append transform to create a list even with only a single value{pull}25074[25074]
- Change logging in logs input to structure logging. Some log message formats have changed. {pull}25299[25299]
- All url.* fields apart from url.original in the Apache, Nginx, IIS, Traefik, S3Access, Cisco, F5, Fortinet, Google Workspace, Imperva, Microsoft, Netscout, O365, Sophos, Squid, Suricata, Zeek, Zia, Zoom, and ZScaler modules are now url unescaped due to using the Elasticsearch uri_parts processor. {pull}24699[24699]

*Heartbeat*
Expand Down
61 changes: 35 additions & 26 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type OutletFactory func() channel.Outleter

// Harvester contains all harvester related data
type Harvester struct {
logger *logp.Logger

id uuid.UUID
config config
source harvester.Source // the source being watched
Expand Down Expand Up @@ -120,6 +122,7 @@ type harvesterProgressMetrics struct {

// NewHarvester creates a new harvester
func NewHarvester(
logger *logp.Logger,
config *common.Config,
state file.State,
states *file.States,
Expand All @@ -132,7 +135,9 @@ func NewHarvester(
return nil, err
}

logger = logger.Named("harvester").With("harvester_id", id)
h := &Harvester{
logger: logger,
config: defaultConfig(),
state: state,
states: states,
Expand Down Expand Up @@ -204,7 +209,7 @@ func (h *Harvester) Setup() error {
return err
}

logp.Debug("harvester", "Harvester setup successful. Line terminator: %d", h.config.LineTerminator)
h.logger.Debugf("Harvester setup successful. Line terminator: %d", h.config.LineTerminator)

return nil
}
Expand Down Expand Up @@ -234,6 +239,8 @@ func (h *Harvester) updateCurrentSize() error {

// Run start the harvester and reads files line by line and sends events to the defined output
func (h *Harvester) Run() error {
logger := h.logger

// Allow for some cleanup on termination
if h.onTerminate != nil {
defer h.onTerminate()
Expand Down Expand Up @@ -287,19 +294,19 @@ func (h *Harvester) Run() error {
select {
// Applies when timeout is reached
case <-closeTimeout:
logp.Info("Closing harvester because close_timeout was reached: %s", source)
logger.Infof("Closing harvester because close_timeout was reached: %s", source)
// Required when reader loop returns and reader finished
case <-h.done:
}

h.stop()
err := h.reader.Close()
if err != nil {
logp.Err("Failed to stop harvester for file %s: %v", h.state.Source, err)
logger.Errorf("Failed to stop harvester for file: %v", err)
}
}(h.state.Source)

logp.Info("Harvester started for file: %s", h.state.Source)
logger.Info("Harvester started for file.")

h.doneWg.Add(1)
go func() {
Expand All @@ -318,21 +325,21 @@ func (h *Harvester) Run() error {
if err != nil {
switch err {
case ErrFileTruncate:
logp.Info("File was truncated. Begin reading file from offset 0: %s", h.state.Source)
logger.Info("File was truncated. Begin reading file from offset 0.")
h.state.Offset = 0
filesTruncated.Add(1)
case ErrRemoved:
logp.Info("File was removed: %s. Closing because close_removed is enabled.", h.state.Source)
logger.Info("File was removed. Closing because close_removed is enabled.")
case ErrRenamed:
logp.Info("File was renamed: %s. Closing because close_renamed is enabled.", h.state.Source)
logger.Info("File was renamed. Closing because close_renamed is enabled.")
case ErrClosed:
logp.Info("Reader was closed: %s. Closing.", h.state.Source)
logger.Info("Reader was closed. Closing.")
case io.EOF:
logp.Info("End of file reached: %s. Closing because close_eof is enabled.", h.state.Source)
logger.Info("End of file reached. Closing because close_eof is enabled.")
case ErrInactive:
logp.Info("File is inactive: %s. Closing because close_inactive of %v reached.", h.state.Source, h.config.CloseInactive)
logger.Infof("File is inactive. Closing because close_inactive of %v reached.", h.config.CloseInactive)
default:
logp.Err("Read line error: %v; File: %v", err, h.state.Source)
logger.Errorf("Read line error: %v", err)
}
return nil
}
Expand Down Expand Up @@ -370,7 +377,7 @@ func (h *Harvester) monitorFileSize() {
case <-ticker.C:
err := h.updateCurrentSize()
if err != nil {
logp.Err("Error updating file size: %v; File: %v", err, h.state.Source)
h.logger.Errorf("Error updating file size: %v", err)
}
}
}
Expand Down Expand Up @@ -481,7 +488,7 @@ func (h *Harvester) SendStateUpdate() {

h.publishState(h.state)

logp.Debug("harvester", "Update state: %s, offset: %v", h.state.Source, h.state.Offset)
h.logger.Debugf("Update state (offset: %v).", h.state.Offset)
h.states.Update(h.state)
}

Expand All @@ -491,14 +498,14 @@ func (h *Harvester) shouldExportLine(line string) bool {
if len(h.config.IncludeLines) > 0 {
if !harvester.MatchAny(h.config.IncludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does not match any of the include patterns %s", line)
h.logger.Debugf("Drop line as it does not match any of the include patterns %s", line)
return false
}
}
if len(h.config.ExcludeLines) > 0 {
if harvester.MatchAny(h.config.ExcludeLines, line) {
// drop line
logp.Debug("harvester", "Drop line as it does match one of the exclude patterns%s", line)
h.logger.Debugf("Drop line as it does match one of the exclude patterns%s", line)
return false
}
}
Expand Down Expand Up @@ -539,6 +546,8 @@ func (h *Harvester) openFile() error {
}

func (h *Harvester) validateFile(f *os.File) error {
logger := h.logger

info, err := f.Stat()
if err != nil {
return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
Expand All @@ -557,9 +566,9 @@ func (h *Harvester) validateFile(f *os.File) error {
if err != nil {

if err == transform.ErrShortSrc {
logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
logger.Infof("Initialising encoding for '%v' failed due to file being too short", f)
} else {
logp.Err("Initialising encoding for '%v' failed: %v", f, err)
logger.Errorf("Initialising encoding for '%v' failed: %v", f, err)
}
return err
}
Expand All @@ -570,7 +579,7 @@ func (h *Harvester) validateFile(f *os.File) error {
return err
}

logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset)
logger.Debugf("Setting offset: %d ", offset)
h.state.Offset = offset

return nil
Expand All @@ -579,12 +588,12 @@ func (h *Harvester) validateFile(f *os.File) error {
func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
// continue from last known offset
if h.state.Offset > 0 {
logp.Debug("harvester", "Set previous offset for file: %s. Offset: %d ", h.state.Source, h.state.Offset)
h.logger.Debugf("Set previous offset: %d ", h.state.Offset)
return file.Seek(h.state.Offset, os.SEEK_SET)
}

// get offset from file in case of encoding factory was required to read some data.
logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source)
h.logger.Debug("Setting offset to: 0")
return file.Seek(0, os.SEEK_CUR)
}

Expand All @@ -605,8 +614,8 @@ func (h *Harvester) cleanup() {
// Mark harvester as finished
h.state.Finished = true

logp.Debug("harvester", "Stopping harvester for file: %s", h.state.Source)
defer logp.Debug("harvester", "harvester cleanup finished for file: %s", h.state.Source)
h.logger.Debugf("Stopping harvester.")
defer h.logger.Debugf("harvester cleanup finished.")

// Make sure file is closed as soon as harvester exits
// If file was never opened, it can't be closed
Expand All @@ -615,14 +624,14 @@ func (h *Harvester) cleanup() {
// close file handler
h.source.Close()

logp.Debug("harvester", "Closing file: %s", h.state.Source)
h.logger.Debugf("Closing file")
harvesterOpenFiles.Add(-1)

// On completion, push offset so we can continue where we left off if we relaunch on the same file
// Only send offset if file object was created successfully
h.SendStateUpdate()
} else {
logp.Warn("Stopping harvester, NOT closing file as file info not available: %s", h.state.Source)
h.logger.Warn("Stopping harvester, NOT closing file as file info not available.")
}

harvesterClosed.Add(1)
Expand All @@ -645,12 +654,12 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
var r reader.Reader
var err error

logp.Debug("harvester", "newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)
h.logger.Debugf("newLogFileReader with config.MaxBytes: %d", h.config.MaxBytes)

// TODO: NewLineReader uses additional buffering to deal with encoding and testing
// for new lines in input stream. Simple 8-bit based encodings, or plain
// don't require 'complicated' logic.
h.log, err = NewLog(h.source, h.config.LogConfig)
h.log, err = NewLog(h.logger, h.source, h.config.LogConfig)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions filebeat/input/log/harvester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
Expand Down Expand Up @@ -75,6 +76,7 @@ func TestReadLine(t *testing.T) {
source := File{File: readFile}

h := Harvester{
logger: logp.NewLogger("harvester"),
config: config{
LogConfig: LogConfig{
CloseInactive: 500 * time.Millisecond,
Expand Down
Loading

0 comments on commit 3258307

Please sign in to comment.