diff --git a/pubsub_broadcaster.go b/pubsub_broadcaster.go index 02a23a6..590905d 100644 --- a/pubsub_broadcaster.go +++ b/pubsub_broadcaster.go @@ -39,6 +39,13 @@ func NewPubSubBroadcaster(ctx context.Context, psub *pubsub.PubSub, topic string go func(ctx context.Context, subs *pubsub.Subscription) { <-ctx.Done() subs.Cancel() + // subs.Next returns error when subscription closed. Subscription must + // be closed before psubTopic can be closed. + var err error + for err == nil { + _, err = subs.Next(ctx) + } + psubTopic.Close() }(ctx, subs) return &PubSubBroadcaster{