Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka message channel #407

Merged
merged 16 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/split-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ jobs:
extensions: grpc, rdkafka
coverage: pcov

- name: Install OpenSSH (For Kafka)
run: sudo apt-get update && sudo apt-get install -y --no-install-recommends openssh-client

- name: Enable merge-plugin
run: composer global config --no-interaction allow-plugins.wikimedia/composer-merge-plugin true && composer global require wikimedia/composer-merge-plugin
working-directory: ${{ matrix.package.directory }}
Expand Down
3 changes: 3 additions & 0 deletions .github/workflows/test-monorepo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ jobs:
extensions: grpc, rdkafka
coverage: pcov

- name: Install OpenSSH (For Kafka)
run: sudo apt-get update && sudo apt-get install -y --no-install-recommends openssh-client

- uses: actions/checkout@v2

- name: Validate composer.json and composer.lock
Expand Down
2 changes: 1 addition & 1 deletion bin/check-licence.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
$fileContent = file_get_contents($file->getRealPath());

if (! preg_match('/\*\s*(@licence|licence)\s+(Enterprise|Apache\-2\.0)\s*/', $fileContent, $matches)) {
throw new \RuntimeException("Missing licence in file: " . $file->getRealPath(). "\n. You can add related licence by triggering `bin/add-apache-licence.php` or if you are changing Enterprise modules `bin/add-enterprise-licence.php`");
throw new \RuntimeException("Missing licence in file: " . $file->getRealPath(). "\n. You can add related licence by triggering `php bin/add-apache-licence.php` or if you are changing Enterprise modules `php bin/add-enterprise-licence.php`");
}
}
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ version: "3.8"

services:
app:
image: simplycodedsoftware/php:8.3.13
image: simplycodedsoftware/php:8.3.10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why?)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no 8.3.13, that had to be some bug

