From 4c2e6990fcafe2fb45fc01de532e00c340f72f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Mon, 28 Sep 2020 14:25:32 +0800 Subject: [PATCH 1/5] send delay message individually even batching is enabled --- pulsar/internal/batch_builder.go | 2 +- pulsar/producer_partition.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 18ec3c4bf0..04847859de 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -152,7 +152,7 @@ func (bb *BatchBuilder) reset() { bb.numMessages = 0 bb.buffer.Clear() bb.callbacks = []interface{}{} - bb.msgMetadata.ReplicateTo = nil + bb.msgMetadata.Reset() } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index a16193f4d4..870da2ef01 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -334,6 +334,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) { sequenceID = internal.GetAndAdd(p.sequenceIDGenerator, 1) } + if !sendAsBatch { + p.internalFlushCurrentBatch() + } added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters, deliverAt) if !added { @@ -437,7 +440,6 @@ func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage, func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage, callback func(MessageID, *ProducerMessage, error), flushImmediately bool) { - sr := &sendRequest{ ctx: ctx, msg: msg, From 6be0285efa8847b5f36586475ff94fabb7b9e29b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Mon, 28 Sep 2020 18:44:07 +0800 Subject: [PATCH 2/5] reset devliverAtTime to nil --- pulsar/internal/batch_builder.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go index 04847859de..fe34f0cf2d 100644 --- a/pulsar/internal/batch_builder.go +++ b/pulsar/internal/batch_builder.go @@ -152,7 +152,8 @@ func (bb *BatchBuilder) reset() { bb.numMessages = 0 bb.buffer.Clear() bb.callbacks = []interface{}{} - bb.msgMetadata.Reset() + bb.msgMetadata.ReplicateTo = nil + bb.msgMetadata.DeliverAtTime = nil } // Flush all the messages buffered in the client and wait until all messages have been successfully persisted. From 3086baaf1cd1a26c357c4151776dbda2219ea6b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Wed, 30 Sep 2020 18:50:09 +0800 Subject: [PATCH 3/5] ut: add test case(TestBatchDelayMessage) for issue #367 --- pulsar/producer_test.go | 70 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 3 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 20cfd91924..b9ccbd344d 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -64,7 +64,6 @@ func TestProducerNoTopic(t *testing.T) { client, err := NewClient(ClientOptions{ URL: "pulsar://localhost:6650", }) - if err != nil { t.Fatal(err) return @@ -151,13 +150,12 @@ func TestProducerAsyncSend(t *testing.T) { } func TestProducerCompression(t *testing.T) { - type testProvider struct { name string compressionType CompressionType } - var providers = []testProvider{ + providers := []testProvider{ {"zlib", ZLib}, {"lz4", LZ4}, {"zstd", ZSTD}, @@ -705,6 +703,72 @@ func TestBatchMessageFlushing(t *testing.T) { assert.Equal(t, 2, published, "expected to publish two messages") } +// test for issue #367 +func TestBatchDelayMessage(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + batchingDelay := time.Second + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + BatchingMaxPublishDelay: batchingDelay, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "subName", + }) + assert.Nil(t, err) + defer consumer.Close() + + ctx := context.Background() + delayMsg := &ProducerMessage{ + Payload: []byte("delay: 3s"), + DeliverAfter: 3 * time.Second, + } + var delayMsgId messageID + ch := make(chan struct{}, 2) + producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { + delayMsgId = id.(messageID) + ch <- struct{}{} + }) + delayMsgPublished := false + select { + case <-ch: + delayMsgPublished = true + case <-time.After(batchingDelay): + } + assert.Equal(t, delayMsgPublished, true, "delay message is not published individually when batching is enabled") + + noDelayMsg := &ProducerMessage{ + Payload: []byte("no delay"), + } + var noDelayMsgId messageID + producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { + noDelayMsgId = id.(messageID) + }) + + for i := 0; i < 2; i++ { + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err, "unexpected error occurred when recving message from topic") + + switch msg.ID().(trackingMessageID).entryID { + case noDelayMsgId.entryID: + assert.LessOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2)) + case delayMsgId.entryID: + assert.GreaterOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3)) + default: + t.Fatalf("got an unexpected message from topic, id:%v", msg.ID().Serialize()) + } + } +} + func TestDelayRelative(t *testing.T) { client, err := NewClient(ClientOptions{ URL: serviceURL, From 4696caf1ee09a1efb913a6019de7018043ac22d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Wed, 30 Sep 2020 19:36:40 +0800 Subject: [PATCH 4/5] fix data race with atomic value --- pulsar/producer_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index b9ccbd344d..ad3a769f84 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -23,6 +23,7 @@ import ( "net/http" "strconv" "sync" + "sync/atomic" "testing" "time" @@ -732,10 +733,10 @@ func TestBatchDelayMessage(t *testing.T) { Payload: []byte("delay: 3s"), DeliverAfter: 3 * time.Second, } - var delayMsgId messageID + var delayMsgId int64 ch := make(chan struct{}, 2) producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { - delayMsgId = id.(messageID) + atomic.StoreInt64(&delayMsgId, id.(messageID).entryID) ch <- struct{}{} }) delayMsgPublished := false @@ -749,19 +750,18 @@ func TestBatchDelayMessage(t *testing.T) { noDelayMsg := &ProducerMessage{ Payload: []byte("no delay"), } - var noDelayMsgId messageID + var noDelayMsgId int64 producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { - noDelayMsgId = id.(messageID) + atomic.StoreInt64(&noDelayMsgId, id.(messageID).entryID) }) - for i := 0; i < 2; i++ { msg, err := consumer.Receive(context.Background()) assert.Nil(t, err, "unexpected error occurred when recving message from topic") switch msg.ID().(trackingMessageID).entryID { - case noDelayMsgId.entryID: + case atomic.LoadInt64(&noDelayMsgId): assert.LessOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2)) - case delayMsgId.entryID: + case atomic.LoadInt64(&delayMsgId): assert.GreaterOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3)) default: t.Fatalf("got an unexpected message from topic, id:%v", msg.ID().Serialize()) From 525e026b78419b932b63d444854f340553eb68d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A8=8B=E9=A3=9E?= Date: Wed, 30 Sep 2020 19:57:49 +0800 Subject: [PATCH 5/5] change variable style: Id -> ID --- pulsar/producer_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index ad3a769f84..2c1ff51f3a 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -733,10 +733,10 @@ func TestBatchDelayMessage(t *testing.T) { Payload: []byte("delay: 3s"), DeliverAfter: 3 * time.Second, } - var delayMsgId int64 + var delayMsgID int64 ch := make(chan struct{}, 2) producer.SendAsync(ctx, delayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { - atomic.StoreInt64(&delayMsgId, id.(messageID).entryID) + atomic.StoreInt64(&delayMsgID, id.(messageID).entryID) ch <- struct{}{} }) delayMsgPublished := false @@ -750,18 +750,18 @@ func TestBatchDelayMessage(t *testing.T) { noDelayMsg := &ProducerMessage{ Payload: []byte("no delay"), } - var noDelayMsgId int64 + var noDelayMsgID int64 producer.SendAsync(ctx, noDelayMsg, func(id MessageID, producerMessage *ProducerMessage, err error) { - atomic.StoreInt64(&noDelayMsgId, id.(messageID).entryID) + atomic.StoreInt64(&noDelayMsgID, id.(messageID).entryID) }) for i := 0; i < 2; i++ { msg, err := consumer.Receive(context.Background()) assert.Nil(t, err, "unexpected error occurred when recving message from topic") switch msg.ID().(trackingMessageID).entryID { - case atomic.LoadInt64(&noDelayMsgId): + case atomic.LoadInt64(&noDelayMsgID): assert.LessOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(batchingDelay*2)) - case atomic.LoadInt64(&delayMsgId): + case atomic.LoadInt64(&delayMsgID): assert.GreaterOrEqual(t, time.Since(msg.PublishTime()).Nanoseconds(), int64(time.Second*3)) default: t.Fatalf("got an unexpected message from topic, id:%v", msg.ID().Serialize())