Skip to content

Commit

Permalink
Attempt to prevent the deadlock in the QueueDiskChannel Test again (#…
Browse files Browse the repository at this point in the history
…18415)

* Attempt to prevent the deadlock in the QueueDiskChannel Test again

This time we're going to adjust the pause tests to only test the right
flag.

* Only switch off pushback once we know that we are not pushing anything else
* Ensure full redirection occurs
* More nicely handle a closed datachan
* And handle similar problems in queue_channel_test

Signed-off-by: Andrew Thornton <art27@cantab.net>
  • Loading branch information
zeripath authored Jan 29, 2022
1 parent 726715f commit 92b715e
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 95 deletions.
5 changes: 4 additions & 1 deletion modules/queue/queue_bytefifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,10 @@ loop:
// tell the pool to shutdown.
q.baseCtxCancel()
return
case data := <-q.dataChan:
case data, ok := <-q.dataChan:
if !ok {
return
}
if err := q.PushBack(data); err != nil {
log.Error("Unable to push back data into queue %s", q.name)
}
Expand Down
5 changes: 4 additions & 1 deletion modules/queue/queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,10 @@ func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
select {
case <-paused:
return nil
case data := <-q.dataChan:
case data, ok := <-q.dataChan:
if !ok {
return nil
}
if unhandled := q.handle(data); unhandled != nil {
log.Error("Unhandled Data whilst flushing queue %d", q.qid)
}
Expand Down
69 changes: 48 additions & 21 deletions modules/queue/queue_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"code.gitea.io/gitea/modules/log"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -111,7 +112,6 @@ func TestChannelQueue_Pause(t *testing.T) {
if pausable, ok := queue.(Pausable); ok {
pausable.Pause()
}
pushBack = false
lock.Unlock()
return data
}
Expand All @@ -123,7 +123,9 @@ func TestChannelQueue_Pause(t *testing.T) {
}
return nil
}
nilFn := func(_ func()) {}

queueShutdown := []func(){}
queueTerminate := []func(){}

queue, err = NewChannelQueue(handle,
ChannelQueueConfiguration{
Expand All @@ -139,7 +141,34 @@ func TestChannelQueue_Pause(t *testing.T) {
}, &testData{})
assert.NoError(t, err)

go queue.Run(nilFn, nilFn)
go queue.Run(func(shutdown func()) {
lock.Lock()
defer lock.Unlock()
queueShutdown = append(queueShutdown, shutdown)
}, func(terminate func()) {
lock.Lock()
defer lock.Unlock()
queueTerminate = append(queueTerminate, terminate)
})

// Shutdown and Terminate in defer
defer func() {
lock.Lock()
callbacks := make([]func(), len(queueShutdown))
copy(callbacks, queueShutdown)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
lock.Lock()
log.Info("Finally terminating")
callbacks = make([]func(), len(queueTerminate))
copy(callbacks, queueTerminate)
lock.Unlock()
for _, callback := range callbacks {
callback()
}
}()

test1 := testData{"A", 1}
test2 := testData{"B", 2}
Expand All @@ -155,14 +184,11 @@ func TestChannelQueue_Pause(t *testing.T) {

pausable.Pause()

paused, resumed := pausable.IsPausedIsResumed()
paused, _ := pausable.IsPausedIsResumed()

select {
case <-paused:
case <-resumed:
assert.Fail(t, "Queue should not be resumed")
return
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}
Expand All @@ -179,10 +205,11 @@ func TestChannelQueue_Pause(t *testing.T) {
assert.Nil(t, result2)

pausable.Resume()
_, resumed := pausable.IsPausedIsResumed()

select {
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}

Expand All @@ -199,47 +226,47 @@ func TestChannelQueue_Pause(t *testing.T) {
pushBack = true
lock.Unlock()

paused, resumed = pausable.IsPausedIsResumed()
_, resumed = pausable.IsPausedIsResumed()

select {
case <-paused:
assert.Fail(t, "Queue should not be paused")
return
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not resumed")
return
}

queue.Push(&test1)
paused, _ = pausable.IsPausedIsResumed()

select {
case <-paused:
case <-handleChan:
assert.Fail(t, "handler chan should not contain test1")
return
case <-time.After(500 * time.Millisecond):
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "queue should be paused")
return
}

paused, resumed = pausable.IsPausedIsResumed()
lock.Lock()
pushBack = false
lock.Unlock()

paused, _ = pausable.IsPausedIsResumed()

select {
case <-paused:
case <-resumed:
assert.Fail(t, "Queue should not be resumed")
return
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue is not paused")
return
}

pausable.Resume()
_, resumed = pausable.IsPausedIsResumed()

select {
case <-resumed:
default:
case <-time.After(100 * time.Millisecond):
assert.Fail(t, "Queue should be resumed")
}

Expand Down
15 changes: 7 additions & 8 deletions modules/queue/queue_disk_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,14 +313,13 @@ func (q *PersistableChannelQueue) Shutdown() {
q.channelQueue.Wait()
q.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)
}()
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name)
close(q.channelQueue.dataChan)
for data := range q.channelQueue.dataChan {
_ = q.internal.Push(data)
atomic.AddInt64(&q.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name)

log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name)
}
Expand Down
Loading

0 comments on commit 92b715e

Please sign in to comment.