Skip to content

Commit

Permalink
fix races in orca test
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Sep 24, 2024
1 parent 1c6a9b1 commit 2eb9103
Showing 1 changed file with 9 additions and 15 deletions.
24 changes: 9 additions & 15 deletions orca/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubCon
}
l := getListenerInfo(addrs[0])
l.listener.cleanup = orca.RegisterOOBListener(sc, l.listener, l.opts)
l.sc = sc
l.scChan <- sc
}
sc, err := w.ClientConn.NewSubConn(addrs, opts)
if err != nil {
Expand All @@ -86,7 +86,7 @@ func (w *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubCon
type listenerInfo struct {
listener *testOOBListener
opts orca.OOBListenerOptions
sc balancer.SubConn // Set by the LB policy
scChan chan balancer.SubConn // Pushed on by the LB policy
}

type listenerInfoKey struct{}
Expand Down Expand Up @@ -150,7 +150,7 @@ func (s) TestProducer(t *testing.T) {
oobLis := newTestOOBListener()

lisOpts := orca.OOBListenerOptions{ReportInterval: 50 * time.Millisecond}
li := &listenerInfo{listener: oobLis, opts: lisOpts}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
addr := setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -159,10 +159,6 @@ func (s) TestProducer(t *testing.T) {
}
defer cc.Close()

// Ensure the OOB listener is stopped before the client is closed to avoid
// a potential irrelevant error in the logs.
defer oobLis.Stop()

// Set a few metrics and wait for them on the client side.
smr.SetCPUUtilization(10)
smr.SetMemoryUtilization(0.1)
Expand Down Expand Up @@ -209,6 +205,7 @@ testReport:
t.Fatalf("timed out waiting for load report: %v", loadReportWant)
}
}

}

// fakeORCAService is a simple implementation of an ORCA service that pushes
Expand Down Expand Up @@ -320,18 +317,14 @@ func (s) TestProducerBackoff(t *testing.T) {
oobLis := newTestOOBListener()

lisOpts := orca.OOBListenerOptions{ReportInterval: reportInterval}
li := &listenerInfo{listener: oobLis, opts: lisOpts}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis, opts: lisOpts}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial failed: %v", err)
}
defer cc.Close()

// Ensure the OOB listener is stopped before the client is closed to avoid
// a potential irrelevant error in the logs.
defer oobLis.Stop()

// Define a load report to send and expect the client to see.
loadReportWant := &v3orcapb.OrcaLoadReport{
CpuUtilization: 10,
Expand Down Expand Up @@ -436,7 +429,7 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
r := manual.NewBuilderWithScheme("whatever")
oobLis1 := newTestOOBListener()
lisOpts1 := orca.OOBListenerOptions{ReportInterval: reportInterval1}
li := &listenerInfo{listener: oobLis1, opts: lisOpts1}
li := &listenerInfo{scChan: make(chan balancer.SubConn, 1), listener: oobLis1, opts: lisOpts1}
r.InitialState(resolver.State{Addresses: []resolver.Address{setListenerInfo(resolver.Address{Addr: lis.Addr().String()}, li)}})
cc, err := grpc.Dial("whatever:///whatever", grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"customLB":{}}]}`), grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
Expand Down Expand Up @@ -525,15 +518,16 @@ func (s) TestProducerMultipleListeners(t *testing.T) {
fake.respCh <- loadReportWant
checkReports(1, 0, 0)

sc := <-li.scChan
// Register listener 2 with a less frequent interval; no need to recreate
// stream. Report should go to both listeners.
oobLis2.cleanup = orca.RegisterOOBListener(li.sc, oobLis2, lisOpts2)
oobLis2.cleanup = orca.RegisterOOBListener(sc, oobLis2, lisOpts2)
fake.respCh <- loadReportWant
checkReports(2, 1, 0)

// Register listener 3 with a more frequent interval; stream is recreated
// with this interval. The next report will go to all three listeners.
oobLis3.cleanup = orca.RegisterOOBListener(li.sc, oobLis3, lisOpts3)
oobLis3.cleanup = orca.RegisterOOBListener(sc, oobLis3, lisOpts3)
awaitRequest(reportInterval3)
fake.respCh <- loadReportWant
checkReports(3, 2, 1)
Expand Down

0 comments on commit 2eb9103

Please sign in to comment.