diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 8e4fc3d799a2..e5de5aabd05c 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -39,6 +39,7 @@ import ( "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" + "github.com/elastic/go-concert/unison" ) type inputTestingEnvironment struct { @@ -50,7 +51,8 @@ type inputTestingEnvironment struct { pluginInitOnce sync.Once plugin v2.Plugin - wg sync.WaitGroup + wg sync.WaitGroup + grp unison.TaskGroup } type registryEntry struct { @@ -70,7 +72,9 @@ func newInputTestingEnvironment(t *testing.T) *inputTestingEnvironment { } func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{}) v2.Input { + e.grp = unison.TaskGroup{} manager := e.getManager() + manager.Init(&e.grp, v2.ModeRun) c := common.MustNewConfigFrom(config) inp, err := manager.Create(c) if err != nil { @@ -88,12 +92,13 @@ func (e *inputTestingEnvironment) getManager() v2.InputManager { func (e *inputTestingEnvironment) startInput(ctx context.Context, inp v2.Input) { e.wg.Add(1) - go func(wg *sync.WaitGroup) { + go func(wg *sync.WaitGroup, grp *unison.TaskGroup) { defer wg.Done() + defer grp.Stop() inputCtx := input.Context{Logger: logp.L(), Cancelation: ctx} inp.Run(inputCtx, e.pipeline) - }(&e.wg) + }(&e.wg, &e.grp) } func (e *inputTestingEnvironment) waitUntilInputStops() {