Skip to content

Commit

Permalink
Rearrange PSML loading code in the same manner as the other loaders
Browse files Browse the repository at this point in the history
  • Loading branch information
gcla committed Oct 20, 2020
1 parent fa38931 commit 6ce2f4e
Showing 1 changed file with 90 additions and 62 deletions.
152 changes: 90 additions & 62 deletions pcap/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1607,29 +1607,57 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {
// a shutdown happens, and we get blocked in the XML parser, this will be able to
// respond

psmlWaitChan := make(chan error, 1)
tailWaitChan := make(chan error, 1)

termshark.TrackedGo(func() {
select {
case <-c.psmlCtx.Done():
intPsmlCancelFn() // start internal shutdown
case <-intPsmlCtx.Done():
}
loop:
for {
select {
case <-c.psmlCtx.Done():
intPsmlCancelFn() // start internal shutdown
case <-intPsmlCtx.Done():
if c.tailCmd != nil {
err := termshark.KillIfPossible(c.tailCmd)
if err != nil {
log.Infof("Did not kill tail process: %v", err)
}
}

if c.tailCmd != nil {
err := termshark.KillIfPossible(c.tailCmd)
if err != nil {
log.Infof("Did not kill tail process: %v", err)
}
}
if c.PsmlCmd != nil {
err := termshark.KillIfPossible(c.PsmlCmd)
if err != nil {
log.Infof("Did not kill psml process: %v", err)
}
}

if c.PsmlCmd != nil {
err := termshark.KillIfPossible(c.PsmlCmd)
if err != nil {
log.Infof("Did not kill psml process: %v", err)
}
}
if psmlOut != nil {
psmlOut.Close() // explicitly close else this goroutine can block
}
break loop
case err = <-psmlWaitChan:
if !c.SuppressErrors {
if err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.PsmlCmd.String(),
"error": err,
})
HandleError(cerr, cb)
}
}
}
// If the psml command generates an error, then we should stop any feed
// from the interface too.
c.stopLoadIface()

if psmlOut != nil {
psmlOut.Close() // explicitly close else this goroutine can block
case err = <-tailWaitChan:
// We need to explicitly close the write side of the pipe. Without this,
// the PSML process Wait() function won't complete, because golang won't
// complete termination until IO has finished, and io.Copy() will be stuck
// in a loop.
pw.Close()
}
}

}, Goroutinewg)
Expand Down Expand Up @@ -1712,25 +1740,14 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {

log.Infof("Started PSML command %v with pid %d", c.PsmlCmd, c.PsmlCmd.Pid())

termshark.TrackedGo(func() {
psmlWaitChan <- c.PsmlCmd.Wait()
}, Goroutinewg)

defer func() {
// These need to close so the tailreader Read() terminates so that the
// PsmlCmd.Wait() below completes.
closePipe()
err := c.PsmlCmd.Wait()
if !c.SuppressErrors {
if err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.PsmlCmd.String(),
"error": err,
})
HandleError(cerr, cb)
}
}
// If the psml command generates an error, then we should stop any feed
// from the interface too.
c.stopLoadIface()
}
}()

//======================================================================
Expand Down Expand Up @@ -1805,16 +1822,9 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {
return
}

// Do this in a goroutine - in a defer, it would block here before the code executes
defer func() {
c.tailCmd.Wait() // this will block the exit of this function until the command is killed

// We need to explicitly close the write side of the pipe. Without this,
// the PSML process Wait() function won't complete, because golang won't
// complete termination until IO has finished, and io.Copy() will be stuck
// in a loop.
pw.Close()
}()
termshark.TrackedGo(func() {
tailWaitChan <- c.tailCmd.Wait()
}, Goroutinewg)
}

//======================================================================
Expand Down Expand Up @@ -1956,12 +1966,42 @@ func (c *Loader) loadIfacesAsync(cb interface{}) {
return
}

procWaitChan := make(chan error, 1)

termshark.TrackedGo(func() {
// Wait for external cancellation. This is the shutdown procedure.
<-c.ifaceCtx.Done()
err := termshark.KillIfPossible(c.ifaceCmd)
if err != nil {
log.Infof("Did not kill iface reader process: %v", err)
procWaitChan <- c.ifaceCmd.Wait()
}, Goroutinewg)

proceedChan := make(chan struct{})

termshark.TrackedGo(func() {
var err error
cancelled := c.ifaceCtx.Done()
loop:
for {
select {
case <-cancelled:
err := termshark.KillIfPossible(c.ifaceCmd)
if err != nil {
log.Infof("Did not kill iface reader process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
// This could be if termshark is started like this: cat nosuchfile.pcap | termshark -i -
// Then dumpcap will be started with /dev/fd/3 as its stdin, but will fail with EOF and
// exit status 1.
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.ifaceCmd.String(),
"error": err,
})
HandleError(cerr, cb)
}
}
proceedChan <- struct{}{}
break loop
}
}
}, Goroutinewg)

Expand All @@ -1976,19 +2016,7 @@ func (c *Loader) loadIfacesAsync(cb interface{}) {
}
}()

err = c.ifaceCmd.Wait() // it definitely started, so we must wait
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
// This could be if termshark is started like this: cat nosuchfile.pcap | termshark -i -
// Then dumpcap will be started with /dev/fd/3 as its stdin, but will fail with EOF and
// exit status 1.
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.ifaceCmd.String(),
"error": err,
})
HandleError(cerr, cb)
}
}
<-proceedChan

// If something killed it, then start the internal shutdown procedure anyway to clean up
// goroutines waiting on the context. This could also happen if tshark -i is reading from
Expand Down

0 comments on commit 6ce2f4e

Please sign in to comment.