diff --git a/pcap/loader.go b/pcap/loader.go index 07a0106..0895551 100644 --- a/pcap/loader.go +++ b/pcap/loader.go @@ -1607,29 +1607,57 @@ func (c *Loader) loadPsmlAsync(cb interface{}) { // a shutdown happens, and we get blocked in the XML parser, this will be able to // respond + psmlWaitChan := make(chan error, 1) + tailWaitChan := make(chan error, 1) + termshark.TrackedGo(func() { - select { - case <-c.psmlCtx.Done(): - intPsmlCancelFn() // start internal shutdown - case <-intPsmlCtx.Done(): - } + loop: + for { + select { + case <-c.psmlCtx.Done(): + intPsmlCancelFn() // start internal shutdown + case <-intPsmlCtx.Done(): + if c.tailCmd != nil { + err := termshark.KillIfPossible(c.tailCmd) + if err != nil { + log.Infof("Did not kill tail process: %v", err) + } + } - if c.tailCmd != nil { - err := termshark.KillIfPossible(c.tailCmd) - if err != nil { - log.Infof("Did not kill tail process: %v", err) - } - } + if c.PsmlCmd != nil { + err := termshark.KillIfPossible(c.PsmlCmd) + if err != nil { + log.Infof("Did not kill psml process: %v", err) + } + } - if c.PsmlCmd != nil { - err := termshark.KillIfPossible(c.PsmlCmd) - if err != nil { - log.Infof("Did not kill psml process: %v", err) - } - } + if psmlOut != nil { + psmlOut.Close() // explicitly close else this goroutine can block + } + break loop + case err = <-psmlWaitChan: + if !c.SuppressErrors { + if err != nil { + if _, ok := err.(*exec.ExitError); ok { + cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{ + "command": c.PsmlCmd.String(), + "error": err, + }) + HandleError(cerr, cb) + } + } + } + // If the psml command generates an error, then we should stop any feed + // from the interface too. + c.stopLoadIface() - if psmlOut != nil { - psmlOut.Close() // explicitly close else this goroutine can block + case err = <-tailWaitChan: + // We need to explicitly close the write side of the pipe. Without this, + // the PSML process Wait() function won't complete, because golang won't + // complete termination until IO has finished, and io.Copy() will be stuck + // in a loop. + pw.Close() + } } }, Goroutinewg) @@ -1712,25 +1740,14 @@ func (c *Loader) loadPsmlAsync(cb interface{}) { log.Infof("Started PSML command %v with pid %d", c.PsmlCmd, c.PsmlCmd.Pid()) + termshark.TrackedGo(func() { + psmlWaitChan <- c.PsmlCmd.Wait() + }, Goroutinewg) + defer func() { // These need to close so the tailreader Read() terminates so that the // PsmlCmd.Wait() below completes. closePipe() - err := c.PsmlCmd.Wait() - if !c.SuppressErrors { - if err != nil { - if _, ok := err.(*exec.ExitError); ok { - cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{ - "command": c.PsmlCmd.String(), - "error": err, - }) - HandleError(cerr, cb) - } - } - // If the psml command generates an error, then we should stop any feed - // from the interface too. - c.stopLoadIface() - } }() //====================================================================== @@ -1805,16 +1822,9 @@ func (c *Loader) loadPsmlAsync(cb interface{}) { return } - // Do this in a goroutine - in a defer, it would block here before the code executes - defer func() { - c.tailCmd.Wait() // this will block the exit of this function until the command is killed - - // We need to explicitly close the write side of the pipe. Without this, - // the PSML process Wait() function won't complete, because golang won't - // complete termination until IO has finished, and io.Copy() will be stuck - // in a loop. - pw.Close() - }() + termshark.TrackedGo(func() { + tailWaitChan <- c.tailCmd.Wait() + }, Goroutinewg) } //====================================================================== @@ -1956,12 +1966,42 @@ func (c *Loader) loadIfacesAsync(cb interface{}) { return } + procWaitChan := make(chan error, 1) + termshark.TrackedGo(func() { - // Wait for external cancellation. This is the shutdown procedure. - <-c.ifaceCtx.Done() - err := termshark.KillIfPossible(c.ifaceCmd) - if err != nil { - log.Infof("Did not kill iface reader process: %v", err) + procWaitChan <- c.ifaceCmd.Wait() + }, Goroutinewg) + + proceedChan := make(chan struct{}) + + termshark.TrackedGo(func() { + var err error + cancelled := c.ifaceCtx.Done() + loop: + for { + select { + case <-cancelled: + err := termshark.KillIfPossible(c.ifaceCmd) + if err != nil { + log.Infof("Did not kill iface reader process: %v", err) + } + cancelled = nil + case err = <-procWaitChan: + if !c.SuppressErrors && err != nil { + if _, ok := err.(*exec.ExitError); ok { + // This could be if termshark is started like this: cat nosuchfile.pcap | termshark -i - + // Then dumpcap will be started with /dev/fd/3 as its stdin, but will fail with EOF and + // exit status 1. + cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{ + "command": c.ifaceCmd.String(), + "error": err, + }) + HandleError(cerr, cb) + } + } + proceedChan <- struct{}{} + break loop + } } }, Goroutinewg) @@ -1976,19 +2016,7 @@ func (c *Loader) loadIfacesAsync(cb interface{}) { } }() - err = c.ifaceCmd.Wait() // it definitely started, so we must wait - if !c.SuppressErrors && err != nil { - if _, ok := err.(*exec.ExitError); ok { - // This could be if termshark is started like this: cat nosuchfile.pcap | termshark -i - - // Then dumpcap will be started with /dev/fd/3 as its stdin, but will fail with EOF and - // exit status 1. - cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{ - "command": c.ifaceCmd.String(), - "error": err, - }) - HandleError(cerr, cb) - } - } + <-proceedChan // If something killed it, then start the internal shutdown procedure anyway to clean up // goroutines waiting on the context. This could also happen if tshark -i is reading from