diff --git a/pkg/rdkafka/JsonSerializer.php b/pkg/rdkafka/JsonSerializer.php index ae161ca08..b65f2bec6 100644 --- a/pkg/rdkafka/JsonSerializer.php +++ b/pkg/rdkafka/JsonSerializer.php @@ -6,7 +6,7 @@ class JsonSerializer implements Serializer { - public function toString(RdKafkaMessage $message): string + public function toString(RdKafkaMessage $message): ?string { $json = json_encode([ 'body' => $message->getBody(), @@ -14,26 +14,22 @@ public function toString(RdKafkaMessage $message): string 'headers' => $message->getHeaders(), ]); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); } return $json; } - public function toMessage(string $string): RdKafkaMessage + public function toMessage(?string $string): RdKafkaMessage { + if (null === $string) { + return new RdKafkaMessage(null, null, null); + } + $data = json_decode($string, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); } return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']); diff --git a/pkg/rdkafka/Serializer.php b/pkg/rdkafka/Serializer.php index 7e2a116ed..b37d42f57 100644 --- a/pkg/rdkafka/Serializer.php +++ b/pkg/rdkafka/Serializer.php @@ -6,7 +6,7 @@ interface Serializer { - public function toString(RdKafkaMessage $message): string; + public function toString(RdKafkaMessage $message): ?string; - public function toMessage(string $string): RdKafkaMessage; + public function toMessage(?string $string): RdKafkaMessage; }