Skip to content

Commit

Permalink
Avoid contention on producer mutex on critical path (#286)
Browse files · Browse the repository at this point in the history
* Avoid contention on producer mutex on critical path

* Circumvent the race detector

* Removed space
  • Loading branch information
merlimat authored Jun 18, 2020
1 parent d4c5dcc commit 861d7af
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions pulsar/producer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,20 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

log "github.com/sirupsen/logrus"

"github.com/apache/pulsar-client-go/pulsar/internal"
)

type producer struct {
sync.Mutex
sync.RWMutex
client *client
options *ProducerOptions
topic string
producers []Producer
producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
Expand Down Expand Up @@ -115,6 +117,7 @@ func (p *producer) internalCreatePartitionsProducers() error {

p.Lock()
defer p.Unlock()

oldProducers := p.producers

if oldProducers != nil {
Expand Down Expand Up @@ -179,6 +182,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
return err
}

atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
return nil
}
Expand All @@ -188,8 +192,8 @@ func (p *producer) Topic() string {
}

// Name returns the producer name, as reported by the first partition
// producer. Only a read lock is needed: the producers slice is merely
// read here, never modified.
func (p *producer) Name() string {
	p.RLock()
	defer p.RUnlock()

	return p.producers[0].Name()
}
Expand All @@ -199,27 +203,30 @@ func (p *producer) NumPartitions() uint32 {
}

// Send publishes msg synchronously on the partition chosen by the message
// router. Partition selection goes through getPartition, which is lock-free,
// keeping the producer mutex off the critical publish path.
func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
	return p.getPartition(msg).Send(ctx, msg)
}

func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
p.Lock()
partition := p.messageRouter(msg, p)
pp := p.producers[partition]
p.Unlock()
p.getPartition(msg).SendAsync(ctx, msg, callback)
}

pp.SendAsync(ctx, msg, callback)
// getPartition returns the partition producer that msg should be published
// to, without acquiring the producer mutex.
//
// Safety argument: partitions can only increase, and the writer stores
// producersPtr (atomically) BEFORE it updates numPartitions. Therefore the
// router may compute a partition index against a newer partition count than
// the slice snapshot we load here — handled by the modulo below — but the
// snapshot can never be newer than the count.
func (p *producer) getPartition(msg *ProducerMessage) Producer {
	// Since partitions can only increase, it's ok if the producers list
	// is updated in between. The numPartition is updated only after the list.
	partition := p.messageRouter(msg, p)
	// Lock-free snapshot of the current partition-producer slice.
	producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
	if partition >= len(producers) {
		// We read the old producers list while the count was already
		// updated
		partition %= len(producers)
	}
	return producers[partition]
}

func (p *producer) LastSequenceID() int64 {
p.Lock()
defer p.Unlock()
p.RLock()
defer p.RUnlock()

var maxSeq int64 = -1
for _, pp := range p.producers {
Expand All @@ -232,8 +239,8 @@ func (p *producer) LastSequenceID() int64 {
}

func (p *producer) Flush() error {
p.Lock()
defer p.Unlock()
p.RLock()
defer p.RUnlock()

for _, pp := range p.producers {
if err := pp.Flush(); err != nil {
Expand All @@ -245,8 +252,8 @@ func (p *producer) Flush() error {
}

func (p *producer) Close() {
p.Lock()
defer p.Unlock()
p.RLock()
defer p.RUnlock()
if p.ticker != nil {
p.ticker.Stop()
}
Expand Down

0 comments on commit 861d7af

Please sign in to comment.