diff --git a/pubsublite/internal/test/verifier.go b/pubsublite/internal/test/verifier.go index 99ae135bc39c..a8afa9a4dfc5 100644 --- a/pubsublite/internal/test/verifier.go +++ b/pubsublite/internal/test/verifier.go @@ -250,11 +250,11 @@ func (sv *streamVerifiers) Push(v *RPCVerifier) { sv.verifiers.PushBack(v) } -func (sv *streamVerifiers) Pop() (*RPCVerifier, error) { +func (sv *streamVerifiers) Pop(key string) (*RPCVerifier, error) { sv.numStreams++ elem := sv.verifiers.Front() if elem == nil { - sv.t.Errorf("stream(%d): unexpected connection with no verifiers", sv.numStreams) + sv.t.Errorf("unexpected stream index %d for key %s", sv.numStreams, key) return nil, status.Error(codes.FailedPrecondition, "mockserver: got unexpected stream connection") } @@ -294,7 +294,7 @@ func (kv *keyedStreamVerifiers) Pop(key string) (*RPCVerifier, error) { if !ok { return nil, status.Error(codes.FailedPrecondition, "mockserver: unexpected connection with no configured responses") } - return sv.Pop() + return sv.Pop(key) } func (kv *keyedStreamVerifiers) Flush() { diff --git a/pubsublite/internal/wire/publish_batcher.go b/pubsublite/internal/wire/publish_batcher.go index 606928c13e65..9b36b450544f 100644 --- a/pubsublite/internal/wire/publish_batcher.go +++ b/pubsublite/internal/wire/publish_batcher.go @@ -29,6 +29,11 @@ var errPublishQueueEmpty = errors.New("pubsublite: received publish response fro // PublishResultFunc receives the result of a publish. type PublishResultFunc func(*MessageMetadata, error) +type publishResult struct { + Metadata *MessageMetadata + OnResult PublishResultFunc +} + // messageHolder stores a message to be published, with associated metadata. type messageHolder struct { msg *pb.PubSubMessage @@ -133,26 +138,29 @@ func (b *publishMessageBatcher) AddBatch(batch *publishBatch) { b.publishQueue.PushBack(batch) } -func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) error { +func (b *publishMessageBatcher) OnPublishResponse(firstOffset int64) ([]*publishResult, error) { frontElem := b.publishQueue.Front() if frontElem == nil { - return errPublishQueueEmpty + return nil, errPublishQueueEmpty } if firstOffset < b.minExpectedNextOffset { - return fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset) + return nil, fmt.Errorf("pubsublite: server returned publish response with inconsistent start offset = %d, expected >= %d", firstOffset, b.minExpectedNextOffset) } batch, _ := frontElem.Value.(*publishBatch) + var results []*publishResult for i, msgHolder := range batch.msgHolders { // Messages are ordered, so the offset of each message is firstOffset + i. 
- mm := &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)} - msgHolder.onResult(mm, nil) - b.availableBufferBytes += msgHolder.size + results = append(results, &publishResult{ + Metadata: &MessageMetadata{Partition: b.partition, Offset: firstOffset + int64(i)}, + OnResult: msgHolder.onResult, + }) } + b.availableBufferBytes += batch.totalSize b.minExpectedNextOffset = firstOffset + int64(len(batch.msgHolders)) b.publishQueue.Remove(frontElem) - return nil + return results, nil } func (b *publishMessageBatcher) OnPermanentError(err error) { diff --git a/pubsublite/internal/wire/publish_batcher_test.go b/pubsublite/internal/wire/publish_batcher_test.go index bb6fa63a2eef..7a100740346a 100644 --- a/pubsublite/internal/wire/publish_batcher_test.go +++ b/pubsublite/internal/wire/publish_batcher_test.go @@ -285,7 +285,8 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) { batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch) t.Run("empty in-flight batches", func(t *testing.T) { - if gotErr, wantErr := batcher.OnPublishResponse(0), errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { + _, gotErr := batcher.OnPublishResponse(0) + if wantErr := errPublishQueueEmpty; !test.ErrorEqual(gotErr, wantErr) { t.Errorf("OnPublishResponse() got err: %v, want err: %v", gotErr, wantErr) } }) @@ -297,30 +298,40 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) { // Batch 2 msg3 := &pb.PubSubMessage{Data: []byte{'3'}} - pubResult1 := newTestPublishResultReceiver(t, msg1) - pubResult2 := newTestPublishResultReceiver(t, msg2) - pubResult3 := newTestPublishResultReceiver(t, msg3) - batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1, pubResult1), makeMsgHolder(msg2, pubResult2))) - batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3, pubResult3))) - if err := batcher.OnPublishResponse(70); err != nil { + batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2))) + batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3))) + + got, err := batcher.OnPublishResponse(70) + if err != nil { t.Errorf("OnPublishResponse() got err: %v", err) } - if err := batcher.OnPublishResponse(80); err != nil { - t.Errorf("OnPublishResponse() got err: %v", err) + want := []*publishResult{ + {Metadata: &MessageMetadata{Partition: partition, Offset: 70}}, + {Metadata: &MessageMetadata{Partition: partition, Offset: 71}}, + } + if diff := testutil.Diff(got, want); diff != "" { + t.Errorf("Results got: -, want: +\n%s", diff) } - pubResult1.ValidateResult(partition, 70) - pubResult2.ValidateResult(partition, 71) - pubResult3.ValidateResult(partition, 80) + got, err = batcher.OnPublishResponse(80) + if err != nil { + t.Errorf("OnPublishResponse() got err: %v", err) + } + want = []*publishResult{ + {Metadata: &MessageMetadata{Partition: partition, Offset: 80}}, + } + if diff := testutil.Diff(got, want); diff != "" { + t.Errorf("Results got: -, want: +\n%s", diff) + } }) t.Run("inconsistent offset", func(t *testing.T) { msg := &pb.PubSubMessage{Data: []byte{'4'}} - pubResult := newTestPublishResultReceiver(t, msg) - batcher.AddBatch(makePublishBatch(makeMsgHolder(msg, pubResult))) + batcher.AddBatch(makePublishBatch(makeMsgHolder(msg))) - if gotErr, wantMsg := batcher.OnPublishResponse(80), "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) { + _, gotErr := batcher.OnPublishResponse(80) + if wantMsg := "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) { t.Errorf("OnPublishResponse() got err: %v, 
want err msg: %q", gotErr, wantMsg) } }) diff --git a/pubsublite/internal/wire/publisher.go b/pubsublite/internal/wire/publisher.go index 5c5424486b7b..147b15d53222 100644 --- a/pubsublite/internal/wire/publisher.go +++ b/pubsublite/internal/wire/publisher.go @@ -67,10 +67,11 @@ type singlePartitionPublisher struct { // singlePartitionPublisherFactory creates instances of singlePartitionPublisher // for given partition numbers. type singlePartitionPublisherFactory struct { - ctx context.Context - pubClient *vkit.PublisherClient - settings PublishSettings - topicPath string + ctx context.Context + pubClient *vkit.PublisherClient + settings PublishSettings + topicPath string + unloadDelay time.Duration } func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher { @@ -113,9 +114,6 @@ func (pp *singlePartitionPublisher) Stop() { // Publish a pub/sub message. func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) { - pp.mu.Lock() - defer pp.mu.Unlock() - processMessage := func() error { // Messages are accepted while the service is starting up or active. During // startup, messages are queued in the batcher and will be published once @@ -134,10 +132,17 @@ func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult Publ return nil } + pp.mu.Lock() + err := processMessage() // If the new message cannot be published, flush pending messages and then // terminate the stream once results are received. - if err := processMessage(); err != nil { + if err != nil { pp.unsafeInitiateShutdown(serviceTerminating, err) + } + pp.mu.Unlock() + + if err != nil { + // Invoke callback without lock held. onResult(nil, err) } } @@ -199,24 +204,27 @@ func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) { } func (pp *singlePartitionPublisher) onResponse(response interface{}) { - pp.mu.Lock() - defer pp.mu.Unlock() - - processResponse := func() error { + processResponse := func() ([]*publishResult, error) { pubResponse, _ := response.(*pb.PublishResponse) if pubResponse.GetMessageResponse() == nil { - return errInvalidMsgPubResponse + return nil, errInvalidMsgPubResponse } firstOffset := pubResponse.GetMessageResponse().GetStartCursor().GetOffset() - if err := pp.batcher.OnPublishResponse(firstOffset); err != nil { - return err - } - pp.unsafeCheckDone() - return nil + return pp.batcher.OnPublishResponse(firstOffset) } - if err := processResponse(); err != nil { + + pp.mu.Lock() + results, err := processResponse() + if err != nil { pp.unsafeInitiateShutdown(serviceTerminated, err) } + pp.unsafeCheckDone() + pp.mu.Unlock() + + // Invoke callbacks without lock held. + for _, r := range results { + r.OnResult(r.Metadata, nil) + } } // unsafeInitiateShutdown must be provided a target serviceStatus, which must be @@ -274,6 +282,107 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() { } } +// lazyPartitionPublisher lazily creates an underlying singlePartitionPublisher +// and unloads it after a period of inactivity. +type lazyPartitionPublisher struct { + // Immutable after creation. + pubFactory *singlePartitionPublisherFactory + partition int + idleTimer *streamIdleTimer + + // Fields below must be guarded with mu. 
+ publisher *singlePartitionPublisher + outstandingMessages int + + abstractService +} + +func newLazyPartitionPublisher(partition int, pubFactory *singlePartitionPublisherFactory) *lazyPartitionPublisher { + pub := &lazyPartitionPublisher{ + pubFactory: pubFactory, + partition: partition, + } + pub.idleTimer = newStreamIdleTimer(pubFactory.unloadDelay, pub.onIdle) + return pub +} + +func (lp *lazyPartitionPublisher) Start() { + lp.mu.Lock() + defer lp.mu.Unlock() + lp.unsafeUpdateStatus(serviceActive, nil) +} + +func (lp *lazyPartitionPublisher) Stop() { + lp.mu.Lock() + defer lp.mu.Unlock() + + lp.idleTimer.Shutdown() + if lp.publisher == nil { + lp.unsafeUpdateStatus(serviceTerminated, nil) + } else if lp.unsafeUpdateStatus(serviceTerminating, nil) { + lp.publisher.Stop() + } +} + +func (lp *lazyPartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) { + publisher, err := func() (*singlePartitionPublisher, error) { + lp.mu.Lock() + defer lp.mu.Unlock() + + if lp.status >= serviceTerminating { + return nil, ErrServiceStopped + } + if lp.publisher == nil { + lp.publisher = lp.pubFactory.New(lp.partition) + lp.publisher.AddStatusChangeReceiver(lp.Handle(), lp.onStatusChange) + lp.publisher.Start() + } + lp.idleTimer.Stop() // Prevent the underlying publisher from being unloaded + lp.outstandingMessages++ + return lp.publisher, nil + }() + if err != nil { + onResult(nil, err) + return + } + // Publish without lock held, as the callback may be invoked inline. + publisher.Publish(msg, func(metadata *MessageMetadata, err error) { + lp.onResult() + onResult(metadata, err) + }) +} + +func (lp *lazyPartitionPublisher) onStatusChange(handle serviceHandle, status serviceStatus, err error) { + if status >= serviceTerminating { + lp.mu.Lock() + defer lp.mu.Unlock() + lp.unsafeUpdateStatus(status, err) + } +} + +func (lp *lazyPartitionPublisher) onResult() { + lp.mu.Lock() + defer lp.mu.Unlock() + + lp.outstandingMessages-- + if lp.outstandingMessages == 0 { + // Schedule the underlying publisher for unload if no new messages are + // published before the timer expires. + lp.idleTimer.Restart() + } +} + +func (lp *lazyPartitionPublisher) onIdle() { + lp.mu.Lock() + defer lp.mu.Unlock() + + if lp.outstandingMessages == 0 && lp.publisher != nil { + lp.publisher.RemoveStatusChangeReceiver(lp.Handle()) + lp.publisher.Stop() + lp.publisher = nil + } +} + // routingPublisher publishes messages to multiple topic partitions, each // managed by a singlePartitionPublisher. It supports increasing topic partition // count, but not decreasing. @@ -285,7 +394,7 @@ type routingPublisher struct { // Fields below must be guarded with mu. 
msgRouter messageRouter - publishers []*singlePartitionPublisher + publishers []*lazyPartitionPublisher compositeService } @@ -319,7 +428,7 @@ func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) { prevPartitionCount := len(rp.publishers) for i := prevPartitionCount; i < partitionCount; i++ { - pub := rp.pubFactory.New(i) + pub := newLazyPartitionPublisher(i, rp.pubFactory) rp.publishers = append(rp.publishers, pub) rp.unsafeAddServices(pub) } @@ -335,7 +444,7 @@ func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResul pub.Publish(msg, onResult) } -func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) { +func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*lazyPartitionPublisher, error) { rp.mu.Lock() defer rp.mu.Unlock() @@ -395,10 +504,11 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano()))) pubFactory := &singlePartitionPublisherFactory{ - ctx: ctx, - pubClient: pubClient, - settings: settings, - topicPath: topicPath, + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topicPath, + unloadDelay: time.Minute * 5, } return newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory), nil } diff --git a/pubsublite/internal/wire/publisher_test.go b/pubsublite/internal/wire/publisher_test.go index 8aee651bc5ad..de3eb885ba5e 100644 --- a/pubsublite/internal/wire/publisher_test.go +++ b/pubsublite/internal/wire/publisher_test.go @@ -17,6 +17,7 @@ import ( "bytes" "context" "math/rand" + "strconv" "testing" "time" @@ -54,10 +55,11 @@ func newTestSinglePartitionPublisher(t *testing.T, topic topicPartition, setting } pubFactory := &singlePartitionPublisherFactory{ - ctx: ctx, - pubClient: pubClient, - settings: settings, - topicPath: topic.Path, + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topic.Path, + unloadDelay: time.Hour, } tp := &testPartitionPublisher{ pub: pubFactory.New(topic.Partition), @@ -504,7 +506,7 @@ type testRoutingPublisher struct { pub *routingPublisher } -func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, fakeSourceVal int64) *testRoutingPublisher { +func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSettings, unloadDelay time.Duration, fakeSourceVal int64) *testRoutingPublisher { ctx := context.Background() pubClient, err := newPublisherClient(ctx, "ignored", testServer.ClientConn()) if err != nil { @@ -519,10 +521,11 @@ func newTestRoutingPublisher(t *testing.T, topicPath string, settings PublishSet source := &test.FakeSource{Ret: fakeSourceVal} msgRouterFactory := newMessageRouterFactory(rand.New(source)) pubFactory := &singlePartitionPublisherFactory{ - ctx: ctx, - pubClient: pubClient, - settings: settings, - topicPath: topicPath, + ctx: ctx, + pubClient: pubClient, + settings: settings, + topicPath: topicPath, + unloadDelay: unloadDelay, } pub := newRoutingPublisher(allClients, adminClient, msgRouterFactory, pubFactory) pub.Start() @@ -546,43 +549,62 @@ func (tp *testRoutingPublisher) Stop() { tp.pub.Stop() } func (tp *testRoutingPublisher) WaitStarted() error { return tp.pub.WaitStarted() } func (tp *testRoutingPublisher) WaitStopped() error { return tp.pub.WaitStopped() } -func TestRoutingPublisherStartOnce(t *testing.T) { +func TestRoutingPublisherUnloadIdlePublisher(t *testing.T) { const topic = 
"projects/123456/locations/us-central1-b/topics/my-topic" numPartitions := 2 + key0 := []byte("baz") // hashes to partition 0 + key1 := []byte("bar") // hashes to partition 1 + + var msgs []*pb.PubSubMessage + for i := 1; i <= 6; i++ { + msgs = append(msgs, &pb.PubSubMessage{Data: []byte(strconv.Itoa(i)), Key: key0}) + } + msg1 := &pb.PubSubMessage{Data: []byte{'a'}, Key: key1} + msg2 := &pb.PubSubMessage{Data: []byte{'b'}, Key: key1} verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) + // Partition 0 is continuously published to and never unloaded. + stream := test.NewRPCVerifier(t) + stream.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) + for i, msg := range msgs { + stream.Push(msgPubReq(msg), msgPubResp(10+int64(i)), nil) + } + verifiers.AddPublishStream(topic, 0, stream) + + // Partition 1 - first connection stream0 := test.NewRPCVerifier(t) - stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 0, stream0) + stream0.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream0.Push(msgPubReq(msg1), msgPubResp(21), nil) + verifiers.AddPublishStream(topic, 1, stream0) + // Partition 1 - second connection stream1 := test.NewRPCVerifier(t) stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) + stream1.Push(msgPubReq(msg2), msgPubResp(22), nil) verifiers.AddPublishStream(topic, 1, stream1) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + unloadDelay := time.Millisecond * 10 + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), unloadDelay, 0) + if gotErr := pub.WaitStarted(); gotErr != nil { + t.Errorf("Start() got err: (%v)", gotErr) + } - t.Run("First succeeds", func(t *testing.T) { - // Note: newTestRoutingPublisher() called Start. - if gotErr := pub.WaitStarted(); gotErr != nil { - t.Errorf("Start() got err: (%v)", gotErr) - } - if got, want := pub.NumPartitionPublishers(), numPartitions; got != want { - t.Errorf("Num partition publishers: got %d, want %d", got, want) - } - }) - t.Run("Second no-op", func(t *testing.T) { - // An error is not returned, but no new streams are opened. The mock server - // does not expect more RPCs. 
- pub.Start() - if gotErr := pub.WaitStarted(); gotErr != nil { - t.Errorf("Start() got err: (%v)", gotErr) - } - }) + result1 := pub.Publish(msg1) + result1.ValidateResult(1, 21) + + for i, msg := range msgs { + result := pub.Publish(msg) + result.ValidateResult(0, 10+int64(i)) + time.Sleep(unloadDelay / 2) + } + + result2 := pub.Publish(msg2) + result2.ValidateResult(1, 22) pub.Stop() if gotErr := pub.WaitStopped(); gotErr != nil { @@ -600,7 +622,7 @@ func TestRoutingPublisherStartStop(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) barrier.ReleaseAfter(func() { pub.Stop() }) if gotErr := pub.WaitStopped(); gotErr != nil { @@ -621,6 +643,8 @@ func TestRoutingPublisherRoundRobin(t *testing.T) { msg2 := &pb.PubSubMessage{Data: []byte{'2'}} msg3 := &pb.PubSubMessage{Data: []byte{'3'}} msg4 := &pb.PubSubMessage{Data: []byte{'4'}} + msg5 := &pb.PubSubMessage{Data: []byte{'5'}} + msg6 := &pb.PubSubMessage{Data: []byte{'6'}} verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) @@ -629,6 +653,7 @@ func TestRoutingPublisherRoundRobin(t *testing.T) { stream0 := test.NewRPCVerifier(t) stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) stream0.Push(msgPubReq(msg3), msgPubResp(34), nil) + stream0.Push(msgPubReq(msg6), msgPubResp(35), nil) verifiers.AddPublishStream(topic, 0, stream0) // Partition 1 @@ -642,6 +667,7 @@ func TestRoutingPublisherRoundRobin(t *testing.T) { stream2 := test.NewRPCVerifier(t) stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) stream2.Push(msgPubReq(msg2), msgPubResp(78), nil) + stream2.Push(msgPubReq(msg5), msgPubResp(79), nil) verifiers.AddPublishStream(topic, 2, stream2) mockServer.OnTestStart(verifiers) @@ -649,7 +675,7 @@ func TestRoutingPublisherRoundRobin(t *testing.T) { // Note: The fake source is initialized with value=1, so Partition=1 publisher // will be the first chosen by the roundRobinMsgRouter. - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 1) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 1) if err := pub.WaitStarted(); err != nil { t.Errorf("Start() got err: (%v)", err) } @@ -657,12 +683,16 @@ func TestRoutingPublisherRoundRobin(t *testing.T) { result1 := pub.Publish(msg1) result2 := pub.Publish(msg2) result3 := pub.Publish(msg3) - result4 := pub.Publish(msg4) - result1.ValidateResult(1, 41) result2.ValidateResult(2, 78) result3.ValidateResult(0, 34) + + result4 := pub.Publish(msg4) + result5 := pub.Publish(msg5) + result6 := pub.Publish(msg6) result4.ValidateResult(1, 42) + result5.ValidateResult(2, 79) + result6.ValidateResult(0, 35) pub.Stop() if err := pub.WaitStopped(); err != nil { @@ -681,9 +711,9 @@ func TestRoutingPublisherHashing(t *testing.T) { // Messages have ordering key, so the hashingMsgRouter is used. 
msg1 := &pb.PubSubMessage{Data: []byte{'1'}, Key: key2} msg2 := &pb.PubSubMessage{Data: []byte{'2'}, Key: key0} - msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key2} + msg3 := &pb.PubSubMessage{Data: []byte{'3'}, Key: key1} msg4 := &pb.PubSubMessage{Data: []byte{'4'}, Key: key1} - msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key0} + msg5 := &pb.PubSubMessage{Data: []byte{'5'}, Key: key2} verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) @@ -692,26 +722,26 @@ func TestRoutingPublisherHashing(t *testing.T) { stream0 := test.NewRPCVerifier(t) stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) stream0.Push(msgPubReq(msg2), msgPubResp(20), nil) - stream0.Push(msgPubReq(msg5), msgPubResp(21), nil) verifiers.AddPublishStream(topic, 0, stream0) // Partition 1 stream1 := test.NewRPCVerifier(t) stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) - stream1.Push(msgPubReq(msg4), msgPubResp(40), nil) + stream1.Push(msgPubReq(msg3), msgPubResp(40), nil) + stream1.Push(msgPubReq(msg4), msgPubResp(41), nil) verifiers.AddPublishStream(topic, 1, stream1) // Partition 2 stream2 := test.NewRPCVerifier(t) stream2.Push(initPubReq(topicPartition{topic, 2}), initPubResp(), nil) stream2.Push(msgPubReq(msg1), msgPubResp(10), nil) - stream2.Push(msgPubReq(msg3), msgPubResp(11), nil) + stream2.Push(msgPubReq(msg5), msgPubResp(11), nil) verifiers.AddPublishStream(topic, 2, stream2) mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) if err := pub.WaitStarted(); err != nil { t.Errorf("Start() got err: (%v)", err) } @@ -719,14 +749,14 @@ func TestRoutingPublisherHashing(t *testing.T) { result1 := pub.Publish(msg1) result2 := pub.Publish(msg2) result3 := pub.Publish(msg3) - result4 := pub.Publish(msg4) - result5 := pub.Publish(msg5) - result1.ValidateResult(2, 10) result2.ValidateResult(0, 20) - result3.ValidateResult(2, 11) - result4.ValidateResult(1, 40) - result5.ValidateResult(0, 21) + result3.ValidateResult(1, 40) + + result4 := pub.Publish(msg4) + result5 := pub.Publish(msg5) + result4.ValidateResult(1, 41) + result5.ValidateResult(2, 11) pub.Stop() if err := pub.WaitStopped(); err != nil { @@ -761,7 +791,7 @@ func TestRoutingPublisherPermanentError(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) if err := pub.WaitStarted(); err != nil { t.Errorf("Start() got err: (%v)", err) } @@ -785,21 +815,12 @@ func TestRoutingPublisherPublishAfterStop(t *testing.T) { verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(numPartitions), nil) - - // Partition 0 - stream0 := test.NewRPCVerifier(t) - stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 0, stream0) - - // Partition 1 - stream1 := test.NewRPCVerifier(t) - stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 1, stream1) + // No streams expected. 
mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) if err := pub.WaitStarted(); err != nil { t.Errorf("Start() got err: (%v)", err) } @@ -828,7 +849,7 @@ func TestRoutingPublisherPartitionCountFail(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantErr.Error()) { t.Errorf("Start() got err: (%v), want err: (%v)", gotErr, wantErr) @@ -853,7 +874,7 @@ func TestRoutingPublisherPartitionCountInvalid(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) wantMsg := "topic has invalid number of partitions" if gotErr := pub.WaitStarted(); !test.ErrorHasMsg(gotErr, wantMsg) { @@ -894,7 +915,7 @@ func TestRoutingPublisherPartitionCountIncreases(t *testing.T) { mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) t.Run("Initial count", func(t *testing.T) { if gotErr := pub.WaitStarted(); gotErr != nil { @@ -934,19 +955,12 @@ func TestRoutingPublisherPartitionCountDecreases(t *testing.T) { verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(updatedPartitionCount), nil) - - stream0 := test.NewRPCVerifier(t) - stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 0, stream0) - - stream1 := test.NewRPCVerifier(t) - stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 1, stream1) + // No streams expected. mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) t.Run("Initial count", func(t *testing.T) { if gotErr := pub.WaitStarted(); gotErr != nil { @@ -979,19 +993,12 @@ func TestRoutingPublisherPartitionCountUpdateFails(t *testing.T) { verifiers := test.NewVerifiers(t) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), topicPartitionsResp(initialPartitionCount), nil) verifiers.GlobalVerifier.Push(topicPartitionsReq(topic), nil, serverErr) - - stream0 := test.NewRPCVerifier(t) - stream0.Push(initPubReq(topicPartition{topic, 0}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 0, stream0) - - stream1 := test.NewRPCVerifier(t) - stream1.Push(initPubReq(topicPartition{topic, 1}), initPubResp(), nil) - verifiers.AddPublishStream(topic, 1, stream1) + // No streams expected. 
mockServer.OnTestStart(verifiers) defer mockServer.OnTestEnd() - pub := newTestRoutingPublisher(t, topic, testPublishSettings(), 0) + pub := newTestRoutingPublisher(t, topic, testPublishSettings(), time.Hour, 0) t.Run("Initial count", func(t *testing.T) { if gotErr := pub.WaitStarted(); gotErr != nil { diff --git a/pubsublite/internal/wire/service_test.go b/pubsublite/internal/wire/service_test.go index 102ccfd46b81..6672e45e21da 100644 --- a/pubsublite/internal/wire/service_test.go +++ b/pubsublite/internal/wire/service_test.go @@ -599,11 +599,11 @@ func TestCompositeServiceTree(t *testing.T) { intermediate1.receiver.VerifyStatus(t, serviceTerminated) intermediate2.receiver.VerifyStatus(t, serviceTerminated) root.receiver.VerifyStatus(t, serviceTerminated) - root.VerifyClosed(t, true) if gotErr := root.WaitStopped(); !test.ErrorEqual(gotErr, wantErr) { t.Errorf("compositeService.WaitStopped() got err: (%v), want err: (%v)", gotErr, wantErr) } + root.VerifyClosed(t, true) }) }
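Note on the publish_batcher.go and publisher.go hunks above: OnPublishResponse previously invoked each message's onResult callback while the publisher mutex was held; it now returns the collected publishResults so onResponse can run the callbacks after unlocking, avoiding deadlock if a callback re-enters Publish. Below is a minimal, self-contained sketch of this collect-under-lock, invoke-after-unlock pattern; the names (completion, publisher, onResponse here) are illustrative stand-ins, not APIs from this change:

	package main

	import (
		"fmt"
		"sync"
	)

	type completion struct {
		offset   int64
		onResult func(offset int64)
	}

	type publisher struct {
		mu      sync.Mutex
		pending []completion
	}

	// onResponse drains completed work while holding mu, then runs the
	// callbacks with no lock held, so a callback may safely re-enter the
	// publisher.
	func (p *publisher) onResponse() {
		p.mu.Lock()
		done := p.pending
		p.pending = nil
		p.mu.Unlock()

		for _, c := range done {
			c.onResult(c.offset) // no lock held here
		}
	}

	func main() {
		p := &publisher{}
		p.mu.Lock()
		p.pending = append(p.pending, completion{offset: 70, onResult: func(o int64) { fmt.Println("offset:", o) }})
		p.mu.Unlock()
		p.onResponse()
	}

The same rule drives the Publish rewrite in publisher.go: the error path now releases mu before invoking onResult(nil, err).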
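Note on lazyPartitionPublisher: the wrapper defers creating the underlying singlePartitionPublisher until the first Publish, and unloads it once outstandingMessages drops to zero and unloadDelay elapses with no new publishes. A sketch of that lifecycle follows, using time.AfterFunc as a stand-in for the internal streamIdleTimer; all names here are illustrative, not from this change:

	package main

	import (
		"sync"
		"time"
	)

	type stream struct{} // stand-in for singlePartitionPublisher

	type lazyStream struct {
		mu          sync.Mutex
		s           *stream
		outstanding int
		idle        *time.Timer
		unloadDelay time.Duration
	}

	// acquire creates the stream on first use and cancels any pending unload.
	func (l *lazyStream) acquire() *stream {
		l.mu.Lock()
		defer l.mu.Unlock()
		if l.s == nil {
			l.s = &stream{}
		}
		if l.idle != nil {
			l.idle.Stop()
		}
		l.outstanding++
		return l.s
	}

	// release schedules an unload once no work remains outstanding.
	func (l *lazyStream) release() {
		l.mu.Lock()
		defer l.mu.Unlock()
		l.outstanding--
		if l.outstanding == 0 {
			l.idle = time.AfterFunc(l.unloadDelay, l.onIdle)
		}
	}

	// onIdle re-checks outstanding under the lock, since a publish may have
	// raced the timer, before dropping the stream.
	func (l *lazyStream) onIdle() {
		l.mu.Lock()
		defer l.mu.Unlock()
		if l.outstanding == 0 {
			l.s = nil
		}
	}

	func main() {
		l := &lazyStream{unloadDelay: 10 * time.Millisecond}
		_ = l.acquire()
		l.release()
		time.Sleep(20 * time.Millisecond) // let the idle timer fire and unload
	}

TestRoutingPublisherUnloadIdlePublisher exercises exactly this race: partition 0 stays loaded because each publish lands within unloadDelay/2 of the last, while partition 1 goes quiet long enough to unload and must open a second stream for its final message.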