diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 4ee0d8da1e..2a2b63e8e3 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -59,6 +59,7 @@ type producer struct { numPartitions uint32 messageRouter func(*ProducerMessage, TopicMetadata) int ticker *time.Ticker + tickerStop chan struct{} log *log.Entry } @@ -118,12 +119,19 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { return nil, err } - p.ticker = time.NewTicker(partitionsAutoDiscoveryInterval) + ticker := time.NewTicker(partitionsAutoDiscoveryInterval) + p.ticker = ticker + p.tickerStop = make(chan struct{}) go func() { - for range p.ticker.C { - p.log.Debug("Auto discovering new partitions") - p.internalCreatePartitionsProducers() + for { + select { + case <-ticker.C: + p.log.Debug("Auto discovering new partitions") + p.internalCreatePartitionsProducers() + case <-p.tickerStop: + return + } } }() @@ -282,6 +290,8 @@ func (p *producer) Close() { defer p.RUnlock() if p.ticker != nil { p.ticker.Stop() + close(p.tickerStop) + p.ticker = nil } for _, pp := range p.producers {