Skip to content

Commit

Permalink
Simplify the pcap and pdml loaders
Browse files Browse the repository at this point in the history
Another in the series of changes intended to rule out races where
termshark will send SIGTERM to tshark processes that have already
terminated.
  • Loading branch information
gcla committed Oct 25, 2020
1 parent 7ea620e commit 56ba55a
Showing 1 changed file with 75 additions and 77 deletions.
152 changes: 75 additions & 77 deletions pcap/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,6 @@ type Loader struct {
ifaceCtx context.Context // cancels the iface reader process
ifaceCancelFn context.CancelFunc

//psmlDecodingProcessChan chan struct{} // signalled by psml load stage when the XML decoding is complete - signals rest of stage 1 to shut down
stage2GoroutineDoneChan chan struct{} // signalled by a goroutine in stage 2 for pcap/pdml - always signalled at end. When x2, signals rest of stage 2 to shut down

//stage1Wg sync.WaitGroup
stage2Wg sync.WaitGroup

Expand Down Expand Up @@ -225,15 +222,14 @@ func NewPcapLoader(cmds ILoaderCmds, opts ...Options) *Loader {
}

res := &Loader{
cmds: cmds,
IfaceFinishedChan: make(chan struct{}),
stage2GoroutineDoneChan: make(chan struct{}),
PsmlFinishedChan: make(chan struct{}),
Stage2FinishedChan: make(chan struct{}),
onStateChange: make([]runFnInState, 0),
RowCurrentlyLoading: -1,
highestCachedRow: -1,
opt: opt,
cmds: cmds,
IfaceFinishedChan: make(chan struct{}),
PsmlFinishedChan: make(chan struct{}),
Stage2FinishedChan: make(chan struct{}),
onStateChange: make([]runFnInState, 0),
RowCurrentlyLoading: -1,
highestCachedRow: -1,
opt: opt,
}

res.resetData()
Expand Down Expand Up @@ -1046,7 +1042,6 @@ func (c *Loader) signalStage2Starting(cb interface{}) {
// - cancel ticker with ctxt2
// - wait for all to shut down
// - final UI update
//func loadPcapAsync(ctx context.Context, pcapFile string, filter string, app gowid.IApp) error {
func (c *Loader) loadPcapAsync(row int, cb interface{}) {

// Used to cancel the tickers below which update list widgets with the latest data and
Expand All @@ -1058,8 +1053,6 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
c.stage2Ctx, c.stage2CancelFn = context.WithCancel(c.thisSrcCtx)
c.Unlock()

intStage2Ctx, intStage2CancelFn := context.WithCancel(context.Background())

// Set to true by a goroutine started within here if ctxCancel() is called i.e. the outer context
var stageIsCancelled int32
c.startPdmlChan = make(chan struct{})
Expand Down Expand Up @@ -1089,36 +1082,12 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
sidx := -1
eidx := -1

// When we start a command (in service of loading pcaps), add it to this list. Then we wait
// for finished signals on a channel -
//procs := []ICommand{}

// signal to updater that we're about to start. This will block until cb completes
c.signalStage2Starting(cb)

// This should correctly wait for all resources, no matter where in the process of creating them
// an interruption or error occurs
defer func() {
procsDoneCount := 0
L:
for {
// pdml and psml make 2
select {
// Don't need to wait for ctx.Done. if that gets cancelled, then it will propagate
// to context2. The two tshark processes will wait on context2.Done, and complete -
// then their defer blocks will send procDoneChan messages. When the count hits 2, this
// select block will exit. Note that we also issue a cancel if count==2 because it might
// just be that the tshark processes finish normally - then we need to stop the other
// goroutines using ctxt2.
case <-c.stage2GoroutineDoneChan:
procsDoneCount++
if procsDoneCount == 2 {
intStage2CancelFn() // stop the ticker
break L
}
}
}

// Wait for all other goroutines to complete
c.stage2Wg.Wait()

Expand All @@ -1139,8 +1108,6 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
case <-c.stage2Ctx.Done():
setCancelled()
return
case <-intStage2Ctx.Done():
return // shutdown signalled - don't start the pdml/pcap processes
}

// Do this - but if we're cancelled first (stage2Ctx.Done), then they
Expand Down Expand Up @@ -1206,23 +1173,69 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {

//======================================================================

var pdmlWaitChan chan error
var pcapWaitChan chan error
var stateChan chan struct{} = make(chan struct{})

termshark.TrackedGo(func() {
var err error
stage2CtxChan := c.stage2Ctx.Done()

loop:
for {
select {

case <-stateChan:
// so the select loop picks up new values of other channels (nil -> non-nil)

case <-pdmlWaitChan:
close(pdmlWaitChan)
pdmlWaitChan = nil

case <-pcapWaitChan:
close(pcapWaitChan)
pcapWaitChan = nil

case <-stage2CtxChan:
stage2CtxChan = nil

setCancelled()
if pcapWaitChan != nil {
err := termshark.KillIfPossible(c.PcapCmd)
if err != nil {
log.Infof("Did not kill pcap process: %v", err)
}
}
if pdmlWaitChan != nil {
err = termshark.KillIfPossible(c.PdmlCmd)
if err != nil {
log.Infof("Did not kill pdml process: %v", err)
}
}
}

// Make sure the first time we fall through here that it's not the case
// that neither process has started yet, because neither will start now
if pdmlWaitChan == nil && pcapWaitChan == nil {
break loop
}
}

}, Goroutinewg)

//======================================================================

//
// Goroutine to run pdml process
//
termshark.TrackedGo(func() {
defer func() {
c.stage2GoroutineDoneChan <- struct{}{}
}()

// Wait for stage 2 to be kicked off (potentially by psml load, then mapping table row to frame num); or
// quit if that happens first
select {
case <-c.startPdmlChan:
case <-c.stage2Ctx.Done():
setCancelled()
return
case <-intStage2Ctx.Done():
return
}

c.Lock()
Expand All @@ -1244,8 +1257,11 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {

log.Infof("Started PDML command %v with pid %d", c.PdmlCmd, c.PdmlCmd.Pid())

pdmlWaitChan = make(chan error, 1)
stateChan <- struct{}{}

defer func() {
c.PdmlCmd.Wait()
pdmlWaitChan <- c.PdmlCmd.Wait()
}()

d := xml.NewDecoder(pdmlOut)
Expand Down Expand Up @@ -1283,7 +1299,6 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
}
packets = append(packets, cpacket)
c.updateCacheEntryWithPdml(row, packets, false)
//if len(pdml2) == abcdex {
if len(packets) == c.KillAfterReadingThisMany {
// Shortcut - we never take more than abcdex - so just kill here
issuedKill = true
Expand Down Expand Up @@ -1330,19 +1345,13 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
// Goroutine to run pcap process
//
termshark.TrackedGo(func() {
defer func() {
c.stage2GoroutineDoneChan <- struct{}{}
}()

// Wait for stage 2 to be kicked off (potentially by psml load, then mapping table row to frame num); or
// quit if that happens first
select {
case <-c.startPcapChan:
case <-c.stage2Ctx.Done():
setCancelled()
return
case <-intStage2Ctx.Done():
return
}

c.Lock()
Expand All @@ -1365,8 +1374,11 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {

log.Infof("Started pcap command %v with pid %d", c.PcapCmd, c.PcapCmd.Pid())

pcapWaitChan = make(chan error, 1)
stateChan <- struct{}{}

defer func() {
c.PcapCmd.Wait()
pcapWaitChan <- c.PcapCmd.Wait()
}()

packets := make([][]byte, 0, c.opt.PacketsPerLoad)
Expand Down Expand Up @@ -1429,27 +1441,6 @@ func (c *Loader) loadPcapAsync(row int, cb interface{}) {
}

}, &c.stage2Wg, Goroutinewg)

//
// Goroutine to track an external shutdown - kills processes in case the external
// shutdown comes first. If it's an internal shutdown, no need to kill because
// that would only be triggered once processes are dead
//
termshark.TrackedGo(func() {
select {
case <-c.stage2Ctx.Done():
setCancelled()
err := termshark.KillIfPossible(c.PcapCmd)
if err != nil {
log.Infof("Did not kill pcap process: %v", err)
}
err = termshark.KillIfPossible(c.PdmlCmd)
if err != nil {
log.Infof("Did not kill pdml process: %v", err)
}
case <-intStage2Ctx.Done():
}
}, Goroutinewg)
}

func (c *Loader) TurnOffPipe() {
Expand Down Expand Up @@ -1609,13 +1600,18 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {

var psmlWaitChan chan error
var tailWaitChan chan error
var stateChan chan struct{} = make(chan struct{})

termshark.TrackedGo(func() {
psmlCtxChan := c.psmlCtx.Done()
intPsmlCtxChan := intPsmlCtx.Done()
loop:
for {
select {
case <-stateChan:
// do nothing, just allow select to pick up new values of channels
// that might change from nil to non-nil

case <-psmlCtxChan:
intPsmlCancelFn() // start internal shutdown
psmlCtxChan = nil
Expand Down Expand Up @@ -1754,6 +1750,7 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {
}

psmlWaitChan = make(chan error, 1)
stateChan <- struct{}{}

log.Infof("Started PSML command %v with pid %d", c.PsmlCmd, c.PsmlCmd.Pid())

Expand Down Expand Up @@ -1838,6 +1835,7 @@ func (c *Loader) loadPsmlAsync(cb interface{}) {
}

tailWaitChan = make(chan error, 1)
stateChan <- struct{}{}

defer func() {
tailWaitChan <- c.tailCmd.Wait()
Expand Down

0 comments on commit 56ba55a

Please sign in to comment.