diff --git a/client/dynamicplugins/registry_test.go b/client/dynamicplugins/registry_test.go index c55af5c5a905..3ed638997e8b 100644 --- a/client/dynamicplugins/registry_test.go +++ b/client/dynamicplugins/registry_test.go @@ -16,26 +16,36 @@ func TestPluginEventBroadcaster_SendsMessagesToAllClients(t *testing.T) { b := newPluginEventBroadcaster() defer close(b.stopCh) - var rcv1, rcv2 bool + var rcv1, rcv2 bool ch1 := b.subscribe() ch2 := b.subscribe() - listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) { - select { - case <-ch: - *updateBool = true - } - } + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() - go listenFunc(ch1, &rcv1) - go listenFunc(ch2, &rcv2) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + t.Fail() + return + case <-ch1: + rcv1 = true + case <-ch2: + rcv2 = true + } + if rcv1 && rcv2 { + return + } + } + }() b.broadcast(&PluginUpdateEvent{}) - - require.Eventually(t, func() bool { - return rcv1 == true && rcv2 == true - }, 1*time.Second, 200*time.Millisecond) + wg.Wait() } func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) { @@ -47,24 +57,30 @@ func TestPluginEventBroadcaster_UnsubscribeWorks(t *testing.T) { ch1 := b.subscribe() - listenFunc := func(ch chan *PluginUpdateEvent, updateBool *bool) { - select { - case e := <-ch: - if e == nil { - *updateBool = true + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + t.Fail() + return + case <-ch1: + rcv1 = true + if rcv1 { + return + } } } - } - - go listenFunc(ch1, &rcv1) + }() b.unsubscribe(ch1) - b.broadcast(&PluginUpdateEvent{}) - - require.Eventually(t, func() bool { - return rcv1 == true - }, 1*time.Second, 200*time.Millisecond) + wg.Wait() } func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) { @@ -72,26 +88,27 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) { r := NewRegistry(nil, nil) - ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() ch := r.PluginsUpdatedCh(ctx, "csi") - receivedRegistrationEvent := false - listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) { - select { - case e := <-ch: - if e == nil { + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + t.Fail() return - } - - if e.EventType == EventTypeRegistered { - *updateBool = true + case e := <-ch: + if e != nil && e.EventType == EventTypeRegistered { + return + } } } - } - - go listenFunc(ch, &receivedRegistrationEvent) + }() err := r.RegisterPlugin(&PluginInfo{ Type: "csi", @@ -100,10 +117,7 @@ func TestDynamicRegistry_RegisterPlugin_SendsUpdateEvents(t *testing.T) { }) require.NoError(t, err) - - require.Eventually(t, func() bool { - return receivedRegistrationEvent == true - }, 1*time.Second, 200*time.Millisecond) + wg.Wait() } func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { @@ -111,28 +125,27 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { r := NewRegistry(nil, nil) - ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() + var wg sync.WaitGroup + wg.Add(1) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() ch := r.PluginsUpdatedCh(ctx, "csi") - receivedDeregistrationEvent := false - listenFunc := func(ch <-chan *PluginUpdateEvent, updateBool *bool) { + go func() { + defer wg.Done() for { select { + case <-ctx.Done(): + t.Fail() + return case e := <-ch: - if e == nil { + if e != nil && e.EventType == EventTypeDeregistered { return } - - if e.EventType == EventTypeDeregistered { - *updateBool = true - } } } - } - - go listenFunc(ch, &receivedDeregistrationEvent) + }() err := r.RegisterPlugin(&PluginInfo{ Type: "csi", @@ -144,10 +157,7 @@ func TestDynamicRegistry_DeregisterPlugin_SendsUpdateEvents(t *testing.T) { err = r.DeregisterPlugin("csi", "my-plugin", "alloc-0") require.NoError(t, err) - - require.Eventually(t, func() bool { - return receivedDeregistrationEvent == true - }, 1*time.Second, 200*time.Millisecond) + wg.Wait() } func TestDynamicRegistry_DispensePlugin_Works(t *testing.T) {