Skip to content

Commit

Permalink
#1229 Allow return null in kafka Serializer (delete messages aka Tomb…
Browse files Browse the repository at this point in the history
…stone)
  • Loading branch information
Pavel Alexeev authored and Pavel Alexeev committed Jan 11, 2022
1 parent 66f15d0 commit 09afec8
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 17 deletions.
26 changes: 11 additions & 15 deletions pkg/rdkafka/JsonSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,30 @@

class JsonSerializer implements Serializer
{
public function toString(RdKafkaMessage $message): string
public function toString(RdKafkaMessage $message): ?string
{
$json = json_encode([
'body' => $message->getBody(),
'properties' => $message->getProperties(),
'headers' => $message->getHeaders(),
]);

if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
if (\JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg()));
}

return $json;
}

public function toMessage(string $string): RdKafkaMessage
public function toMessage(?string $string): RdKafkaMessage
{
$data = json_decode($string, true);
if (JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf(
'The malformed json given. Error %s and message %s',
json_last_error(),
json_last_error_msg()
));
if (null !== $string) {
$data = json_decode($string, true);
if (\JSON_ERROR_NONE !== json_last_error()) {
throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg()));
}
} else {
$data = ['body' => null, 'properties' => null, 'headers' => 'headers'];
}

return new RdKafkaMessage($data['body'], $data['properties'], $data['headers']);
Expand Down
4 changes: 2 additions & 2 deletions pkg/rdkafka/Serializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

interface Serializer
{
public function toString(RdKafkaMessage $message): string;
public function toString(RdKafkaMessage $message): ?string;

public function toMessage(string $string): RdKafkaMessage;
public function toMessage(?string $string): RdKafkaMessage;
}

0 comments on commit 09afec8

Please sign in to comment.