Skip to content

Commit

Permalink
testing: fix test-only data race in dynamic plugin registry
Browse files Browse the repository at this point in the history
The dynamic plugin registry tests have data races becasue the test
assertion is reading values set in the `listenFunc` goroutines. Have
the assertion happen inside the polling goroutine to remove the race.
  • Loading branch information
tgross committed Apr 12, 2022
1 parent 762acf8 commit 5e71aae
Showing 1 changed file with 70 additions and 60 deletions.
130 changes: 70 additions & 60 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,36 @@ func TestPluginEventBroadcaster_SendsMessagesToAllClients(t *testing.T) {

b := newPluginEventBroadcaster()
defer close(b.stopCh)
var rcv1, rcv2 bool

var rcv1, rcv2 bool
ch1 := b.subscribe()
ch2 := b.subscribe()

listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case <-ch:
*updateBool = true
}
}
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

go listenFunc(ch1, &rcv1)
go listenFunc(ch2, &rcv2)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Fail()
return
case <-ch1:
rcv1 = true
case <-ch2:
rcv2 = true
}
if rcv1 && rcv2 {
return
}
}
}()

b.broadcast(&PluginUpdateEvent{})

require.Eventually(t, func() bool {
return rcv1 == true && rcv2 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
Expand All @@ -47,51 +57,58 @@ func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {

ch1 := b.subscribe()

listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
*updateBool = true
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Fail()
return
case <-ch1:
rcv1 = true
if rcv1 {
return
}
}
}
}

go listenFunc(ch1, &rcv1)
}()

b.unsubscribe(ch1)

b.broadcast(&PluginUpdateEvent{})

require.Eventually(t, func() bool {
return rcv1 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
ci.Parallel(t)

r := NewRegistry(nil, nil)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

ch := r.PluginsUpdatedCh(ctx, "csi")
receivedRegistrationEvent := false

listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Fail()
return
}

if e.EventType == EventTypeRegistered {
*updateBool = true
case e := <-ch:
if e != nil && e.EventType == EventTypeRegistered {
return
}
}
}
}

go listenFunc(ch, &receivedRegistrationEvent)
}()

err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Expand All @@ -100,39 +117,35 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
})

require.NoError(t, err)

require.Eventually(t, func() bool {
return receivedRegistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
ci.Parallel(t)

r := NewRegistry(nil, nil)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

ch := r.PluginsUpdatedCh(ctx, "csi")
receivedDeregistrationEvent := false

listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Fail()
return
case e := <-ch:
if e == nil {
if e != nil && e.EventType == EventTypeDeregistered {
return
}

if e.EventType == EventTypeDeregistered {
*updateBool = true
}
}
}
}

go listenFunc(ch, &receivedDeregistrationEvent)
}()

err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Expand All @@ -144,10 +157,7 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {

err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
return receivedDeregistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
Expand Down

0 comments on commit 5e71aae

Please sign in to comment.