feat: propagate result code on Producer::flushMessages
err (#183)
#459
Annotations
13 warnings
Infection
The following actions uses node12 which is deprecated and will be forced to run on node16: ramsey/composer-install@v1. For more info: https://github.blog/changelog/2023-06-13-github-actions-all-actions-will-run-on-node16-instead-of-node12-by-default/
|
Infection
The following actions use a deprecated Node.js version and will be forced to run on node20: ramsey/composer-install@v1. For more info: https://github.blog/changelog/2024-03-07-github-actions-all-actions-will-run-on-node20-instead-of-node16-by-default/
|
Infection
The `save-state` command is deprecated and will be disabled soon. Please upgrade to using Environment Files. For more information see: https://github.blog/changelog/2022-10-11-github-actions-deprecating-save-state-and-set-output-commands/
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L36
Escaped Mutant for Mutator "Coalesce":
@@ @@
private bool $shouldRun = true;
public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null)
{
- $this->logger = $logger ?? new NullLogger();
+ $this->logger = new NullLogger() ?? $logger;
$this->setupInternalTerminationSignal($config);
$config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void {
$this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]);
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L38
Escaped Mutant for Mutator "MethodCallRemoval":
@@ @@
public function __construct(ConsumerConfig $config, LoggerInterface|null $logger = null)
{
$this->logger = $logger ?? new NullLogger();
- $this->setupInternalTerminationSignal($config);
+
$config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void {
$this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]);
});
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L40
Escaped Mutant for Mutator "MethodCallRemoval":
@@ @@
{
$this->logger = $logger ?? new NullLogger();
$this->setupInternalTerminationSignal($config);
- $config->getConf()->setErrorCb(function (RdKafkaConsumer $kafka, int $err, string $reason): void {
- $this->logger->error(sprintf('Kafka error: "%s": "%s"', rd_kafka_err2str($err), $reason), ['err' => $err]);
- });
+
$rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L54
Escaped Mutant for Mutator "MethodCallRemoval":
@@ @@
$rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static function (TopicPartition $partition): string {
- return (string) $partition->getPartition();
- }, $partitions));
+
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L56
Escaped Mutant for Mutator "Identical":
@@ @@
$rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static function (TopicPartition $partition): string {
+ $this->logger->debug('Assigning partitions', $partitions !== null ? [] : array_map(static function (TopicPartition $partition): string {
return (string) $partition->getPartition();
}, $partitions));
$kafka->assign($partitions);
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L56
Escaped Mutant for Mutator "UnwrapArrayMap":
@@ @@
$rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static function (TopicPartition $partition): string {
- return (string) $partition->getPartition();
- }, $partitions));
+ $this->logger->debug('Assigning partitions', $partitions === null ? [] : $partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L56
Escaped Mutant for Mutator "Ternary":
@@ @@
$rebalanceCallback = function (RdKafkaConsumer $kafka, int $err, array|null $partitions = null): void {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- $this->logger->debug('Assigning partitions', $partitions === null ? [] : array_map(static function (TopicPartition $partition): string {
+ $this->logger->debug('Assigning partitions', $partitions === null ? array_map(static function (TopicPartition $partition): string {
return (string) $partition->getPartition();
- }, $partitions));
+ }, $partitions) : []);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L82
Escaped Mutant for Mutator "MethodCallRemoval":
@@ @@
$kafka->assign();
}
};
- $config->getConf()->setRebalanceCb($rebalanceCallback);
+
parent::__construct($config->getConf());
}
/**
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L125
Escaped Mutant for Mutator "NotIdentical":
@@ @@
$consumerRecords = new ConsumerRecords();
$this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void {
$consumerRecords->add($message);
- if ($processRecord !== null) {
+ if ($processRecord === null) {
$processRecord($message);
}
if ($consumerRecords->count() === $maxBatchSize) {
|
Infection:
src/Clients/Consumer/KafkaConsumer.php#L126
Escaped Mutant for Mutator "FunctionCallRemoval":
@@ @@
$this->doStart($timeoutMs, function (Message $message) use ($maxBatchSize, $timeoutMs, $batchTime, $processRecord, $onBatchProcessed, $consumerRecords): void {
$consumerRecords->add($message);
if ($processRecord !== null) {
- $processRecord($message);
+
}
if ($consumerRecords->count() === $maxBatchSize) {
if ($onBatchProcessed !== null && !$consumerRecords->isEmpty()) {
|