volumes:
- "$PWD:/data/app"
working_dir: "/data/app"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class OutboundMessageConverter
{
public function __construct(
private HeaderMapper $headerMapper,
private ?MediaType $defaultConversionMediaType,
private ?MediaType $defaultConversionMediaType = null,
private ?int $defaultDeliveryDelay = null,
private ?int $defaultTimeToLive = null,
private ?int $defaultPriority = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public static function create(AnnotationFinder $annotationRegistrationService, I
*/
public function prepare(Configuration $messagingConfiguration, array $extensionObjects, ModuleReferenceSearchService $moduleReferenceSearchService, InterfaceToCallRegistry $interfaceToCallRegistry): void
{
$serviceConfiguration = ExtensionObjectResolver::resolveUnique(ServiceConfiguration::class, $extensionObjects, ServiceConfiguration::createWithDefaults());
foreach ($extensionObjects as $extensionObject) {
if ($extensionObject instanceof ChannelInterceptorBuilder) {
$messagingConfiguration->registerChannelInterceptor($extensionObject);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Ecotone\Enqueue;
namespace Ecotone\Messaging\Handler\Gateway;

use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
Expand Down
4 changes: 2 additions & 2 deletions packages/Enqueue/src/EnqueueAcknowledgementCallback.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public function reject(): void
try {
$this->enqueueConsumer->reject($this->enqueueMessage);
} catch (Exception $exception) {
$this->loggingGateway->info('Failed to reject message, disconnecting Connection in order to self-heal. Failure happen due to: ' . $exception->getMessage(), exception: $exception);
$this->loggingGateway->info('Failed to reject message, disconnecting Connection in order to self-heal. Failure happen due to: ' . $exception->getMessage(), ['exception' => $exception]);

$this->connectionFactory->reconnect();

Expand All @@ -126,7 +126,7 @@ public function requeue(): void
try {
$this->enqueueConsumer->reject($this->enqueueMessage, true);
} catch (Exception $exception) {
$this->loggingGateway->info('Failed to requeue message, disconnecting Connection in order to self-heal. Failure happen due to: ' . $exception->getMessage(), exception: $exception);
$this->loggingGateway->info('Failed to requeue message, disconnecting Connection in order to self-heal. Failure happen due to: ' . $exception->getMessage(), ['exception' => $exception]);

$this->connectionFactory->reconnect();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Ecotone\Messaging\Endpoint\InboundChannelAdapterEntrypoint;
use Ecotone\Messaging\Endpoint\InterceptedChannelAdapterBuilder;
use Ecotone\Messaging\Handler\Gateway\GatewayProxyBuilder;
use Ecotone\Messaging\Handler\Gateway\NullEntrypointGateway;
use Ecotone\Messaging\Handler\InterfaceToCall;
use Ecotone\Messaging\Handler\InterfaceToCallRegistry;

Expand Down
21 changes: 19 additions & 2 deletions packages/Kafka/src/Attribute/KafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,23 @@

use Attribute;
use Ecotone\Messaging\Attribute\MessageConsumer;
use Ecotone\Messaging\Config\Container\DefinedObject;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Support\Assert;

/**
* licence Enterprise
*/
#[Attribute]
final class KafkaConsumer extends MessageConsumer
final class KafkaConsumer extends MessageConsumer implements DefinedObject
{
public function __construct(
string $endpointId,
private array|string $topics,
private ?string $groupId = null
) {
Assert::notNullAndEmpty($topics, "Topics can't be empty");

parent::__construct($endpointId);
}

Expand All @@ -31,6 +36,18 @@ public function getTopics(): array

public function getGroupId(): ?string
{
return $this->groupId;
return $this->groupId ?? $this->getEndpointId();
}

public function getDefinition(): Definition
{
return new Definition(
self::class,
[
$this->getEndpointId(),
$this->topics,
$this->groupId
]
);
}
}
40 changes: 40 additions & 0 deletions packages/Kafka/src/Channel/KafkaMessageChannel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
<?php

declare(strict_types=1);

namespace Ecotone\Kafka\Channel;

use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration;
use Ecotone\Kafka\Inbound\KafkaInboundChannelAdapter;
use Ecotone\Kafka\Outbound\KafkaOutboundChannelAdapter;
use Ecotone\Messaging\Message;
use Ecotone\Messaging\PollableChannel;

/**
* licence Enterprise
*/
final class KafkaMessageChannel implements PollableChannel
{
public function __construct(
private KafkaInboundChannelAdapter $inboundChannelAdapter,
private KafkaOutboundChannelAdapter $outboundChannelAdapter
)
{

}

public function send(Message $message): void
{
$this->outboundChannelAdapter->handle($message);
}

public function receiveWithTimeout(int $timeoutInMilliseconds): ?Message
{
return $this->inboundChannelAdapter->receiveWithTimeout($timeoutInMilliseconds);
}

public function receive(): ?Message
{
return $this->inboundChannelAdapter->receiveWithTimeout(KafkaConsumerConfiguration::DEFAULT_RECEIVE_TIMEOUT);
}
}
70 changes: 70 additions & 0 deletions packages/Kafka/src/Channel/KafkaMessageChannelBuilder.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);

namespace Ecotone\Kafka\Channel;

use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
use Ecotone\Kafka\Configuration\KafkaConsumerConfiguration;
use Ecotone\Kafka\Configuration\KafkaPublisherConfiguration;
use Ecotone\Kafka\Inbound\KafkaInboundChannelAdapterBuilder;
use Ecotone\Kafka\Outbound\KafkaOutboundChannelAdapterBuilder;
use Ecotone\Messaging\Channel\MessageChannelBuilder;
use Ecotone\Messaging\Config\Container\Definition;
use Ecotone\Messaging\Config\Container\MessagingContainerBuilder;
use Ecotone\Messaging\Config\Container\Reference;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\MessageHandlerBuilder;
use Ramsey\Uuid\Uuid;

/**
* licence Enterprise
*/
final class KafkaMessageChannelBuilder implements MessageChannelBuilder
{
private function __construct(
private string $channelName,
public readonly string $topicName,
public readonly string $groupId,
)
{
}

public static function create(
string $channelName,
?string $topicName = null,
?string $groupId = null
): self
{
return new self(
$channelName,
$topicName ?? $channelName,
$groupId ?? $channelName,
);
}

public function compile(MessagingContainerBuilder $builder): Definition|Reference
{
return new Definition(
KafkaMessageChannel::class,
[
KafkaInboundChannelAdapterBuilder::create(
$this->channelName,
)->compile($builder),
KafkaOutboundChannelAdapterBuilder::create(
$this->channelName,
)->compile($builder)
]
);
}

public function getMessageChannelName(): string
{
return $this->channelName;
}

public function isPollable(): bool
{
return true;
}
}
67 changes: 58 additions & 9 deletions packages/Kafka/src/Configuration/KafkaAdmin.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
namespace Ecotone\Kafka\Configuration;

use Ecotone\Messaging\Config\ConfigurationException;
use Ecotone\Kafka\Attribute\KafkaConsumer as KafkaConsumerAttribute;
use Ecotone\Messaging\Support\Assert;
use RdKafka\KafkaConsumer;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
use RdKafka\TopicConf;
use RdKafka\TopicPartition;

/**
* licence Enterprise
Expand All @@ -20,27 +24,37 @@ final class KafkaAdmin
private array $initializedProducers = [];

/**
* @var KafkaConsumer[]
*/
private array $initializedConsumers = [];

/**
* @param KafkaConsumerAttribute[] $kafkaConsumers
* @param KafkaConsumerConfiguration[] $consumerConfigurations
* @param TopicConfiguration[] $topicConfigurations
* @param KafkaPublisherConfiguration[] $publisherConfigurations
* @param array<string, KafkaBrokerConfiguration> $kafkaBrokerConfigurations
*/
public function __construct(
private array $kafkaConsumers,
private array $consumerConfigurations,
private array $topicConfigurations,
private array $publisherConfigurations,
private array $kafkaBrokerConfigurations,
private bool $isRunningInTestMode
) {

}

public static function createEmpty(): self
{
return new self([], [], []);
return new self([], [], [], [], [], false);
}

public function getConfigurationForConsumer(string $endpointId): KafkaConsumerConfiguration
{
if (! array_key_exists($endpointId, $this->consumerConfigurations)) {
return KafkaConsumerConfiguration::createWithDefaults($endpointId);
return KafkaConsumerConfiguration::createWithDefaults($endpointId)->enableKafkaDebugging();
}

return $this->consumerConfigurations[$endpointId];
Expand All @@ -55,17 +69,47 @@ public function getConfigurationForTopic(string $topicName): TopicConf
return $this->topicConfigurations[$topicName]->getConfig();
}

private function getConfigurationForPublisher(string $referenceName): KafkaPublisherConfiguration
public function getConfigurationForPublisher(string $referenceName): KafkaPublisherConfiguration
{
return $this->publisherConfigurations[$referenceName] ?? throw ConfigurationException::create("Publisher configuration for {$referenceName} not found");
}

public function getProducer(string $referenceName, KafkaBrokerConfiguration $kafkaBrokerConfiguration): Producer
public function getConsumer(string $endpointId): KafkaConsumer
{
if (! array_key_exists($referenceName, $this->initializedProducers)) {
$conf = $this->getConfigurationForPublisher($referenceName);
$conf = $conf->getAsKafkaConfig();
if (! array_key_exists($endpointId, $this->initializedConsumers)) {
Assert::keyExists($this->kafkaConsumers, $endpointId, "Consumer with endpoint id {$endpointId} not found");

$configuration = $this->getConfigurationForConsumer($endpointId);
$conf = $configuration->getConfig();
$conf->set('group.id', $this->kafkaConsumers[$endpointId]->getGroupId());
$kafkaBrokerConfiguration = $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()];
$conf->set('bootstrap.servers', implode(',', $kafkaBrokerConfiguration->getBootstrapServers()));
$consumer = new KafkaConsumer($conf);

if ($this->isRunningForTests($kafkaBrokerConfiguration)) {
// ensures there is no need for repartitioning
$consumer->assign([new TopicPartition($this->kafkaConsumers[$endpointId]->getTopics()[0], 0)]);
}else {
$consumer->subscribe($this->kafkaConsumers[$endpointId]->getTopics());
}

foreach ($this->kafkaConsumers[$endpointId]->getTopics() as $topic) {
$consumer->subscribe([$topic]);
}
$consumer->assign([new TopicPartition($this->kafkaConsumers[$endpointId]->getTopics()[0], 0)]);

$this->initializedConsumers[$endpointId] = $consumer;
}

return $this->initializedConsumers[$endpointId];
}

public function getProducer(string $referenceName): Producer
{
if (! array_key_exists($referenceName, $this->initializedProducers)) {
$configuration = $this->getConfigurationForPublisher($referenceName);
$conf = $configuration->getAsKafkaConfig();
$conf->set('bootstrap.servers', implode(',', $this->kafkaBrokerConfigurations[$configuration->getBrokerConfigurationReference()]->getBootstrapServers()));
$producer = new Producer($conf);

$this->initializedProducers[$referenceName] = $producer;
Expand All @@ -74,14 +118,19 @@ public function getProducer(string $referenceName, KafkaBrokerConfiguration $kaf
return $this->initializedProducers[$referenceName];
}

public function getTopicForProducer(string $referenceName, KafkaBrokerConfiguration $kafkaBrokerConfiguration): ProducerTopic
public function getTopicForProducer(string $referenceName): ProducerTopic
{
$producer = $this->getProducer($referenceName, $kafkaBrokerConfiguration);
$producer = $this->getProducer($referenceName);
$topicName = $this->getConfigurationForPublisher($referenceName)->getDefaultTopicName();

return $producer->newTopic(
$topicName,
$this->getConfigurationForTopic($topicName)
);
}

private function isRunningForTests(KafkaBrokerConfiguration $kafkaBrokerConfiguration): bool
{
return ($this->isRunningInTestMode && $kafkaBrokerConfiguration->isSetupForTesting() === null) || $kafkaBrokerConfiguration->isSetupForTesting() === true;
}
}
Loading
Loading