Skip to content

Commit

Permalink
Another iteration of loader changes
Browse files Browse the repository at this point in the history
Following the pcap/pdml model
  • Loading branch information
gcla committed Oct 28, 2020
1 parent 41bb24d commit 1e31f81
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 103 deletions.
98 changes: 64 additions & 34 deletions capinfo/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,68 @@ func (c *Loader) StartLoad(pcap string, app gowid.IApp, cb ICapinfoCallbacks) {
func (c *Loader) loadCapinfoAsync(pcapf string, app gowid.IApp, cb ICapinfoCallbacks) {
c.capinfoCtx, c.capinfoCancelFn = context.WithCancel(c.mainCtx)

procChan := make(chan int)
pid := 0

defer func() {
if pid == 0 {
close(procChan)
}
}()

c.capinfoCmd = c.cmds.Capinfo(pcapf)

termshark.TrackedGo(func() {
var err error
var cmd pcap.IPcapCommand
origCmd := c.capinfoCmd
cancelled := c.capinfoCtx.Done()
procChan := procChan

kill := func() {
err := termshark.KillIfPossible(cmd)
if err != nil {
log.Infof("Did not kill tshark capinfos process: %v", err)
}
}

loop:
for {
select {
case pid := <-procChan:
procChan = nil
if pid != 0 {
cmd = origCmd
if cancelled == nil {
kill()
}
}

case <-cancelled:
cancelled = nil
if cmd != nil {
kill()
}
}

if cancelled == nil && procChan == nil {
break loop
}
}
if cmd != nil {
err = cmd.Wait()
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.capinfoCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
}
}, Goroutinewg)

capinfoOut, err := c.capinfoCmd.StdoutReader()
if err != nil {
pcap.HandleError(err, cb)
Expand All @@ -116,47 +176,17 @@ func (c *Loader) loadCapinfoAsync(pcapf string, app gowid.IApp, cb ICapinfoCallb

log.Infof("Started capinfo command %v with pid %d", c.capinfoCmd, c.capinfoCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
procWaitChan <- c.capinfoCmd.Wait()
}()

termshark.TrackedGo(func() {
var err error
cancelled := c.capinfoCtx.Done()
loop:
for {
select {
case <-cancelled:
err := termshark.KillIfPossible(c.capinfoCmd)
if err != nil {
log.Infof("Did not kill capinfo process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.capinfoCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
break loop
}
}
c.capinfoCtx = nil
c.capinfoCancelFn = nil
}, Goroutinewg)
pid = c.capinfoCmd.Pid()
procChan <- pid

buf := new(bytes.Buffer)
buf.ReadFrom(capinfoOut)

ch := make(chan struct{})
cb.OnCapinfoData(buf.String(), ch)
<-ch

c.capinfoCancelFn()
}

//======================================================================
Expand Down
98 changes: 64 additions & 34 deletions convs/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,68 @@ func (c *Loader) StartLoad(pcap string, convs []string, filter string, abs bool,
func (c *Loader) loadConvAsync(pcapf string, convs []string, filter string, abs bool, resolve bool, app gowid.IApp, cb IConvsCallbacks) {
c.convsCtx, c.convsCancelFn = context.WithCancel(c.mainCtx)

procChan := make(chan int)
pid := 0

defer func() {
if pid == 0 {
close(procChan)
}
}()

c.convsCmd = c.cmds.Convs(pcapf, convs, filter, abs, resolve)

termshark.TrackedGo(func() {
var err error
var cmd pcap.IPcapCommand
origCmd := c.convsCmd
cancelled := c.convsCtx.Done()
procChan := procChan

kill := func() {
err := termshark.KillIfPossible(cmd)
if err != nil {
log.Infof("Did not kill tshark conv process: %v", err)
}
}

loop:
for {
select {
case pid := <-procChan:
procChan = nil
if pid != 0 {
cmd = origCmd
if cancelled == nil {
kill()
}
}

case <-cancelled:
cancelled = nil
if cmd != nil {
kill()
}
}

if cancelled == nil && procChan == nil {
break loop
}
}
if cmd != nil {
err = cmd.Wait()
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.convsCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
}
}, Goroutinewg)

convsOut, err := c.convsCmd.StdoutReader()
if err != nil {
pcap.HandleError(err, cb)
Expand All @@ -128,47 +188,17 @@ func (c *Loader) loadConvAsync(pcapf string, convs []string, filter string, abs

log.Infof("Started command %v with pid %d", c.convsCmd, c.convsCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
procWaitChan <- c.convsCmd.Wait()
}()

termshark.TrackedGo(func() {
var err error
cancelled := c.convsCtx.Done()
loop:
for {
select {
case <-cancelled:
err := termshark.KillIfPossible(c.convsCmd)
if err != nil {
log.Infof("Did not kill tshark conv process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.convsCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
break loop
}
}
c.convsCtx = nil
c.convsCancelFn = nil
}, Goroutinewg)
pid = c.convsCmd.Pid()
procChan <- pid

buf := new(bytes.Buffer)
buf.ReadFrom(convsOut)

ch := make(chan struct{})
cb.OnData(buf.String(), ch)
<-ch

c.convsCancelFn()
}

//======================================================================
Expand Down
99 changes: 64 additions & 35 deletions streams/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,68 @@ type ISavedData interface {
func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int, app gowid.IApp, cb interface{}) {
c.streamCtx, c.streamCancelFn = context.WithCancel(c.mainCtx)

procChan := make(chan int)
pid := 0

defer func() {
if pid == 0 {
close(procChan)
}
}()

c.streamCmd = c.cmds.Stream(pcapf, proto, idx)

termshark.TrackedGo(func() {
var err error
var cmd pcap.IPcapCommand
origCmd := c.streamCmd
cancelled := c.streamCtx.Done()
procChan := procChan

kill := func() {
err := termshark.KillIfPossible(cmd)
if err != nil {
log.Infof("Did not kill tshark stream process: %v", err)
}
}

loop:
for {
select {
case pid := <-procChan:
procChan = nil
if pid != 0 {
cmd = origCmd
if cancelled == nil {
kill()
}
}

case <-cancelled:
cancelled = nil
if cmd != nil {
kill()
}
}

if cancelled == nil && procChan == nil {
break loop
}
}
if cmd != nil {
err = cmd.Wait()
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.streamCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
}
}, Goroutinewg)

streamOut, err := c.streamCmd.StdoutReader()
if err != nil {
pcap.HandleError(err, cb)
Expand All @@ -143,41 +203,8 @@ func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int,

log.Infof("Started stream reassembly command %v with pid %d", c.streamCmd, c.streamCmd.Pid())

procWaitChan := make(chan error, 1)

defer func() {
procWaitChan <- c.streamCmd.Wait()
}()

termshark.TrackedGo(func() {
// Wait for external cancellation. This is the shutdown procedure.
var err error
cancelled := c.streamCtx.Done()
loop:
for {
select {
case <-cancelled:
err = termshark.KillIfPossible(c.streamCmd)
if err != nil {
log.Infof("Did not kill stream reassembly process: %v", err)
}
cancelled = nil
case err = <-procWaitChan:
if !c.SuppressErrors && err != nil {
if _, ok := err.(*exec.ExitError); ok {
cerr := gowid.WithKVs(termshark.BadCommand, map[string]interface{}{
"command": c.streamCmd.String(),
"error": err,
})
pcap.HandleError(cerr, cb)
}
}
break loop
}
}
c.streamCtx = nil
c.streamCancelFn = nil
}, Goroutinewg)
pid = c.streamCmd.Pid()
procChan <- pid

var ops []Option
ops = append(ops, GlobalStore("app", app))
Expand All @@ -189,6 +216,8 @@ func (c *Loader) loadStreamReassemblyAsync(pcapf string, proto string, idx int,
log.Infof("Stream parser reported error: %v", err)
}
}()

c.streamCancelFn()
}

func (c *Loader) startStreamIndexerAsync(pcapf string, proto string, idx int, app gowid.IApp, cb IIndexerCallbacks) {
Expand Down

0 comments on commit 1e31f81

Please sign in to comment.