Skip to content

Commit

Permalink
Eliminate a misleading warning message about failing to kill a proc
Browse files Browse the repository at this point in the history
This occurs when closing termshark after having used the stream
reassembly or conversations function. The warning is printed because
there is a race between the code that runs if the Golang context is
cancelled to indicate the stream/conv processes should end
early (e.g. user hits q while they are still running) and then has to
kill the process just in case it's running still AND the deferred
function that issues a waitpid().

The solution is to wait for both the process to complete AND for
cancellation in the same select statement. If cancellation, then we
haven't called waitpid() yet, so issue a kill (the pid won't be recycled
for sure); if the process has completed/died, then cleanup and break the
select loop.
  • Loading branch information
gcla committed Oct 19, 2020
1 parent 796f4cf commit bc0c2fb
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 66 deletions.
50 changes: 28 additions & 22 deletions convs/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ func (c *Loader) StartLoad(pcap string, convs []string, filter string, abs bool,
func (c *Loader) loadConvAsync(pcapf string, convs []string, filter string, abs bool, resolve bool, app gowid.IApp, cb IConvsCallbacks) {
c.convsCtx, c.convsCancelFn = context.WithCancel(c.mainCtx)

defer func() {
c.convsCtx = nil
c.convsCancelFn = nil
}()

c.convsCmd = c.cmds.Convs(pcapf, convs, filter, abs, resolve)

convsOut, err := c.convsCmd.StdoutReader()
Expand Down Expand Up @@ -133,26 +128,39 @@ func (c *Loader) loadConvAsync(pcapf string, convs []string, filter string, abs

log.Infof("Started command %v with pid %d", c.convsCmd, c.convsCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
err = c.convsCmd.Wait() // it definitely started, so we must wait
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.convsCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
procWaitChan <- c.convsCmd.Wait()
}()

termshark.TrackedGo(func() {
// Wait for external cancellation. This is the shutdown procedure.
<-c.convsCtx.Done()
err := termshark.KillIfPossible(c.convsCmd)
if err != nil {
log.Infof("Did not kill tshark conv process: %v", err)
var err error
cancelled := c.convsCtx.Done()
loop:
for {
select {
case <-cancelled:
err := termshark.KillIfPossible(c.convsCmd)
if err != nil {
log.Infof("Did not kill tshark conv process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.convsCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
break loop
}
}
c.convsCtx = nil
c.convsCancelFn = nil
}, Goroutinewg)

buf := new(bytes.Buffer)
Expand All @@ -161,8 +169,6 @@ func (c *Loader) loadConvAsync(pcapf string, convs []string, filter string, abs
ch := make(chan struct{})
cb.OnData(buf.String(), ch)
<-ch

c.StopLoad()
}

//======================================================================
Expand Down
98 changes: 57 additions & 41 deletions streams/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,6 @@ type ISavedData interface {
func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int, app gowid.IApp, cb interface{}) {
c.streamCtx, c.streamCancelFn = context.WithCancel(c.mainCtx)

defer func() {
c.streamCtx = nil
c.streamCancelFn = nil
}()

c.streamCmd = c.cmds.Stream(pcapf, proto, idx)

streamOut, err := c.streamCmd.StdoutReader()
Expand All @@ -148,26 +143,40 @@ func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int,

log.Infof("Started stream reassembly command %v with pid %d", c.streamCmd, c.streamCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
err = c.streamCmd.Wait() // it definitely started, so we must wait
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.streamCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
procWaitChan <- c.streamCmd.Wait()
}()

termshark.TrackedGo(func() {
// Wait for external cancellation. This is the shutdown procedure.
<-c.streamCtx.Done()
err := termshark.KillIfPossible(c.streamCmd)
if err != nil {
log.Infof("Did not kill stream reassembly process: %v", err)
var err error
cancelled := c.streamCtx.Done()
loop:
for {
select {
case <-cancelled:
err = termshark.KillIfPossible(c.streamCmd)
if err != nil {
log.Infof("Did not kill stream reassembly process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.streamCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
break loop
}
}
c.streamCtx = nil
c.streamCancelFn = nil
}, Goroutinewg)

var ops []Option
Expand All @@ -187,11 +196,6 @@ func (c *Loader) startStreamIndexerAsync(pcapf string, proto string, idx int, ap

c.indexerCtx, c.indexerCancelFn = context.WithCancel(c.mainCtx)

defer func() {
c.indexerCtx = nil
c.indexerCancelFn = nil
}()

c.indexerCmd = c.cmds.Indexer(pcapf, proto, idx)

streamOut, err := c.indexerCmd.StdoutReader()
Expand All @@ -215,28 +219,40 @@ func (c *Loader) startStreamIndexerAsync(pcapf string, proto string, idx int, ap

log.Infof("Started stream indexer command %v with pid %d", c.indexerCmd, c.indexerCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
err = c.indexerCmd.Wait() // it definitely started, so we must wait
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.indexerCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
procWaitChan <- c.indexerCmd.Wait()
}()

termshark.TrackedGo(func() {
// Wait for external cancellation. This is the shutdown procedure.
<-c.indexerCtx.Done()
err := termshark.KillIfPossible(c.indexerCmd)
if err != nil {
log.Infof("Did not kill indexer process: %v", err)
var err error
cancelled := c.indexerCtx.Done()
loop:
for {
select {
case <-cancelled:
err = termshark.KillIfPossible(c.indexerCmd)
if err != nil {
log.Infof("Did not kill indexer process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.indexerCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
streamOut.Close()
break loop
}
}
// Stop main loop
streamOut.Close()
c.indexerCtx = nil
c.indexerCancelFn = nil
}, Goroutinewg)

res = decodeStreamXml(streamOut, proto, c.indexerCtx, cb)
Expand Down
6 changes: 3 additions & 3 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,10 +604,10 @@ func KillIfPossible(p IProcess) error {
return nil
}
err := p.Kill()
if !errProcessAlreadyFinished(err) {
return err
} else {
if errProcessAlreadyFinished(err) {
return nil
} else {
return err
}
}

Expand Down

0 comments on commit bc0c2fb

Please sign in to comment.