Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix data race in dynamic plugin registry tests #12554

Merged
merged 2 commits into from
Apr 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 67 additions & 61 deletions client/dynamicplugins/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,95 @@ func TestPluginEventBroadcaster_SendsMessagesToAllClients(t *testing.T) {

b := newPluginEventBroadcaster()
defer close(b.stopCh)
var rcv1, rcv2 bool

var rcv1, rcv2 bool
ch1 := b.subscribe()
ch2 := b.subscribe()

listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case <-ch:
*updateBool = true
}
}
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

go listenFunc(ch1, &rcv1)
go listenFunc(ch2, &rcv2)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive event on both subscriptions before timeout")
return
case <-ch1:
rcv1 = true
case <-ch2:
rcv2 = true
}
if rcv1 && rcv2 {
return
}
}
}()

b.broadcast(&PluginUpdateEvent{})

require.Eventually(t, func() bool {
return rcv1 == true && rcv2 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) {
ci.Parallel(t)

b := newPluginEventBroadcaster()
defer close(b.stopCh)
var rcv1 bool

ch1 := b.subscribe()

listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
*updateBool = true
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive unsubscribe event on subscription before timeout")
return
case <-ch1:
return // done!
}
}
}

go listenFunc(ch1, &rcv1)
}()

b.unsubscribe(ch1)

b.broadcast(&PluginUpdateEvent{})

require.Eventually(t, func() bool {
return rcv1 == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
ci.Parallel(t)

r := NewRegistry(nil, nil)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

ch := r.PluginsUpdatedCh(ctx, "csi")
receivedRegistrationEvent := false

listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
select {
case e := <-ch:
if e == nil {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive registration event on subscription before timeout")
return
}

if e.EventType == EventTypeRegistered {
*updateBool = true
case e := <-ch:
if e != nil && e.EventType == EventTypeRegistered {
return
}
}
}
}

go listenFunc(ch, &receivedRegistrationEvent)
}()

err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Expand All @@ -100,39 +113,35 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) {
})

require.NoError(t, err)

require.Eventually(t, func() bool {
return receivedRegistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {
ci.Parallel(t)

r := NewRegistry(nil, nil)

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
var wg sync.WaitGroup
wg.Add(1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

ch := r.PluginsUpdatedCh(ctx, "csi")
receivedDeregistrationEvent := false

listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
t.Errorf("did not receive deregistration event on subscription before timeout")
return
case e := <-ch:
if e == nil {
if e != nil && e.EventType == EventTypeDeregistered {
return
}

if e.EventType == EventTypeDeregistered {
*updateBool = true
}
}
}
}

go listenFunc(ch, &receivedDeregistrationEvent)
}()

err := r.RegisterPlugin(&PluginInfo{
Type: "csi",
Expand All @@ -144,10 +153,7 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) {

err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0")
require.NoError(t, err)

require.Eventually(t, func() bool {
return receivedDeregistrationEvent == true
}, 1*time.Second, 200*time.Millisecond)
wg.Wait()
}

func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {
Expand Down