Skip to content

Commit

Permalink
[fix][broker] Fix function-go can't auto ack (apache#19367)
Browse files Browse the repository at this point in the history
  • Loading branch information
quinniup authored and gongtai committed Jan 31, 2023
1 parent 1cd1aef commit 0e5441b
Showing 1 changed file with 13 additions and 2 deletions.
15 changes: 13 additions & 2 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ func (gi *goInstance) setupConsumer() (chan pulsar.ConsumerMessage, error) {
gi.stats.incrTotalSysExceptions(err)
return nil, err
}
// The key format of this consumer is: persistent://tenant/namespace/name
gi.consumers[topicName.Name] = consumer
}
return channel, nil
Expand Down Expand Up @@ -392,11 +393,21 @@ func (gi *goInstance) processResult(msgInput pulsar.Message, output []byte) {
// ackInputMessage doesn't produce any result, or the user doesn't want the result.
func (gi *goInstance) ackInputMessage(inputMessage pulsar.Message) {
log.Debugf("ack input message topic name is: %s", inputMessage.Topic())
gi.consumers[inputMessage.Topic()].Ack(inputMessage)
gi.consumers[trimTopicNamePartition(inputMessage.Topic())].Ack(inputMessage)
}

func (gi *goInstance) nackInputMessage(inputMessage pulsar.Message) {
gi.consumers[inputMessage.Topic()].Nack(inputMessage)
gi.consumers[trimTopicNamePartition(inputMessage.Topic())].Nack(inputMessage)
}

// trimTopicNamePartition Check if topic contains partition, remove partition part. Format: `persistent://tenant/namespace/name`
func trimTopicNamePartition(topic string) string {
topicName, err := ParseTopicName(topic)
if err != nil {
log.Errorf("parse topic name failed, error is: %v", err)
return ""
}
return topicName.NameWithoutPartition()
}

func getIdleTimeout(timeoutMilliSecond time.Duration) time.Duration {
Expand Down

0 comments on commit 0e5441b

Please sign in to comment.