Skip to content

Commit

Permalink
Bug fix - resolve a deadlock(!) in the event handling.
Browse files Browse the repository at this point in the history
This does not really fix the design, but resolves the issue in
practice. The stream reassembly parser issues a callback every time it
receives a piece of the stream from tshark. Prior to this change, I
issued a call to app.Run() to update the UI with this data. For large
pcaps with many chunks, this could call app.Run() hundreds of times per
second. The event handling queue is only 1000 long, and a deadlock could
result if

(a) the UI is running a callback on the app goroutine
(b) meanwhile, 1000 pieces of data arrive and are queued up on the event
    handling channel
(c) the callback then itself issues a call to app.Run()

The deadlock occurs because the app goroutine is the goroutine that
listens to and processes UI input events; so if it is blocked from
calling app.Run(), it never returns to process UI events.

This resolves the issue in practice by updating the UI no more than 5
times a second during stream reassembly (which is more than enough).

Meanwhile, I'll consider changes to gowid's event handling logic...
  • Loading branch information
gcla committed Nov 25, 2019
1 parent b80def4 commit 412a94d
Showing 1 changed file with 54 additions and 26 deletions.
80 changes: 54 additions & 26 deletions ui/streamui.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ type streamParseHandler struct {
app gowid.IApp
tick *time.Ticker // for updating the spinner
stop chan struct{}
chunks chan streams.IChunk
pktIndices chan int
name string
proto streams.Protocol
idx int
Expand All @@ -165,6 +167,29 @@ type streamParseHandler struct {
var _ streams.IOnStreamChunk = (*streamParseHandler)(nil)
var _ streams.IOnStreamHeader = (*streamParseHandler)(nil)

// Run from the app goroutine
func (t *streamParseHandler) drainChunks() {
curLen := len(t.chunks)
for i := 0; i < curLen; i++ {
chunk := <-t.chunks
if !t.pleaseWaitClosed {
t.pleaseWaitClosed = true
ClosePleaseWait(t.app)
}

t.wid.AddChunkEntire(chunk, t.app)
}
}

// Run from the app goroutine
func (t *streamParseHandler) drainPacketIndices() {
curLen := len(t.pktIndices)
for i := 0; i < curLen; i++ {
packet := <-t.pktIndices
t.wid.TrackPayloadPacket(packet)
}
}

func (t *streamParseHandler) BeforeBegin(closeMe chan<- struct{}) {
close(closeMe)
t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
Expand All @@ -173,6 +198,29 @@ func (t *streamParseHandler) BeforeBegin(closeMe chan<- struct{}) {

t.tick = time.NewTicker(time.Duration(200) * time.Millisecond)
t.stop = make(chan struct{})
t.chunks = make(chan streams.IChunk, 1000)
t.pktIndices = make(chan int, 1000)

// Start this after widgets have been cleared, to get focus change
termshark.TrackedGo(func() {
fn := func() {
t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
t.drainChunks()
t.drainPacketIndices()

if !t.openedStreams {
appViewNoKeys.SetSubWidget(streamView, app)
openStreamUi(t.wid, app)
t.openedStreams = true
}
}))
}

termshark.RunOnDoubleTicker(t.stop, fn,
time.Duration(200)*time.Millisecond,
time.Duration(200)*time.Millisecond,
10)
}, Goroutinewg)

termshark.TrackedGo(func() {
Loop:
Expand Down Expand Up @@ -206,6 +254,10 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) {
t.openedStreams = true
}

// Clear out anything lingering from last ticker run to now
t.drainChunks()
t.drainPacketIndices()

if t.wid.NumChunks() == 0 {
OpenMessage("No stream payloads found.", appView, app)
}
Expand All @@ -214,9 +266,7 @@ func (t *streamParseHandler) AfterEnd(closeMe chan<- struct{}) {
}

func (t *streamParseHandler) TrackPayloadPacket(packet int) {
t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
t.wid.TrackPayloadPacket(packet)
}))
t.pktIndices <- packet
}

func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan struct{}) {
Expand All @@ -229,29 +279,7 @@ func (t *streamParseHandler) OnStreamHeader(hdr streams.FollowHeader, ch chan st
// Handle a line/chunk of input - one piece of reassembled data, which comes with
// a client/server direction.
func (t *streamParseHandler) OnStreamChunk(chunk streams.IChunk, ch chan struct{}) {
t.Lock()
defer t.Unlock()

t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
if !t.pleaseWaitClosed {
t.pleaseWaitClosed = true
ClosePleaseWait(t.app)
}
}))

t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
t.wid.AddChunkEntire(chunk, app)
}))

// Do after adding chunk so I can set focus on chunks correctly
t.app.Run(gowid.RunFunction(func(app gowid.IApp) {
if !t.openedStreams {
appViewNoKeys.SetSubWidget(streamView, app)
openStreamUi(t.wid, app)
t.openedStreams = true
}
}))

t.chunks <- chunk
close(ch)
}

Expand Down

0 comments on commit 412a94d

Please sign in to comment.