Skip to content

Commit

Permalink
Merge pull request elastic#10 from ruflin/state-file
Browse files Browse the repository at this point in the history
State file
  • Loading branch information
monicasarbu committed Sep 14, 2015
2 parents d53af44 + 4b34f0f commit 8bf0dad
Show file tree
Hide file tree
Showing 9 changed files with 131 additions and 71 deletions.
13 changes: 8 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
ARCH?=$(shell uname -m)
GODEP=$(GOPATH)/bin/godep

.PHONY: build
build:

filebeat: $(GOFILES)
# first make sure we have godep
go get github.com/tools/godep
$(GODEP) go build
Expand All @@ -21,8 +21,8 @@ clean:
-rm -r cover

.PHONY: run
run: build
./filebeat -c etc/filebeat.yml -e -v
run: filebeat
./filebeat -c etc/filebeat.dev.yml -e -v

.PHONY: test
test:
Expand All @@ -38,5 +38,8 @@ cover:

# Command used by CI Systems
.PHONY: testsuite
testsuite: filebeat
	make cover

filebeat.test: $(GOFILES)
	$(GODEP) go test -c -cover -covermode=count -coverpkg ./...
17 changes: 14 additions & 3 deletions beat/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
fb.SpoolChan = make(chan *FileEvent, 16)
fb.publisherChan = make(chan []*FileEvent, 1)
fb.RegistrarChan = make(chan []*FileEvent, 1)

persist := make(map[string]*FileState)

restart := &ProspectorResume{}
restart.LoadState()
registrar := &Registrar{
registryFile: fb.FbConfig.Filebeat.RegistryFile,
}
registrar.Init()

restart := &ProspectorResume{
Persist: make(chan *FileState),
// Load the previous log file locations now, for use in prospector
Files: make(map[string]*FileState),
}

registrar.LoadState(restart.Files)
restart.Scan(fb.FbConfig.Filebeat.Files, persist, fb.SpoolChan)

// Start spooler: Harvesters dump events into the spooler.
Expand All @@ -93,7 +104,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
go Publish(b, fb)

// registrar records last acknowledged positions in all files.
Registrar(persist, fb.RegistrarChan)
registrar.WriteState(persist, fb.RegistrarChan)

return nil
}
Expand Down
43 changes: 38 additions & 5 deletions beat/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,38 @@ import (
"github.com/elastic/libbeat/logp"
)

