Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cursor input and manager implementation #19571

Merged
merged 2 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 118 additions & 2 deletions filebeat/input/v2/input-cursor/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@
package cursor

import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/urso/sderr"

"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Input interface for cursor based inputs. This interface must be implemented
Expand Down Expand Up @@ -62,7 +70,29 @@ func (inp *managedInput) Name() string { return inp.input.Name() }

// Test runs the Test method for each configured source.
func (inp *managedInput) Test(ctx input.TestContext) error {
panic("TODO: implement me")
var grp unison.MultiErrGroup
for _, source := range inp.sources {
source := source
grp.Go(func() (err error) {
return inp.testSource(ctx, source)
})
}

errs := grp.Wait()
if len(errs) > 0 {
return sderr.WrapAll(errs, "input tests failed")
}
return nil
}

func (inp *managedInput) testSource(ctx input.TestContext, source Source) (err error) {
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
ctx.Logger.Errorf("Input crashed with: %+v", err)
}
}()
return inp.input.Test(source, ctx)
}

// Run creates a go-routine per source, waiting until all go-routines have
Expand All @@ -73,7 +103,68 @@ func (inp *managedInput) Run(
ctx input.Context,
pipeline beat.PipelineConnector,
) (err error) {
panic("TODO: implement me")
// Setup cancellation using a custom cancel context. All workers will be
// stopped if one failed badly by returning an error.
cancelCtx, cancel := context.WithCancel(ctxtool.FromCanceller(ctx.Cancelation))
defer cancel()
ctx.Cancelation = cancelCtx

var grp unison.MultiErrGroup
for _, source := range inp.sources {
source := source
grp.Go(func() (err error) {
// refine per worker context
inpCtx := ctx
inpCtx.ID = ctx.ID + "::" + source.Name()
inpCtx.Logger = ctx.Logger.With("source", source.Name())

if err = inp.runSource(inpCtx, inp.manager.store, source, pipeline); err != nil {
cancel()
}
return err
})
}

if errs := grp.Wait(); len(errs) > 0 {
return sderr.WrapAll(errs, "input %{id} failed", ctx.ID)
}
return nil
}

func (inp *managedInput) runSource(
ctx input.Context,
store *store,
source Source,
pipeline beat.PipelineConnector,
) (err error) {
defer func() {
if v := recover(); v != nil {
err = fmt.Errorf("input panic with: %+v\n%s", v, debug.Stack())
ctx.Logger.Errorf("Input crashed with: %+v", err)
}
}()

client, err := pipeline.ConnectWith(beat.ClientConfig{
CloseRef: ctx.Cancelation,
ACKEvents: newInputACKHandler(ctx.Logger),
})
if err != nil {
return err
}
defer client.Close()

resourceKey := inp.createSourceID(source)
resource, err := inp.manager.lock(ctx, resourceKey)
if err != nil {
return err
}
defer releaseResource(resource)

store.UpdateTTL(resource, inp.cleanTimeout)

cursor := makeCursor(store, resource)
publisher := &cursorPublisher{canceler: ctx.Cancelation, client: client, cursor: &cursor}
return inp.input.Run(ctx, source, cursor, publisher)
}

func (inp *managedInput) createSourceID(s Source) string {
Expand All @@ -82,3 +173,28 @@ func (inp *managedInput) createSourceID(s Source) string {
}
return fmt.Sprintf("%v::%v", inp.manager.Type, s.Name())
}

func newInputACKHandler(log *logp.Logger) func([]interface{}) {
return func(private []interface{}) {
var n uint
var last int
for i := 0; i < len(private); i++ {
current := private[i]
if current == nil {
continue
}

if _, ok := current.(*updateOp); !ok {
continue
}

n++
last = i
}

if n == 0 {
return
}
private[last].(*updateOp).Execute(n)
}
}
112 changes: 107 additions & 5 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@
package cursor

import (
"errors"
"sync"
"time"

"github.com/urso/sderr"

"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/statestore"

"github.com/elastic/go-concert/unison"
)

// InputManager is used to create, manage, and coordinate stateful inputs and
Expand Down Expand Up @@ -60,7 +64,9 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *common.Config) ([]Source, Input, error)

store *store
initOnce sync.Once
initErr error
store *store
}

// Source describe a source the input can collect data from.
Expand All @@ -70,22 +76,118 @@ type Source interface {
Name() string
}

var errNoSourceConfigured = errors.New("no source has been configured")
var errNoInputRunner = errors.New("no input runner available")

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access() (*statestore.Store, error)
CleanupInterval() time.Duration
}

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
if cim.DefaultCleanTimeout <= 0 {
cim.DefaultCleanTimeout = 30 * time.Minute
}

log := cim.Logger.With("input_type", cim.Type)
var store *store
store, cim.initErr = openStore(log, cim.StateStore, cim.Type)
if cim.initErr != nil {
return
}

cim.store = store
})

return cim.initErr
}

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
panic("TODO: implement me")
if mode != v2.ModeRun {
return nil
}

if err := cim.init(); err != nil {
return err
}

log := cim.Logger.With("input_type", cim.Type)

store := cim.store
cleaner := &cleaner{log: log}
store.Retain()
err := group.Go(func(canceler unison.Canceler) error {
defer cim.shutdown()
defer store.Release()
interval := cim.StateStore.CleanupInterval()
if interval <= 0 {
interval = 5 * time.Minute
}
cleaner.run(canceler, store, interval)
return nil
})
if err != nil {
store.Release()
cim.shutdown()
return sderr.Wrap(err, "Can not start registry cleanup process")
}

return nil
}

func (cim *InputManager) shutdown() {
cim.store.Release()
}

// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *common.Config) (input.Input, error) {
panic("TODO: implement me")
if err := cim.init(); err != nil {
return nil, err
}

settings := struct {
ID string `config:"id"`
CleanTimeout time.Duration `config:"clean_timeout"`
}{ID: "", CleanTimeout: cim.DefaultCleanTimeout}
if err := config.Unpack(&settings); err != nil {
return nil, err
}

sources, inp, err := cim.Configure(config)
if err != nil {
return nil, err
}
if len(sources) == 0 {
return nil, errNoSourceConfigured
}
if inp == nil {
return nil, errNoInputRunner
}

return &managedInput{
manager: cim,
userID: settings.ID,
sources: sources,
input: inp,
cleanTimeout: settings.CleanTimeout,
}, nil
}

// Lock locks a key for exclusive access and returns an resource that can be used to modify
// the cursor state and unlock the key.
func (cim *InputManager) lock(ctx input.Context, key string) (*resource, error) {
resource := cim.store.Get(key)
err := lockResource(ctx.Logger, resource, ctx.Cancelation)
if err != nil {
resource.Release()
return nil, err
}
return resource, nil
}

func lockResource(log *logp.Logger, resource *resource, canceler input.Canceler) error {
Expand Down