Skip to content

Commit

Permalink
Add tests and support for truncated files in filestream input (#24424)
Browse files Browse the repository at this point in the history
## What does this PR do?

Adds support for truncated files and migrates related tests from `test_harvester.py`. It also adds more tests to cover the following cases:

* truncation is detected only by the `Prospector`
* truncation is detected first by the `Harvester` then by the `Prospector`
* truncation is detected first by the `Prospector` then by the `Harvester`
* file gets truncated when the output is not able to accept events

## Why is it important?

The support for stopping reading from truncated files was already implemented. However, the `filestream` input could not start reading the truncated file again from the beginning.

A new file system event called `OpTruncate` is added. When the size of a file has shrunk compared to the last time the scanner encountered it, an `OpTruncate` event is emitted. When the prospector gets this event, the `HarvesterGroup` restarts the `Harvester` of the file. Restarting basically means that the new `Harvester` cancels the previous reader and starts
  • Loading branch information
kvch committed Apr 7, 2021
1 parent 803e8ca commit 2875faf
Show file tree
Hide file tree
Showing 15 changed files with 624 additions and 61 deletions.
114 changes: 98 additions & 16 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/acker"
"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand All @@ -56,7 +57,7 @@ type registryEntry struct {
Cursor struct {
Offset int `json:"offset"`
} `json:"cursor"`
Meta interface{} `json:"meta"`
Meta interface{} `json:"meta,omitempty"`
}

func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment {
Expand Down Expand Up @@ -223,6 +224,23 @@ func requireMetadataEquals(one, other fileMeta) bool {
return one == other
}

// waitUntilOffsetInRegistry blocks until the registry entry of the given
// file reaches the expected offset, then asserts that offset.
func (e *inputTestingEnvironment) waitUntilOffsetInRegistry(filename string, expectedOffset int) {
	filepath := e.abspath(filename)
	fi, err := os.Stat(filepath)
	if err != nil {
		e.t.Fatalf("cannot stat file when checking for offset: %+v", err)
	}

	id := getIDFromPath(filepath, fi)
	entry, err := e.getRegistryState(id)
	for err != nil || entry.Cursor.Offset != expectedOffset {
		// Sleep between polls so the loop does not spin at 100% CPU
		// hammering the registry.
		time.Sleep(10 * time.Millisecond)
		entry, err = e.getRegistryState(id)
	}

	require.Equal(e.t, expectedOffset, entry.Cursor.Offset)
}

func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
filepath := e.abspath(filename)
fi, err := os.Stat(filepath)
Expand All @@ -231,14 +249,12 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename string) {
}

inputStore, _ := e.stateStore.Access()

identifier, _ := newINodeDeviceIdentifier(nil)
src := identifier.GetSource(loginp.FSEvent{Info: fi, Op: loginp.OpCreate, NewPath: filepath})
id := getIDFromPath(filepath, fi)

var entry registryEntry
err = inputStore.Get(src.Name(), &entry)
err = inputStore.Get(id, &entry)
if err == nil {
e.t.Fatalf("key is not expected to be present '%s'", src.Name())
e.t.Fatalf("key is not expected to be present '%s'", id)
}
}

