diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 9d30be2a7..c9c9b7e9c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -55,7 +55,6 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; - $this->offset = null; $this->setSerializer($serializer); } @@ -98,13 +97,16 @@ public function getQueue() */ public function receive($timeout = 0) { - if (false == $this->subscribed) { - $this->consumer->assign([new TopicPartition( - $this->getQueue()->getQueueName(), - $this->getQueue()->getPartition(), - $this->offset - )]); - + if (false === $this->subscribed) { + if (null === $this->offset) { + $this->consumer->subscribe([$this->getQueue()->getQueueName()]); + } else { + $this->consumer->assign([new TopicPartition( + $this->getQueue()->getQueueName(), + $this->getQueue()->getPartition(), + $this->offset + )]); + } $this->subscribed = true; } diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 77a949c15..6e0e5d0a2 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -50,7 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->once()) @@ -79,7 +79,7 @@ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->any()) @@ -106,6 +106,36 @@ public function testShouldSubscribeOnFirstReceiveOnly() $kafkaMessage = new Message(); $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ; + $kafkaConsumer + ->expects($this->any()) + ->method('consume') + ->willReturn($kafkaMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); + + $consumer->receive(1000); + $consumer->receive(1000); + $consumer->receive(1000); + } + + public function testShouldAssignWhenOffsetIsSet() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) @@ -124,6 +154,8 @@ public function testShouldSubscribeOnFirstReceiveOnly() $this->createSerializerMock() ); + $consumer->setOffset(123); + $consumer->receive(1000); $consumer->receive(1000); $consumer->receive(1000); @@ -139,7 +171,7 @@ public function testThrowOnOffsetChangeAfterSubscribing() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->any()) @@ -174,7 +206,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->once())