From f6d4939ca4122a58078f94b518cab7bc72cde331 Mon Sep 17 00:00:00 2001 From: Rueian Date: Mon, 15 Feb 2021 23:13:52 +0800 Subject: [PATCH] Fix reader with start latest message id inclusive (#329) Fixes #356 Fixes #419 Motivtions The changes made by the original PR #329 are no longer works. This PR is trying to fix the bugs and make the test case be more robust. Moditications * fix the wrong comparison `pc.startMessageID == lastestMessageID` with `pc.startMessageID.equal(lastestMessageID.(messageID))` * fix the hanging `pc.requestSeek(msgID.messageID)` with `pc.requestSeekWithoutClear(msgID.messageID)` before the dispatch loop * make the `TestReaderLatestInclusiveHasNext` test case be more robust --- pulsar/consumer_partition.go | 15 +++++++++++---- pulsar/reader_test.go | 12 +++++++----- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index b75a7d771b..031e0a3a6d 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -185,7 +185,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon pc.log.Info("Created consumer") pc.setConsumerState(consumerReady) - if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID { + if pc.options.startMessageIDInclusive && pc.startMessageID.equal(lastestMessageID.(messageID)) { msgID, err := pc.requestGetLastMessageID() if err != nil { pc.nackTracker.Close() @@ -194,7 +194,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if msgID.entryID != noMessageEntry { pc.startMessageID = msgID - err = pc.requestSeek(msgID.messageID) + // use the WithoutClear version because the dispatcher is not started yet + err = pc.requestSeekWithoutClear(msgID.messageID) if err != nil { pc.nackTracker.Close() return nil, err @@ -369,8 +370,15 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) { defer close(seek.doneCh) seek.err = pc.requestSeek(seek.msgID.messageID) } - func (pc *partitionConsumer) requestSeek(msgID messageID) error { + if err := pc.requestSeekWithoutClear(msgID); err != nil { + return err + } + pc.clearMessageChannels() + return nil +} + +func (pc *partitionConsumer) requestSeekWithoutClear(msgID messageID) error { state := pc.getConsumerState() if state == consumerClosing || state == consumerClosed { pc.log.WithField("state", state).Error("Consumer is closing or has closed") @@ -396,7 +404,6 @@ func (pc *partitionConsumer) requestSeek(msgID messageID) error { pc.log.WithError(err).Error("Failed to reset to message id") return err } - pc.clearMessageChannels() return nil } diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index 36204cf284..cd97964a07 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -577,10 +577,12 @@ func TestReaderLatestInclusiveHasNext(t *testing.T) { assert.Nil(t, err) defer reader.Close() - if reader.HasNext() { - msg, err := reader.Next(context.Background()) - assert.NoError(t, err) + assert.True(t, reader.HasNext()) + msg, err := reader.Next(context.Background()) + assert.NoError(t, err) - assert.Equal(t, []byte("hello-9"), msg.Payload()) - } + assert.Equal(t, []byte("hello-9"), msg.Payload()) + assert.Equal(t, lastMsgID.Serialize(), msg.ID().Serialize()) + + assert.False(t, reader.HasNext()) }