Expand Down Expand Up @@ -273,10 +289,7 @@ func getIDFromPath(filepath string, fi os.FileInfo) string {
// waitUntilEventCount waits until total count events arrive to the client.
func (e *inputTestingEnvironment) waitUntilEventCount(count int) {
for {
sum := 0
for _, c := range e.pipeline.clients {
sum += len(c.GetEvents())
}
sum := len(e.pipeline.GetAllEvents())
if sum == count {
return
}
Expand Down Expand Up @@ -355,18 +368,20 @@ func (s *testInputStore) CleanupInterval() time.Duration {
}

// mockClient is an in-memory beat.Client stand-in that records the events
// handed to it so tests can inspect what was published.
type mockClient struct {
	publishing []beat.Event       // events handed to PublishAll, before ACK
	published  []beat.Event       // events recorded after the ACK handler ran
	ackHandler beat.ACKer         // ACKer taken from the client configuration
	closed     bool               // set once Close has been called
	mtx        sync.Mutex         // guards all fields above
	canceler   context.CancelFunc // cancels the blocking ACKer, if any
}

// GetEvents returns the events that have been published so far.
func (c *mockClient) GetEvents() []beat.Event {
	c.mtx.Lock()
	events := c.published
	c.mtx.Unlock()

	return events
}

// Publish mocks the Client Publish method
Expand All @@ -379,11 +394,21 @@ func (c *mockClient) PublishAll(events []beat.Event) {
c.mtx.Lock()
defer c.mtx.Unlock()

c.publishing = append(c.publishing, events...)
for _, event := range events {
c.publishes = append(c.publishes, event)
c.ackHandler.AddEvent(event, true)
}
c.ackHandler.ACKEvents(len(events))

for _, event := range events {
c.published = append(c.published, event)
}
}

// waitUntilPublishingHasStarted blocks until at least one event has been
// handed to PublishAll.
func (c *mockClient) waitUntilPublishingHasStarted() {
	for {
		// Read c.publishing under the mutex: PublishAll appends to it
		// while holding c.mtx, so an unguarded len() is a data race.
		c.mtx.Lock()
		started := len(c.publishing) > 0
		c.mtx.Unlock()
		if started {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

// Close mocks the Client Close method
Expand All @@ -401,12 +426,16 @@ func (c *mockClient) Close() error {

// mockPipelineConnector mocks the PipelineConnector interface.
type mockPipelineConnector struct {
	blocking bool          // when true, new clients get a blocking ACKer
	clients  []*mockClient // all clients handed out by ConnectWith
	mtx      sync.Mutex    // guards blocking and clients
}

// GetAllEvents returns all events associated with a pipeline
func (pc *mockPipelineConnector) GetAllEvents() []beat.Event {
pc.mtx.Lock()
defer pc.mtx.Unlock()

var evList []beat.Event
for _, clientEvents := range pc.clients {
evList = append(evList, clientEvents.GetEvents()...)
Expand All @@ -425,11 +454,64 @@ func (pc *mockPipelineConnector) ConnectWith(config beat.ClientConfig) (beat.Cli
pc.mtx.Lock()
defer pc.mtx.Unlock()

ctx, cancel := context.WithCancel(context.Background())
c := &mockClient{
ackHandler: config.ACKHandler,
canceler: cancel,
ackHandler: newMockACKHandler(ctx, pc.blocking, config),
}

pc.clients = append(pc.clients, c)

return c, nil

}

// cancelAllClients cancels the context of every connected client.
func (pc *mockPipelineConnector) cancelAllClients() {
	pc.mtx.Lock()
	defer pc.mtx.Unlock()

	for i := range pc.clients {
		pc.clients[i].canceler()
	}
}

// cancelClient cancels the context of the i-th connected client. It is a
// no-op when no client with that index exists.
func (pc *mockPipelineConnector) cancelClient(i int) {
	pc.mtx.Lock()
	defer pc.mtx.Unlock()

	// Reject out-of-range indices, including negative ones, which the
	// previous `len(pc.clients) < i+1` check let through and which would
	// panic on the slice access below.
	if i < 0 || i >= len(pc.clients) {
		return
	}

	pc.clients[i].canceler()
}

// newMockACKHandler returns the configured ACK handler, optionally combined
// with a blocking ACKer that holds acknowledgements until the starter
// context is cancelled.
func newMockACKHandler(starter context.Context, blocking bool, config beat.ClientConfig) beat.ACKer {
	if blocking {
		return acker.Combine(blockingACKer(starter), config.ACKHandler)
	}
	return config.ACKHandler
}

// blockingACKer returns an ACKer that blocks acknowledgement of events
// until the starter context is cancelled.
func blockingACKer(starter context.Context) beat.ACKer {
	return acker.EventPrivateReporter(func(acked int, private []interface{}) {
		// Wait for cancellation; sleep between checks instead of spinning
		// in a tight loop, which would pin a CPU core for the whole wait.
		for starter.Err() == nil {
			time.Sleep(time.Millisecond)
		}
	})
}

// clientsCount reports how many clients the connector has handed out.
func (pc *mockPipelineConnector) clientsCount() int {
	pc.mtx.Lock()
	n := len(pc.clients)
	pc.mtx.Unlock()

	return n
}

// invertBlocking flips whether newly connected clients get a blocking ACKer.
func (pc *mockPipelineConnector) invertBlocking() {
	pc.mtx.Lock()
	pc.blocking = !pc.blocking
	pc.mtx.Unlock()
}
50 changes: 34 additions & 16 deletions filebeat/input/filestream/fswatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,22 @@ type fileScanner struct {
// fileWatcherConfig holds the configuration of the file watcher.
type fileWatcherConfig struct {
	// Interval is the time between two scans.
	Interval time.Duration `config:"check_interval"`
	// ResendOnModTime, when enabled, treats a file whose modification time
	// changed but whose size stayed the same as truncated, so a truncation
	// event is emitted for it.
	ResendOnModTime bool `config:"resend_on_touch"`
	// Scanner is the configuration of the scanner.
	Scanner fileScannerConfig `config:",inline"`
}

// fileWatcher gets the list of files from a loginp.FSScanner and creates
// events by comparing the files between its last two runs.
type fileWatcher struct {
	interval        time.Duration          // time between two scans
	resendOnModTime bool                   // treat same-size modtime changes as truncation
	prev            map[string]os.FileInfo // file infos from the previous scan
	scanner         loginp.FSScanner       // produces the current file list
	log             *logp.Logger
	events          chan loginp.FSEvent // emitted file system events
}

func newFileWatcher(paths []string, ns *common.ConfigNamespace) (loginp.FSWatcher, error) {
Expand Down Expand Up @@ -98,18 +102,20 @@ func newScannerWatcher(paths []string, c *common.Config) (loginp.FSWatcher, erro
return nil, err
}
return &fileWatcher{
log: logp.NewLogger(watcherDebugKey),
interval: config.Interval,
prev: make(map[string]os.FileInfo, 0),
scanner: scanner,
events: make(chan loginp.FSEvent),
log: logp.NewLogger(watcherDebugKey),
interval: config.Interval,
resendOnModTime: config.ResendOnModTime,
prev: make(map[string]os.FileInfo, 0),
scanner: scanner,
events: make(chan loginp.FSEvent),
}, nil
}

// defaultFileWatcherConfig returns the watcher configuration used when the
// user does not override any settings.
func defaultFileWatcherConfig() fileWatcherConfig {
	cfg := fileWatcherConfig{
		Scanner: defaultFileScannerConfig(),
	}
	cfg.Interval = 10 * time.Second
	cfg.ResendOnModTime = false
	return cfg
}

Expand Down Expand Up @@ -142,10 +148,18 @@ func (w *fileWatcher) watch(ctx unison.Canceler) {
}

if prevInfo.ModTime() != info.ModTime() {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
if prevInfo.Size() > info.Size() || w.resendOnModTime && prevInfo.Size() == info.Size() {
select {
case <-ctx.Done():
return
case w.events <- truncateEvent(path, info):
}
} else {
select {
case <-ctx.Done():
return
case w.events <- writeEvent(path, info):
}
}
}

Expand Down Expand Up @@ -198,6 +212,10 @@ func writeEvent(path string, fi os.FileInfo) loginp.FSEvent {
return loginp.FSEvent{Op: loginp.OpWrite, OldPath: path, NewPath: path, Info: fi}
}

// truncateEvent builds an OpTruncate event for the given path.
func truncateEvent(path string, fi os.FileInfo) loginp.FSEvent {
	return loginp.FSEvent{
		Op:      loginp.OpTruncate,
		OldPath: path,
		NewPath: path,
		Info:    fi,
	}
}

// renamedEvent builds an OpRename event from oldPath to path.
func renamedEvent(oldPath, path string, fi os.FileInfo) loginp.FSEvent {
	return loginp.FSEvent{
		Op:      loginp.OpRename,
		OldPath: oldPath,
		NewPath: path,
		Info:    fi,
	}
}
Expand Down
9 changes: 6 additions & 3 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ type fileIdentifier interface {
// fileSource implements the Source interface
// It is required to identify and manage file sources.
type fileSource struct {
info os.FileInfo
newPath string
oldPath string
info os.FileInfo
newPath string
oldPath string
truncated bool

name string
identifierGenerator string
Expand Down Expand Up @@ -103,6 +104,7 @@ func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + file.GetOSState(e.Info).String(),
identifierGenerator: i.name,
}
Expand Down Expand Up @@ -140,6 +142,7 @@ func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: p.name + identitySep + path,
identifierGenerator: p.name,
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/filestream/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
info: e.Info,
newPath: e.NewPath,
oldPath: e.OldPath,
truncated: e.Op == loginp.OpTruncate,
name: i.name + identitySep + osstate.InodeString() + "-" + i.markerContents(),
identifierGenerator: i.name,
}
Expand Down
Loading

0 comments on commit 2875faf

Please sign in to comment.