From ee6459372cee08098ec73fdb09b84859252461c0 Mon Sep 17 00:00:00 2001 From: Alexander Miehe Date: Wed, 15 Aug 2018 15:49:35 +0200 Subject: [PATCH 1/2] Allow either to use assign or subscribe With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign. --- pkg/rdkafka/RdKafkaConsumer.php | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 9d30be2a7..991ba210c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -55,8 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; - $this->offset = null; - + $this->setSerializer($serializer); } @@ -99,12 +98,15 @@ public function getQueue() public function receive($timeout = 0) { if (false == $this->subscribed) { - $this->consumer->assign([new TopicPartition( - $this->getQueue()->getQueueName(), - $this->getQueue()->getPartition(), - $this->offset - )]); - + if ($this->offset === null) { + $this->consumer->subscribe([$this->getQueue()->getQueueName()]); + } else { + $this->consumer->assign([new TopicPartition( + $this->getQueue()->getQueueName(), + $this->getQueue()->getPartition(), + $this->offset + ), ]); + } $this->subscribed = true; } From 2bd940851de115cd247acc232157b7c7aafd0c2f Mon Sep 17 00:00:00 2001 From: Alexander Miehe Date: Thu, 16 Aug 2018 10:02:18 +0200 Subject: [PATCH 2/2] * change existing tests to use subscribe * add test that when a offset is set we use assign --- pkg/rdkafka/RdKafkaConsumer.php | 8 ++--- pkg/rdkafka/Tests/RdKafkaConsumerTest.php | 40 ++++++++++++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 991ba210c..c9c9b7e9c 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -55,7 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->topic = $topic; $this->subscribed = false; $this->commitAsync = false; - + $this->setSerializer($serializer); } @@ -97,15 +97,15 @@ public function getQueue() */ public function receive($timeout = 0) { - if (false == $this->subscribed) { - if ($this->offset === null) { + if (false === $this->subscribed) { + if (null === $this->offset) { $this->consumer->subscribe([$this->getQueue()->getQueueName()]); } else { $this->consumer->assign([new TopicPartition( $this->getQueue()->getQueueName(), $this->getQueue()->getPartition(), $this->offset - ), ]); + )]); } $this->subscribed = true; } diff --git a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php index 77a949c15..6e0e5d0a2 100644 --- a/pkg/rdkafka/Tests/RdKafkaConsumerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConsumerTest.php @@ -50,7 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->once()) @@ -79,7 +79,7 @@ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->any()) @@ -106,6 +106,36 @@ public function testShouldSubscribeOnFirstReceiveOnly() $kafkaMessage = new Message(); $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + $kafkaConsumer = $this->createKafkaConsumerMock(); + $kafkaConsumer + ->expects($this->once()) + ->method('subscribe') + ; + $kafkaConsumer + ->expects($this->any()) + ->method('consume') + ->willReturn($kafkaMessage) + ; + + $consumer = new RdKafkaConsumer( + $kafkaConsumer, + $this->createContextMock(), + $destination, + $this->createSerializerMock() + ); + + $consumer->receive(1000); + $consumer->receive(1000); + $consumer->receive(1000); + } + + public function testShouldAssignWhenOffsetIsSet() + { + $destination = new RdKafkaTopic('dest'); + + $kafkaMessage = new Message(); + $kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) @@ -124,6 +154,8 @@ public function testShouldSubscribeOnFirstReceiveOnly() $this->createSerializerMock() ); + $consumer->setOffset(123); + $consumer->receive(1000); $consumer->receive(1000); $consumer->receive(1000); @@ -139,7 +171,7 @@ public function testThrowOnOffsetChangeAfterSubscribing() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->any()) @@ -174,7 +206,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue() $kafkaConsumer = $this->createKafkaConsumerMock(); $kafkaConsumer ->expects($this->once()) - ->method('assign') + ->method('subscribe') ; $kafkaConsumer ->expects($this->once())