Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-328] gets last message when LatestMessageID and inclusive #329

Merged
merged 4 commits into from
Aug 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions pulsar/consumer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var (
Help: "Time it takes for application to process messages",
Buckets: []float64{.0005, .001, .005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10},
})

lastestMessageID = LatestMessageID()
)

type consumerState int
Expand All @@ -98,6 +100,10 @@ const (
nonDurable
)

const (
noMessageEntry = -1
)

type partitionConsumerOpts struct {
topic string
consumerName string
Expand Down Expand Up @@ -191,6 +197,21 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
pc.log.Info("Created consumer")
pc.state = consumerReady

if pc.options.startMessageIDInclusive && pc.startMessageID == lastestMessageID {
msgID, err := pc.requestGetLastMessageID()
if err != nil {
return nil, err
}
if msgID.entryID != noMessageEntry {
pc.startMessageID = msgID

err = pc.requestSeek(msgID)
if err != nil {
return nil, err
}
}
}

go pc.dispatcher()

go pc.runEventsLoop()
Expand Down Expand Up @@ -250,7 +271,10 @@ func (pc *partitionConsumer) getLastMessageID() (messageID, error) {

func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest) {
defer close(req.doneCh)
req.msgID, req.err = pc.requestGetLastMessageID()
}

func (pc *partitionConsumer) requestGetLastMessageID() (messageID, error) {
requestID := pc.client.rpcClient.NewRequestID()
cmdGetLastMessageID := &pb.CommandGetLastMessageId{
RequestId: proto.Uint64(requestID),
Expand All @@ -260,11 +284,10 @@ func (pc *partitionConsumer) internalGetLastMessageID(req *getLastMsgIDRequest)
pb.BaseCommand_GET_LAST_MESSAGE_ID, cmdGetLastMessageID)
if err != nil {
pc.log.WithError(err).Error("Failed to get last message id")
req.err = err
} else {
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
req.msgID = convertToMessageID(id)
return messageID{}, err
}
id := res.Response.GetLastMessageIdResponse.GetLastMessageId()
return convertToMessageID(id), nil
}

func (pc *partitionConsumer) AckID(msgID messageID) {
Expand Down Expand Up @@ -340,17 +363,20 @@ func (pc *partitionConsumer) Seek(msgID messageID) error {

func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
defer close(seek.doneCh)
seek.err = pc.requestSeek(seek.msgID)
}

func (pc *partitionConsumer) requestSeek(msgID messageID) error {
if pc.state == consumerClosing || pc.state == consumerClosed {
pc.log.Error("Consumer was already closed")
return
return nil
}

id := &pb.MessageIdData{}
err := proto.Unmarshal(seek.msgID.Serialize(), id)
err := proto.Unmarshal(msgID.Serialize(), id)
if err != nil {
pc.log.WithError(err).Errorf("deserialize message id error: %s", err.Error())
seek.err = err
return err
}

requestID := pc.client.rpcClient.NewRequestID()
Expand All @@ -363,8 +389,9 @@ func (pc *partitionConsumer) internalSeek(seek *seekRequest) {
_, err = pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, pb.BaseCommand_SEEK, cmdSeek)
if err != nil {
pc.log.WithError(err).Error("Failed to reset to message id")
seek.err = err
return err
}
return nil
}

func (pc *partitionConsumer) SeekByTime(time time.Time) error {
Expand Down
63 changes: 63 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,3 +446,66 @@ func TestReaderOnSpecificMessageWithCustomMessageID(t *testing.T) {
assert.Equal(t, []byte(expectMsg), msg.Payload())
}
}

func TestReaderLatestInclusiveHasNext(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()

// create reader on the last message (inclusive)
reader0, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: LatestMessageID(),
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer reader0.Close()

assert.False(t, reader0.HasNext())

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
var lastMsgID MessageID
for i := 0; i < 10; i++ {
lastMsgID, err = producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, lastMsgID)
}

// create reader on the last message (inclusive)
reader, err := client.CreateReader(ReaderOptions{
Topic: topic,
StartMessageID: LatestMessageID(),
StartMessageIDInclusive: true,
})

assert.Nil(t, err)
defer reader.Close()

var msgID MessageID
if reader.HasNext() {
msg, err := reader.Next(context.Background())
assert.NoError(t, err)

assert.Equal(t, []byte("hello-9"), msg.Payload())
msgID = msg.ID()
}

assert.Equal(t, lastMsgID.Serialize(), msgID.Serialize())
}