From 62e5a0462285d45ba6c160f77d42d9734d7356f3 Mon Sep 17 00:00:00 2001 From: Quentin Dreyer Date: Wed, 20 Jul 2022 12:00:16 +0200 Subject: [PATCH 1/3] fix: allow falsy keys --- pkg/rdkafka/RdKafkaProducer.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index d5380b590..59efd69ab 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -39,7 +39,7 @@ public function send(Destination $destination, Message $message): void $partition = $this->getPartition($destination, $message); $payload = $this->serializer->toString($message); - $key = $message->getKey() ?: $destination->getKey() ?: null; + $key = $message->getKey() ?? $destination->getKey() ?? null; $topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf()); From dbe56f20d4708d1ab8171286289eb505ea571623 Mon Sep 17 00:00:00 2001 From: Quentin Dreyer Date: Wed, 20 Jul 2022 12:41:20 +0200 Subject: [PATCH 2/3] test: allow falsy keys --- pkg/rdkafka/Tests/RdKafkaProducerTest.php | 60 +++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/rdkafka/Tests/RdKafkaProducerTest.php b/pkg/rdkafka/Tests/RdKafkaProducerTest.php index 01c1c1b4c..512cc9f5a 100644 --- a/pkg/rdkafka/Tests/RdKafkaProducerTest.php +++ b/pkg/rdkafka/Tests/RdKafkaProducerTest.php @@ -323,6 +323,66 @@ public function testShouldGetPartitionFromDestination(): void $producer->send($destination, $message); } + public function testShouldAllowFalsyKeyFromMessage(): void + { + $key = 0; + + $kafkaTopic = $this->createKafkaTopicMock(); + $kafkaTopic + ->expects($this->once()) + ->method('producev') + ->with( + RD_KAFKA_PARTITION_UA, + 0, + '', + $key + ) + ; + + $kafkaProducer = $this->createKafkaProducerMock(); + $kafkaProducer + ->expects($this->once()) + ->method('newTopic') + ->willReturn($kafkaTopic) + ; + + $message = new RdKafkaMessage(); + $message->setKey($key); + + $producer = new RdKafkaProducer($kafkaProducer, $this->createSerializerMock()); + $producer->send(new RdKafkaTopic(''), $message); + } + + public function testShouldAllowFalsyKeyFromDestination(): void + { + $key = 0; + + $kafkaTopic = $this->createKafkaTopicMock(); + $kafkaTopic + ->expects($this->once()) + ->method('producev') + ->with( + RD_KAFKA_PARTITION_UA, + 0, + '', + $key + ) + ; + + $kafkaProducer = $this->createKafkaProducerMock(); + $kafkaProducer + ->expects($this->once()) + ->method('newTopic') + ->willReturn($kafkaTopic) + ; + + $destination = new RdKafkaTopic(''); + $destination->setKey($key); + + $producer = new RdKafkaProducer($kafkaProducer, $this->createSerializerMock()); + $producer->send($destination, new RdKafkaMessage()); + } + /** * @return \PHPUnit\Framework\MockObject\MockObject|ProducerTopic */ From 664374b1f2f66a167404ca47e29793d88d6ff8e5 Mon Sep 17 00:00:00 2001 From: qkdreyer Date: Wed, 20 Jul 2022 12:51:11 +0200 Subject: [PATCH 3/3] style: fix code style checks --- pkg/rdkafka/RdKafkaProducer.php | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/pkg/rdkafka/RdKafkaProducer.php b/pkg/rdkafka/RdKafkaProducer.php index 59efd69ab..a3c977f83 100644 --- a/pkg/rdkafka/RdKafkaProducer.php +++ b/pkg/rdkafka/RdKafkaProducer.php @@ -37,7 +37,7 @@ public function send(Destination $destination, Message $message): void InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class); InvalidMessageException::assertMessageInstanceOf($message, RdKafkaMessage::class); - $partition = $this->getPartition($destination, $message); + $partition = $message->getPartition() ?? $destination->getPartition() ?? RD_KAFKA_PARTITION_UA; $payload = $this->serializer->toString($message); $key = $message->getKey() ?? $destination->getKey() ?? null; @@ -122,21 +122,4 @@ public function flush(int $timeout): void $this->producer->flush($timeout); } } - - /** - * @param RdKafkaTopic $destination - * @param RdKafkaMessage $message - */ - private function getPartition(Destination $destination, Message $message): int - { - if (null !== $message->getPartition()) { - return $message->getPartition(); - } - - if (null !== $destination->getPartition()) { - return $destination->getPartition(); - } - - return \RD_KAFKA_PARTITION_UA; - } }