func Registrar(state map[string]*FileState, input chan []*FileEvent) {
// Registrar keeps track of the last reading state of harvested files and
// persists it to disk so that reading can resume after a restart.
type Registrar struct {
	// registryFile is the path of the on-disk state file; Init falls back
	// to ".filebeat" when it is empty.
	registryFile string
}

// Init prepares the registrar for use, falling back to the default
// registry file name when none was configured.
func (r *Registrar) Init() {
	const defaultRegistryFile = ".filebeat"

	// Fall back to the default when no registry file was configured.
	if len(r.registryFile) == 0 {
		r.registryFile = defaultRegistryFile
	}

	logp.Debug("registrar", "Registry file set to: %s", r.registryFile)
}

// LoadState fetches the previous reading state from the configured registry
// file (default ".filebeat", resolved relative to the working directory of
// the binary) and decodes it into the given files map. If the registry file
// cannot be opened (e.g. first run), files is left untouched.
func (r *Registrar) LoadState(files map[string]*FileState) {

	if existing, e := os.Open(r.registryFile); e == nil {
		defer existing.Close()
		wd := ""
		if wd, e = os.Getwd(); e != nil {
			logp.Warn("WARNING: os.Getwd returned unexpected error %s -- ignoring", e.Error())
		}
		logp.Info("Loading registrar data from %s/%s", wd, r.registryFile)

		decoder := json.NewDecoder(existing)
		if e = decoder.Decode(&files); e != nil {
			// A corrupt registry must not abort startup: warn and continue
			// with whatever state (possibly none) was decoded.
			logp.Warn("WARNING: error decoding registry file %s: %s -- ignoring", r.registryFile, e.Error())
		}
	}
}

func (r *Registrar) WriteState(state map[string]*FileState, input chan []*FileEvent) {
logp.Debug("registrar", "Starting Registrar")
for events := range input {
logp.Debug("registrar", "Registrar: processing %d events", len(events))
Expand All @@ -22,7 +53,7 @@ func Registrar(state map[string]*FileState, input chan []*FileEvent) {
state[*event.Source] = event.GetState()
}

if e := writeRegistry(state, ".filebeat"); e != nil {
if e := r.writeRegistry(state); e != nil {
// REVU: but we should panic, or something, right?
logp.Warn("WARNING: (continuing) update of registry returned error: %s", e)
}
Expand All @@ -31,8 +62,10 @@ func Registrar(state map[string]*FileState, input chan []*FileEvent) {
}

// writeRegistry Writes the new json registry file to disk
func writeRegistry(state map[string]*FileState, path string) error {
tempfile := path + ".new"
func (r *Registrar) writeRegistry(state map[string]*FileState) error {
logp.Debug("registrar", "Write registry file:", r.registryFile)

tempfile := r.registryFile + ".new"
file, e := os.Create(tempfile)
if e != nil {
logp.Err("Failed to create tempfile (%s) for writing: %s", tempfile, e)
Expand All @@ -43,5 +76,5 @@ func writeRegistry(state map[string]*FileState, path string) error {
encoder := json.NewEncoder(file)
encoder.Encode(state)

return SafeFileRotate(path, tempfile)
return SafeFileRotate(r.registryFile, tempfile)
}
3 changes: 2 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type FilebeatConfig struct {
IdleTimeout time.Duration
TailOnRotate bool
Quiet bool
RegistryFile string
}

type FileConfig struct {
Expand Down Expand Up @@ -104,7 +105,7 @@ func mergeConfigFiles(configFiles []string, config *Config) error {
}

// Fetches and merges all config files given by Options.configArgs. All are put into one config object
func (config *Config) FetchConfigs(path string) {
func (config *Config) FetchConfigs(path string) {

configFiles, err := getConfigFiles(path)

Expand Down
67 changes: 40 additions & 27 deletions crawler/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Harvester struct {
func (h *Harvester) Harvest(output chan *FileEvent) {
h.open()
info, e := h.file.Stat()

if e != nil {
panic(fmt.Sprintf("Harvest: unexpected error: %s", e.Error()))
}
Expand All @@ -43,33 +44,22 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
reader := bufio.NewReaderSize(h.file, cfg.CmdlineOptions.HarvesterBufferSize) // 16kb buffer by default
buffer := new(bytes.Buffer)

var read_timeout = 10 * time.Second
last_read_time := time.Now()
var readTimeout = 10 * time.Second
lastReadTime := time.Now()
for {
text, bytesread, err := h.readline(reader, buffer, read_timeout)
text, bytesread, err := h.readline(reader, buffer, readTimeout)

if err != nil {
if err == io.EOF {
// timed out waiting for data, got eof.
// Check to see if the file was truncated
info, _ := h.file.Stat()
if info.Size() < h.Offset {
logp.Debug("harvester", "File truncated, seeking to beginning: %s", h.Path)
h.file.Seek(0, os.SEEK_SET)
h.Offset = 0
} else if age := time.Since(last_read_time); age > h.FileConfig.DeadtimeSpan {
// if last_read_time was more than dead time, this file is probably
// dead. Stop watching it.
logp.Debug("harvester", "Stopping harvest of ", h.Path, "last change was: ", age)
return
}
continue
} else {
logp.Err("Unexpected state reading from %s; error: %s", h.Path, err)
err = h.handleReadlineError(lastReadTime, err)

if err != nil {
return
} else {
continue
}
}
last_read_time = time.Now()

lastReadTime = time.Now()

line++
event := &FileEvent{
Expand All @@ -83,7 +73,30 @@ func (h *Harvester) Harvest(output chan *FileEvent) {
h.Offset += int64(bytesread)

output <- event // ship the new event downstream
} /* forever */
}
}

// handleReadlineError handles an error returned by readline. io.EOF is the
// expected "no new data yet" case: the file is checked for truncation and for
// exceeding its dead time. A nil return means the caller should keep reading;
// a non-nil return means harvesting of this file should stop.
func (h *Harvester) handleReadlineError(lastTimeRead time.Time, err error) error {
	if err != io.EOF {
		logp.Err("Unexpected state reading from %s; error: %s", h.Path, err)
		return err
	}

	// Timed out waiting for data and got EOF.
	// Check to see if the file was truncated.
	info, statErr := h.file.Stat()
	if statErr != nil {
		// Do not dereference a nil FileInfo: if Stat fails the file is most
		// likely gone, so stop harvesting it.
		logp.Err("Unexpected error getting stat on file %s: %s", h.Path, statErr)
		return statErr
	}

	if info.Size() < h.Offset {
		logp.Debug("harvester", "File truncated, seeking to beginning: %s", h.Path)
		h.file.Seek(0, os.SEEK_SET)
		h.Offset = 0
	} else if age := time.Since(lastTimeRead); age > h.FileConfig.DeadtimeSpan {
		// If lastTimeRead was more than dead time ago, this file is probably
		// dead. Stop watching it.
		logp.Debug("harvester", "Stopping harvest of %s; last change was %v ago", h.Path, age)
		return err
	}
	return nil
}

// initOffset finds the current offset of the file and sets it in the harvester as position
Expand Down Expand Up @@ -111,7 +124,6 @@ func (h *Harvester) setFileOffset() {
} else {
h.file.Seek(0, os.SEEK_SET)
}

}

func (h *Harvester) open() *os.File {
Expand All @@ -126,6 +138,7 @@ func (h *Harvester) open() *os.File {
h.file, err = os.Open(h.Path)

if err != nil {
// TODO: This is currently an endless retry; should it be capped at a maximum number of attempts?
// retry on failure.
logp.Err("Failed opening %s: %s", h.Path, err)
time.Sleep(5 * time.Second)
Expand All @@ -149,7 +162,7 @@ func (h *Harvester) open() *os.File {
}

func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_timeout time.Duration) (*string, int, error) {
var is_partial bool = true
var isPartial bool = true
var newline_length int = 1
start_time := time.Now()

Expand All @@ -159,7 +172,7 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
if segment != nil && len(segment) > 0 {
if segment[len(segment)-1] == '\n' {
// Found a complete line
is_partial = false
isPartial = false

// Check if also a CR present
if len(segment) > 1 && segment[len(segment)-2] == '\r' {
Expand All @@ -172,7 +185,7 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
}

if err != nil {
if err == io.EOF && is_partial {
if err == io.EOF && isPartial {
time.Sleep(1 * time.Second) // TODO(sissel): Implement backoff

// Give up waiting for data after a certain amount of time.
Expand All @@ -188,7 +201,7 @@ func (h *Harvester) readline(reader *bufio.Reader, buffer *bytes.Buffer, eof_tim
}

// If we got a full line, return the whole line without the EOL chars (CRLF or LF)
if !is_partial {
if !isPartial {
// Get the str length with the EOL chars (LF or CRLF)
bufferSize := buffer.Len()
str := new(string)
Expand Down
23 changes: 0 additions & 23 deletions crawler/prospector.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package crawler

import (
"encoding/json"
"os"
"path/filepath"
"time"
Expand Down Expand Up @@ -30,28 +29,6 @@ type ProspectorInfo struct {
Last_seen uint32 /* int number of the last iterations in which we saw this file */
}

// loadState fetches the previes reading state from the .filebeat file
// The .filebeat file is stored in the same path as the binary is running
func (restart *ProspectorResume) LoadState() {
restart.Persist = make(chan *FileState)

// Load the previous log file locations now, for use in prospector
restart.Files = make(map[string]*FileState)

// TODO: Should the location and path of .filebeat be configurable?
if existing, e := os.Open(".filebeat"); e == nil {
defer existing.Close()
wd := ""
if wd, e = os.Getwd(); e != nil {
logp.Warn("WARNING: os.Getwd retuned unexpected error %s -- ignoring", e.Error())
}
logp.Info("Loading registrar data from %s/.filebeat", wd)

decoder := json.NewDecoder(existing)
decoder.Decode(&restart.Files)
}
}

func (restart *ProspectorResume) Scan(files []cfg.FileConfig, persist map[string]*FileState, eventChan chan *FileEvent) {
pendingProspectorCnt := 0

Expand Down
13 changes: 7 additions & 6 deletions etc/filebeat.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ filebeat:
# Paths that should be crawled and fetched
paths:
#- /var/log/app*.log
- /var/log/*.log
- /var/log/*
# Type of the files. Annotated in every documented
type: syslog
# Optional additional fields
Expand All @@ -27,12 +27,13 @@ filebeat:
type: stdin
paths:
- "-"
spoolSize:
harvesterBufferSize:
cpuProfileFile:
idleTimeout:
tailOnRotate:
spoolsize:
harvesterbuffersize:
cpuprofilefile:
idletimeout:
tailonRotate:
quiet:
registryfile: .filebeat

############################# Shipper ############################################
shipper:
Expand Down
2 changes: 2 additions & 0 deletions etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ filebeat:
idleTimeout:
tailOnRotate:
quiet:
registryfile: .filebeat


############################# Shipper ############################################
shipper:
Expand Down
21 changes: 20 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
package main

// This file is mandatory: without it the test binary (filebeat.test) is not
// generated correctly. Reason unknown -- TODO investigate.

import (
"flag"
"testing"
)

// integration controls whether TestIntegration actually boots the beat; it is
// enabled by passing -integration to the test binary. Initializing it directly
// (instead of via init) avoids a window in which the pointer is nil.
var integration = flag.Bool("integration", false, "Set to true when running integration tests")

// TestIntegration runs the full application by calling main, but only when the
// -integration flag was passed to the test binary; otherwise it is a no-op.
func TestIntegration(t *testing.T) {
	if !*integration {
		return
	}
	main()
}

0 comments on commit 8bf0dad

Please sign in to comment.