From 09afec816b7f58808bc8f8428167f14746007c30 Mon Sep 17 00:00:00 2001 From: Pavel Alexeev Date: Wed, 12 Jan 2022 02:12:39 +0300 Subject: [PATCH 1/2] #1229 Allow return null in kafka Serializer (delete messages aka Tombstone) --- pkg/rdkafka/JsonSerializer.php | 26 +++++++++++--------------- pkg/rdkafka/Serializer.php | 4 ++-- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php index ae161ca08..1d98c47cb 100644 --- a/pkg/rdkafka/JsonSerializer.php +++ b/pkg/rdkafka/JsonSerializer.php @@ -6,7 +6,7 @@ class JsonSerializer implements Serializer { - public function toString(RdKafkaMessage $message): string + public function toString(RdKafkaMessage $message): ?string { $json = json_encode([ 'body' => $message->getBody(), @@ -14,26 +14,22 @@ public function toString(RdKafkaMessage $message): string 'headers' => $message->getHeaders(), ]); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); } return $json; } - public function toMessage(string $string): RdKafkaMessage + public function toMessage(?string $string): RdKafkaMessage { - $data = json_decode($string, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (null !== $string) { + $data = json_decode($string, true); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); + } + } else { + $data = ['body' => null, 'properties' => null, 'headers' => 'headers']; } return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']); diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php index 7e2a116ed..b37d42f57 100644 --- a/pkg/rdkafka/Serializer.php +++ b/pkg/rdkafka/Serializer.php @@ -6,7 +6,7 @@ interface Serializer { - public function toString(RdKafkaMessage $message): string; + public function toString(RdKafkaMessage $message): ?string; - public function toMessage(string $string): RdKafkaMessage; + public function toMessage(?string $string): RdKafkaMessage; } From 90192b24083b50a2dc7b75db8819e321b3dcc9de Mon Sep 17 00:00:00 2001 From: Pavel Alexeev Date: Mon, 17 Jan 2022 01:51:16 +0300 Subject: [PATCH 2/2] Prefer early return for readability (by PR 1230 review comments) --- pkg/rdkafka/JsonSerializer.php | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php index 1d98c47cb..b65f2bec6 100644 --- a/pkg/rdkafka/JsonSerializer.php +++ b/pkg/rdkafka/JsonSerializer.php @@ -23,13 +23,13 @@ public function toString(RdKafkaMessage $message): ?string public function toMessage(?string $string): RdKafkaMessage { - if (null !== $string) { - $data = json_decode($string, true); - if (\JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); - } - } else { - $data = ['body' => null, 'properties' => null, 'headers' => 'headers']; + if (null === $string) { + return new RdKafkaMessage(null, null, null); + } + + $data = json_decode($string, true); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); } return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']);