diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..26d931c
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+/vendor
+/.phpcs-cache
+/.phpunit.result.cache
+/composer.lock
+/phpcs.xml
+/phpstan.neon
+/phpunit.xml
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..5f9f310
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,44 @@
+language: php
+php:
+ - 7.2
+ - 7.3
+ - 7.4snapshot
+ - nightly
+
+env:
+ - DEPENDENCIES=
+ - DEPENDENCIES=--prefer-lowest
+
+install:
+ composer update --no-interaction --prefer-stable --prefer-dist $DEPENDENCIES
+
+script: make test
+
+stages:
+ - Validate composer.json
+ - Code Quality
+ - Test
+
+jobs:
+ allow_failures:
+ - php: 7.4snapshot
+ - php: nightly
+
+ include:
+ - stage: Validate composer.json
+ name: Validate composer.json
+ php: 7.2
+ script:
+ - make validate-composer
+ - composer global require maglnet/composer-require-checker
+ - $(composer config -g home)/vendor/bin/composer-require-checker check composer.json
+ - stage: Code Quality
+ name: Lint
+ php: 7.2
+ script: make lint
+ - name: Coding Standard
+ php: 7.2
+ script: make cs
+ - name: Static Analysis
+ php: 7.2
+ script: make static-analysis
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..8455dc6
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,72 @@
+### BEGIN main targets
+
+.PHONY: build
+build: vendor
+
+.PHONY: list
+list:
+ @$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$'
+
+### END
+
+### BEGIN secondary targets
+
+.PHONY: vendor
+vendor: vendor/lock
+
+vendor/lock: composer.json
+ $(MAKE) validate-composer
+ $(MAKE) update
+ touch vendor/lock
+
+.PHONY: update
+update:
+ composer update
+
+### END
+
+### BEGIN tests
+
+.PHONY: test
+test:
+ vendor/bin/phpunit $(PHPUNIT_ARGS)
+
+.PHONY: lint
+lint:
+ vendor/bin/parallel-lint src/ tests/
+
+.PHONY: cs
+cs: vendor
+ vendor/bin/phpcs $(PHPCS_ARGS)
+
+.PHONY: cs-fix
+cs-fix: vendor
+ vendor/bin/phpcbf
+
+.PHONY: static-analysis
+static-analysis: vendor
+ vendor/bin/phpstan analyse
+
+.PHONY: validate-composer
+validate-composer:
+ composer validate --strict
+
+.PHONY: check
+check: build validate-composer lint cs static-analysis test
+
+### END
+
+### BEGIN cleaning
+
+.PHONY: clean
+clean: clean-cache clean-vendor
+
+.PHONY: clean-cache
+clean-cache:
+ rm -rf var/cache/*
+
+.PHONY: clean-vendor
+clean-vendor:
+ rm -rf vendor
+
+### END
diff --git a/README.md b/README.md
index f988f0f..f229dee 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,115 @@
-# php-kafka-rest-client
\ No newline at end of file
+kafka-rest-client [![Build Status](https://travis-ci.org/grongor/php-kafka-rest-client.svg?branch=master)](https://travis-ci.org/grongor/php-kafka-rest-client)
+=================
+
+This is a modern PHP client for [Confluent REST Proxy](https://docs.confluent.io/current/kafka-rest/) (version 2).
+
+HTTP client used for communication with API adheres to the modern [HTTP standards](http://php-http.org) like
+[PSR-7](https://www.php-fig.org/psr/psr-7/), [PSR-17](https://www.php-fig.org/psr/psr-17/)
+and [PSR-18](https://www.php-fig.org/psr/psr-18/).
+
+Aside from implementing the REST Proxy API this library adds some convenient classes useful for the most interactions
+with Apache Kafka - see [examples](#Examples) for the whole list and a simple tutorial on how to use them.
+
+Missing features
+----------------
+
+Some features were deliberately skipped to simplify the implementation/shorten the development time.
+If you need anything and you're willing create a pull request, I'd be happy to check it out
+and (if it makes sense) merge it. You can start the discussion by opening an issue.
+
+- version 1 of the REST Proxy API
+ - It's easy to upgrade the REST Proxy, so I don't see any point in implementing the first version.
+- AVRO embedded format
+- JSON embedded format
+ - I think that the binary format is sufficient. You can always serialize/deserialize your objects before/after
+ they interact with this library, therefore direct integration here is just a complication.
+- async operations
+ - To simplify the library all the methods are synchronous. That might definitely change if the need arises.
+- all not-yet-implemented features mentioned [here](https://docs.confluent.io/current/kafka-rest/#features)
+ - When these features are implemented, they will be added here ASAP.
+
+Examples
+--------
+
+### Producer
+
+Producer allows you to produce messages, either one by one or in batches. Both use the same underlying API method
+so it's more efficient to use the batch one.
+
+Both `produce` and `produceBatch` return nothing on success and throw an exception on failure.
+There might be partial success/failure so the thrown exception `FailedToProduceMessages` contains two public properties,
+`$retryable` and `$nonRetryable`, where each contains an array of
+`['error' => 'error provided by Kafka', message => (Message object given to produce)]`.
+Whether the error is [non-]retryable is based on the `error_code` as documented
+[here](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name).
+It's up to you what you do with those.
+
+```php
+$producer = new Producer($restClient);
+$producer->produce('your-topic', new Message('some-message'));
+
+$messages = [new Message('some-message'), new Message('and-some-other-message')];
+$producer->produceBatch('your-topic', $messages);
+```
+
+### Consumer
+
+Consumer allows you to consume messages one by one until you return from the loop, application throws exception
+or is otherwise forced to exit.
+The `consume` method returns a [Generator](https://www.php.net/manual/en/class.generator.php) and loops indefinitely,
+yielding messages as they are available. `consume` method accepts two parameters: `timeout` and `maxBytes`.
+`timeout` is the maximum time the consumer will wait for the messages. `maxBytes` is the maximum size of the messages
+to fetch in a single request. Both of these settings are complementary to the settings in `ConsumerOptions` and to the
+server settings (check the Kafka documentation for more information).
+
+If you didn't set the consumer to auto-commit messages then you should use consumers `commit` method to do so. Simply
+pass it a message you wish to commit. For most cases it's recommended to leave auto-commit off and manually commit
+each message as you process them.
+
+```php
+$consumerFactory = new ConsumerFactory($restClient);
+$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic'));
+foreach ($consumer->consume() as $message) {
+ // Do your magic
+ $logger->info('got new message', $message->content);
+
+ // ... and when you are done, commit the message.
+ $consumer->commit($message);
+}
+```
+
+### BatchConsumer
+
+BatchConsumer works the same way as Consumer does; the difference is that BatchConsumer doesn't yield each message
+separately but first puts them in batches (`MessagesBatch`). These batches can be configured to be "limited" by either
+count of messages in the batch (`maxCount`), by time (`maxDuration`) or by both (setting both `maxCount` and
+`maxDuration`). If you set `maxDuration` then the batch won't ever take longer than that (+ few ms for processing) as
+it changes the `timeout` parameter of the consumer (consumer won't get stuck on network waiting for more messages).
+
+BatchConsumer can be quite useful for processing of "large" data sets, where you would have to otherwise batch
+the messages yourself (eg. for inserting into database, where batch operations are always better).
+
+As mentioned in the Consumer example, you may need to commit the messages. For that there is a `commit` method,
+which accepts the yielded `MessagesBatch` and commits all the messages inside it in one request.
+
+```php
+$batchConsumerFactory = new BatchConsumerFactory($restClient);
+$batchConsumer = $batchConsumerFactory->create(
+ 'your-consumer-group',
+ Subscription::topic('your-topic'),
+ $maxCount = 10000, // Yield the batch when there is 10 000 messages in it
+ $maxDuration = 60 // or when 60 seconds passes, whichever comes first.
+);
+foreach ($batchConsumer->consume() as $messagesBatch) {
+ // The batch might be empty if you specified the maxDuration.
+ if ($messagesBatch->isEmpty()) {
+ continue;
+ }
+
+ // Do your magic
+ $database->insert($messagesBatch->getMessages());
+
+ // ... and when you are done, commit the batch.
+ $batchConsumer->commit($messagesBatch);
+}
+```
diff --git a/composer.json b/composer.json
new file mode 100644
index 0000000..3685d2e
--- /dev/null
+++ b/composer.json
@@ -0,0 +1,54 @@
+{
+ "name": "grongor/kafka-rest-client",
+ "description": "Client for Confluent REST Proxy of Apache Kafka",
+ "type": "library",
+ "license": "MIT",
+ "keywords": ["kafka", "confluent", "proxy", "rest", "client", "librdkafka", "rdkafka"],
+ "config": {
+ "sort-packages": true
+ },
+ "require": {
+ "php": "^7.2|^8.0",
+ "beberlei/assert": "^3.2",
+ "jms/serializer": "^3.3",
+ "lcobucci/clock": "^1.2",
+ "psr/http-client": "^1.0",
+ "psr/http-factory": "^1.0",
+ "psr/http-message": "^1.0 >=1.0.1",
+ "psr/log": "^1.1",
+ "teapot/status-code": "^1.1",
+ "thecodingmachine/safe": "^0.1.16"
+ },
+ "require-dev": {
+ "cdn77/coding-standard": "^2.0",
+ "guzzlehttp/psr7": "^1.6",
+ "http-interop/http-factory-guzzle": "^1.0",
+ "jakub-onderka/php-parallel-lint": "^1.0.0",
+ "mockery/mockery": "^1.2 >=1.2.3",
+ "php-http/message": "^1.0",
+ "phpstan/extension-installer": "^1.0",
+ "phpstan/phpstan": "^0.11.19",
+ "phpstan/phpstan-beberlei-assert": "^0.11.2",
+ "phpstan/phpstan-mockery": "^0.11.3",
+ "phpstan/phpstan-phpunit": "^0.11.2",
+ "phpstan/phpstan-strict-rules": "^0.11.1",
+ "phpunit/phpunit": "^8.0",
+ "roave/security-advisories": "dev-master",
+ "slevomat/coding-standard": "dev-master as 5.0.4",
+ "thecodingmachine/phpstan-safe-rule": "^0.1.4"
+ },
+ "conflict": {
+ "php-http/httplug": "^1.0",
+ "doctrine/annotations": "<1.7.0"
+ },
+ "autoload": {
+ "psr-4": {
+ "Grongor\\KafkaRest\\": "src/"
+ }
+ },
+ "autoload-dev": {
+ "psr-4": {
+ "Grongor\\KafkaRest\\Tests\\": "tests/"
+ }
+ }
+}
diff --git a/phpcs.xml.dist b/phpcs.xml.dist
new file mode 100644
index 0000000..b606c4f
--- /dev/null
+++ b/phpcs.xml.dist
@@ -0,0 +1,12 @@
+
+
+
+
+
+
+ src/
+ tests/
+
diff --git a/phpstan.neon.dist b/phpstan.neon.dist
new file mode 100644
index 0000000..ff1ee6a
--- /dev/null
+++ b/phpstan.neon.dist
@@ -0,0 +1,6 @@
+parameters:
+ memory-limit: -1
+ level: max
+ paths:
+ - %currentWorkingDirectory%/src
+ - %currentWorkingDirectory%/tests
diff --git a/phpunit.xml.dist b/phpunit.xml.dist
new file mode 100644
index 0000000..025b9cd
--- /dev/null
+++ b/phpunit.xml.dist
@@ -0,0 +1,22 @@
+
+
+
+
+
+ tests
+
+
+
+
+ src/
+
+
+
diff --git a/src/Api/Exception/UnexpectedApiResponse.php b/src/Api/Exception/UnexpectedApiResponse.php
new file mode 100644
index 0000000..3a37d29
--- /dev/null
+++ b/src/Api/Exception/UnexpectedApiResponse.php
@@ -0,0 +1,26 @@
+ $response
+ */
+ public static function fromResponse(int $httpCode, array $response) : self
+ {
+ return new self(
+ sprintf('Unexpected response HTTP %d: [%d] %s', $httpCode, $response['error_code'], $response['message'])
+ );
+ }
+}
diff --git a/src/Api/HttpRestClient.php b/src/Api/HttpRestClient.php
new file mode 100644
index 0000000..ff6442d
--- /dev/null
+++ b/src/Api/HttpRestClient.php
@@ -0,0 +1,361 @@
+client = $client;
+ $this->logger = $logger;
+ $this->requestFactory = $requestFactory;
+ $this->serializer = $serializer;
+ $this->streamFactory = $streamFactory;
+ $this->uriFactory = $uriFactory;
+
+ $this->apiUri = $this->uriFactory->createUri($apiUri);
+ }
+
+ /** {@inheritDoc} */
+ public function listTopics() : array
+ {
+ /** @var array $topics */
+ $topics = $this->execute($this->get('/topics'), 'array');
+
+ return $topics;
+ }
+
+ /** {@inheritDoc} */
+ public function getTopic(string $topic) : Topic
+ {
+ /** @var Topic $result */
+ $result = $this->execute($this->get(sprintf('/topics/%s', $topic)), Topic::class);
+
+ return $result;
+ }
+
+ /** {@inheritDoc} */
+ public function produce(string $topic, iterable $messages) : array
+ {
+ /** @var ProduceResults $result */
+ $result = $this->execute(
+ $this->post(sprintf('/topics/%s', $topic), $this->serialize(['records' => $messages])),
+ ProduceResults::class
+ );
+
+ return $result->results;
+ }
+
+ /** {@inheritDoc} */
+ public function listPartitions(string $topic) : array
+ {
+ /** @var Partition[] $partitions */
+ $partitions = $this->execute(
+ $this->get(sprintf('/topics/%s/partitions', $topic)),
+ sprintf('array<%s>', Partition::class)
+ );
+
+ return $partitions;
+ }
+
+ /** {@inheritDoc} */
+ public function getPartition(string $topic, int $partition) : Partition
+ {
+ /** @var Partition $result */
+ $result = $this->execute(
+ $this->get(sprintf('/topics/%s/partitions/%d', $topic, $partition)),
+ Partition::class
+ );
+
+ return $result;
+ }
+
+ /** {@inheritDoc} */
+ public function produceToPartition(string $topic, int $partition, iterable $messages) : array
+ {
+ /** @var ProduceResults $result */
+ $result = $this->execute(
+ $this->post(
+ sprintf('/topics/%s/partitions/%d', $topic, $partition),
+ $this->serialize(['records' => $messages])
+ ),
+ ProduceResults::class
+ );
+
+ return $result->results;
+ }
+
+ /** {@inheritDoc} */
+ public function createConsumer(string $group, ?ConsumerOptions $consumerOptions = null) : Consumer
+ {
+ $consumerOptions = $consumerOptions ?? new ConsumerOptions();
+
+ /** @var Consumer $consumer */
+ $consumer = $this->execute(
+ $this->post(sprintf('/consumers/%s', $group), $this->serialize($consumerOptions)),
+ Consumer::class
+ );
+
+ return $consumer;
+ }
+
+ /** {@inheritDoc} */
+ public function deleteConsumer(Consumer $consumer) : void
+ {
+ $this->execute($this->requestFactory->createRequest('DELETE', $consumer->baseUri));
+ }
+
+ /** {@inheritDoc} */
+ public function consumerCommitOffsets(Consumer $consumer, ?iterable $offsets = null) : void
+ {
+ $body = $offsets === null ? '' : $this->serialize(['offsets' => $offsets]);
+ $this->execute($this->consumerPost($consumer, '/offsets', $body));
+ }
+
+ /** {@inheritDoc} */
+ public function getConsumerCommittedOffsets(Consumer $consumer, iterable $partitions) : array
+ {
+ /** @var array> $result */
+ $result = $this->execute(
+ $this->consumerGet($consumer, '/offsets', $this->serialize(['partitions' => $partitions])),
+ sprintf('array>', Offset::class)
+ );
+
+ return $result['offsets'];
+ }
+
+ /** {@inheritDoc} */
+ public function consumerSubscribe(Consumer $consumer, Subscription $subscription) : void
+ {
+ $this->execute($this->consumerPost($consumer, '/subscription', $this->serialize($subscription)));
+ }
+
+ /** {@inheritDoc} */
+ public function getConsumerSubscribedTopics(Consumer $consumer) : array
+ {
+ /** @var array> $result */
+ $result = $this->execute($this->consumerGet($consumer, '/subscription'), 'array>');
+
+ return $result['topics'];
+ }
+
+ /** {@inheritDoc} */
+ public function consumerUnsubscribe(Consumer $consumer) : void
+ {
+ $this->execute($this->requestFactory->createRequest('DELETE', $consumer->baseUri . '/subscription'));
+ }
+
+ /** {@inheritDoc} */
+ public function consumerAssignPartitions(Consumer $consumer, iterable $partitions) : void
+ {
+ $this->execute($this->consumerPost($consumer, '/assignments', $this->serialize(['partitions' => $partitions])));
+ }
+
+ /** {@inheritDoc} */
+ public function getConsumerAssignedPartitions(Consumer $consumer) : array
+ {
+ /** @var array> $result */
+ $result = $this->execute(
+ $this->consumerGet($consumer, '/assignments'),
+ sprintf('array>', AssignedPartition::class)
+ );
+
+ return $result['partitions'];
+ }
+
+ /** {@inheritDoc} */
+ public function consumerSeek(Consumer $consumer, iterable $offsets) : void
+ {
+ $this->execute($this->consumerPost($consumer, '/positions', $this->serialize(['offsets' => $offsets])));
+ }
+
+ /** {@inheritDoc} */
+ public function consumerSeekStart(Consumer $consumer, iterable $partitions) : void
+ {
+ $this->execute(
+ $this->consumerPost($consumer, '/positions/beginning', $this->serialize(['partitions' => $partitions]))
+ );
+ }
+
+ /** {@inheritDoc} */
+ public function consumerSeekEnd(Consumer $consumer, iterable $partitions) : void
+ {
+ $this->execute(
+ $this->consumerPost($consumer, '/positions/end', $this->serialize(['partitions' => $partitions]))
+ );
+ }
+
+ /** {@inheritDoc} */
+ public function getConsumerMessages(Consumer $consumer, ?int $timeout = null, ?int $maxBytes = null) : array
+ {
+ $parameters = [];
+ if ($timeout !== null) {
+ $parameters['timeout'] = $timeout;
+ }
+
+ if ($maxBytes !== null) {
+ $parameters['maxBytes'] = $maxBytes;
+ }
+
+ $request = $this->consumerGet($consumer, '/records');
+ $request = $request->withUri($request->getUri()->withQuery(http_build_query($parameters)), true);
+
+ /** @var array $messages */
+ $messages = $this->execute($request, sprintf('array<%s>', Message::class));
+
+ return $messages;
+ }
+
+ /** {@inheritDoc} */
+ public function getBrokers() : array
+ {
+ /** @var array> $result */
+ $result = $this->execute($this->get('/brokers'), 'array>');
+
+ return $result['brokers'];
+ }
+
+ private function get(string $path, ?UriInterface $baseUri = null) : RequestInterface
+ {
+ $request = $this->requestFactory->createRequest('GET', ($baseUri ?? $this->apiUri)->withPath($path));
+
+ return $request->withHeader('Accept', 'application/vnd.kafka.v2+json');
+ }
+
+ private function consumerGet(Consumer $consumer, string $path, ?string $body = null) : RequestInterface
+ {
+ $consumerUri = $this->uriFactory->createUri($consumer->baseUri);
+ $request = $this->get($consumerUri->getPath() . $path, $consumerUri);
+
+ if ($body !== null) {
+ return $this->requestWithBody($request, $body);
+ }
+
+ return $request;
+ }
+
+ private function post(string $path, string $body, ?UriInterface $baseUri = null) : RequestInterface
+ {
+ $request = $this->requestFactory->createRequest('POST', ($baseUri ?? $this->apiUri)->withPath($path));
+ $request = $request->withHeader('Accept', 'application/vnd.kafka.v2+json');
+
+ return $this->requestWithBody($request, $body);
+ }
+
+ private function consumerPost(Consumer $consumer, string $path, string $body) : RequestInterface
+ {
+ $consumerUri = $this->uriFactory->createUri($consumer->baseUri);
+
+ return $this->post($consumerUri->getPath() . $path, $body, $consumerUri);
+ }
+
+ private function requestWithBody(RequestInterface $request, string $body) : RequestInterface
+ {
+ $request = $request->withHeader('Content-Type', 'application/vnd.kafka.binary.v2+json');
+ $request = $request->withBody($this->streamFactory->createStream($body));
+
+ return $request;
+ }
+
+ /**
+ * @return mixed
+ */
+ private function execute(RequestInterface $request, ?string $resultType = null)
+ {
+ $this->logger->debug(
+ 'Sending request',
+ ['method' => $request->getMethod(), 'uri' => $request->getUri(), 'body' => $request->getBody()]
+ );
+
+ $response = $this->client->sendRequest($request);
+
+ $this->logger->debug(
+ 'Received response',
+ [
+ 'statusCode' => $response->getStatusCode(),
+ 'statusReason' => $response->getReasonPhrase(),
+ 'response' => $response->getBody(),
+ ]
+ );
+
+ if ($response->getStatusCode() === Http::OK) {
+ Assertion::notNull($resultType);
+
+ return $this->serializer->deserialize((string) $response->getBody(), $resultType, 'json');
+ }
+
+ if ($response->getStatusCode() === Http::NO_CONTENT) {
+ return null;
+ }
+
+ $body = (string) $response->getBody();
+ if ($body === '') {
+ throw UnexpectedApiResponse::fromStatus($response->getStatusCode(), $response->getReasonPhrase());
+ }
+
+ throw UnexpectedApiResponse::fromResponse($response->getStatusCode(), json_decode($body, true));
+ }
+
+ /**
+ * @param mixed $data
+ */
+ private function serialize($data) : string
+ {
+ return $this->serializer->serialize($data, 'json');
+ }
+}
diff --git a/src/Api/RestClient.php b/src/Api/RestClient.php
new file mode 100644
index 0000000..a671f56
--- /dev/null
+++ b/src/Api/RestClient.php
@@ -0,0 +1,152 @@
+
+ */
+ public function listTopics() : array;
+
+ /**
+ * GET /topics/(string:topic_name)
+ */
+ public function getTopic(string $topic) : Topic;
+
+ /**
+ * POST /topics/(string:topic_name)
+ *
+ * @param Value\Request\Message[] $messages
+ *
+ * @return ProduceResult[]
+ */
+ public function produce(string $topic, iterable $messages) : array;
+
+ /**
+ * GET /topics/(string:topic_name)/partitions
+ *
+ * @return Partition[]
+ */
+ public function listPartitions(string $topic) : array;
+
+ /**
+ * GET /topics/(string:topic_name)/partitions/(int:partition_id)
+ */
+ public function getPartition(string $topic, int $partition) : Partition;
+
+ /**
+ * POST /topics/(string:topic_name)/partitions/(int:partition_id)
+ *
+ * @param Value\Request\Message[] $messages
+ *
+ * @return ProduceResult[]
+ */
+ public function produceToPartition(string $topic, int $partition, iterable $messages) : array;
+
+ /**
+ * POST /consumers/(string:group_name)
+ */
+ public function createConsumer(string $group, ?ConsumerOptions $consumerOptions = null) : Consumer;
+
+ /**
+ * DELETE /consumers/(string:group_name)/instances/(string:instance)
+ */
+ public function deleteConsumer(Consumer $consumer) : void;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/offsets
+ *
+ * @param Value\Request\Offset[]|null $offsets
+ */
+ public function consumerCommitOffsets(Consumer $consumer, ?iterable $offsets = null) : void;
+
+ /**
+ * GET /consumers/(string:group_name)/instances/(string:instance)/offsets
+ *
+ * @param Value\Request\Partition[] $partitions
+ *
+ * @return Offset[]
+ */
+ public function getConsumerCommittedOffsets(Consumer $consumer, iterable $partitions) : array;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/subscription
+ */
+ public function consumerSubscribe(Consumer $consumer, Subscription $subscription) : void;
+
+ /**
+ * GET /consumers/(string:group_name)/instances/(string:instance)/subscription
+ *
+ * @return array
+ */
+ public function getConsumerSubscribedTopics(Consumer $consumer) : array;
+
+ /**
+ * DELETE /consumers/(string:group_name)/instances/(string:instance)/subscription
+ */
+ public function consumerUnsubscribe(Consumer $consumer) : void;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/assignments
+ *
+ * @param Value\Request\Partition[] $partitions
+ */
+ public function consumerAssignPartitions(Consumer $consumer, iterable $partitions) : void;
+
+ /**
+ * GET /consumers/(string:group_name)/instances/(string:instance)/assignments
+ *
+ * @return AssignedPartition[]
+ */
+ public function getConsumerAssignedPartitions(Consumer $consumer) : array;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/positions
+ *
+ * @param Value\Request\Offset[] $offsets
+ */
+ public function consumerSeek(Consumer $consumer, iterable $offsets) : void;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/positions/beginning
+ *
+ * @param Value\Request\Partition[] $partitions
+ */
+ public function consumerSeekStart(Consumer $consumer, iterable $partitions) : void;
+
+ /**
+ * POST /consumers/(string:group_name)/instances/(string:instance)/positions/end
+ *
+ * @param Value\Request\Partition[] $partitions
+ */
+ public function consumerSeekEnd(Consumer $consumer, iterable $partitions) : void;
+
+ /**
+ * GET /consumers/(string:group_name)/instances/(string:instance)/records
+ *
+ * @return Message[]
+ */
+ public function getConsumerMessages(Consumer $consumer, ?int $timeout = null, ?int $maxBytes = null) : array;
+
+ /**
+ * GET /brokers
+ *
+ * @return array
+ */
+ public function getBrokers() : array;
+}
diff --git a/src/Api/Value/Request/ConsumerOptions.php b/src/Api/Value/Request/ConsumerOptions.php
new file mode 100644
index 0000000..1384dfa
--- /dev/null
+++ b/src/Api/Value/Request/ConsumerOptions.php
@@ -0,0 +1,53 @@
+content = $content;
+ $this->key = $key;
+ }
+}
diff --git a/src/Api/Value/Request/Offset.php b/src/Api/Value/Request/Offset.php
new file mode 100644
index 0000000..ab57fa6
--- /dev/null
+++ b/src/Api/Value/Request/Offset.php
@@ -0,0 +1,35 @@
+topic = $topic;
+ $this->partition = $partition;
+ $this->offset = $offset;
+ }
+}
diff --git a/src/Api/Value/Request/Partition.php b/src/Api/Value/Request/Partition.php
new file mode 100644
index 0000000..6722e94
--- /dev/null
+++ b/src/Api/Value/Request/Partition.php
@@ -0,0 +1,28 @@
+topic = $topic;
+ $this->partition = $partition;
+ }
+}
diff --git a/src/Api/Value/Request/Subscription.php b/src/Api/Value/Request/Subscription.php
new file mode 100644
index 0000000..8932f24
--- /dev/null
+++ b/src/Api/Value/Request/Subscription.php
@@ -0,0 +1,67 @@
+")
+ * @Serializer\Accessor(getter="getTopics")
+ * @Serializer\SkipWhenEmpty()
+ * @var iterable|null
+ */
+ private $topics;
+
+ /**
+ * @Serializer\Type("string")
+ * @Serializer\SerializedName("topic_pattern")
+ * @Serializer\Accessor(getter="getPattern")
+ * @Serializer\SkipWhenEmpty()
+ * @var string|null
+ */
+ private $pattern;
+
+ /**
+ * @param iterable|null $topics
+ */
+ private function __construct(?iterable $topics, ?string $pattern)
+ {
+ $this->topics = $topics;
+ $this->pattern = $pattern;
+ }
+
+ public static function topic(string $topic) : self
+ {
+ return new self([$topic], null);
+ }
+
+ /**
+ * @param iterable $topics
+ */
+ public static function topics(iterable $topics) : self
+ {
+ return new self($topics, null);
+ }
+
+ public static function pattern(string $pattern) : self
+ {
+ return new self(null, $pattern);
+ }
+
+ /**
+ * @return string[]|null
+ */
+ public function getTopics() : ?iterable
+ {
+ return $this->topics;
+ }
+
+ public function getPattern() : ?string
+ {
+ return $this->pattern;
+ }
+}
diff --git a/src/Api/Value/Response/AssignedPartition.php b/src/Api/Value/Response/AssignedPartition.php
new file mode 100644
index 0000000..596177e
--- /dev/null
+++ b/src/Api/Value/Response/AssignedPartition.php
@@ -0,0 +1,22 @@
+partition = $partition;
+ $this->offset = $offset;
+ }
+}
diff --git a/src/Api/Value/Response/Partition.php b/src/Api/Value/Response/Partition.php
new file mode 100644
index 0000000..f861c15
--- /dev/null
+++ b/src/Api/Value/Response/Partition.php
@@ -0,0 +1,28 @@
+")
+ * @var Replica[]
+ */
+ public $replicas;
+}
diff --git a/src/Api/Value/Response/ProduceError.php b/src/Api/Value/Response/ProduceError.php
new file mode 100644
index 0000000..0ef2eae
--- /dev/null
+++ b/src/Api/Value/Response/ProduceError.php
@@ -0,0 +1,23 @@
+errorCode = $errorCode;
+ $this->error = $error;
+ }
+}
diff --git a/src/Api/Value/Response/ProduceResult.php b/src/Api/Value/Response/ProduceResult.php
new file mode 100644
index 0000000..67b2922
--- /dev/null
+++ b/src/Api/Value/Response/ProduceResult.php
@@ -0,0 +1,82 @@
+partition = $partition;
+ $result->offset = $offset;
+
+ return $result;
+ }
+
+ /**
+ * @internal
+ */
+ public static function error(int $errorCode, string $error) : self
+ {
+ $result = new self();
+ $result->errorCode = $errorCode;
+ $result->error = $error;
+
+ return $result;
+ }
+
+ public function isError() : bool
+ {
+ return $this->errorCode !== null;
+ }
+
+ public function getOffset() : Offset
+ {
+ Assertion::false($this->isError());
+
+ return new Offset((int) $this->partition, (int) $this->offset);
+ }
+
+ public function getError() : ProduceError
+ {
+ Assertion::true($this->isError());
+
+ return new ProduceError((int) $this->errorCode, (string) $this->error);
+ }
+}
diff --git a/src/Api/Value/Response/ProduceResults.php b/src/Api/Value/Response/ProduceResults.php
new file mode 100644
index 0000000..10e05be
--- /dev/null
+++ b/src/Api/Value/Response/ProduceResults.php
@@ -0,0 +1,20 @@
+")
+ * @Serializer\SerializedName("offsets")
+ * @var ProduceResult[]
+ */
+ public $results;
+}
diff --git a/src/Api/Value/Response/Replica.php b/src/Api/Value/Response/Replica.php
new file mode 100644
index 0000000..72ccc49
--- /dev/null
+++ b/src/Api/Value/Response/Replica.php
@@ -0,0 +1,28 @@
+")
+ * @var array
+ */
+ public $configs;
+
+ /**
+ * @Serializer\Type("array")
+ * @var Partition[]
+ */
+ public $partitions;
+}
diff --git a/src/BatchConsumer.php b/src/BatchConsumer.php
new file mode 100644
index 0000000..0a9379c
--- /dev/null
+++ b/src/BatchConsumer.php
@@ -0,0 +1,155 @@
+client = $client;
+ $this->consumer = $consumer;
+ $this->maxCount = $maxCount;
+
+ if ($maxDuration !== null) {
+ $this->maxDuration = new DateInterval(sprintf('PT%dS', $maxDuration));
+ }
+
+ $this->clock = $clock ?? new SystemClock();
+ }
+
+ public function __destruct()
+ {
+ if (! isset($this->consumer)) {
+ return;
+ }
+
+ $this->close();
+ }
+
+ /**
+ * @return Message[]
+ */
+ public function consume(?int $maxBytes = null) : iterable
+ {
+ if (! isset($this->consumer)) {
+ throw ConsumerClosed::new();
+ }
+
+ $batch = new MessagesBatch();
+ $batchTimeout = $this->getBatchTimeout();
+
+ try {
+ while (true) {
+ $messages = $this->client->getConsumerMessages(
+ $this->consumer,
+ $this->getTimeout($batchTimeout),
+ $maxBytes
+ );
+ foreach ($messages as $message) {
+ $batch->add($message);
+
+ if (($this->maxCount === null || $batch->count() !== $this->maxCount) &&
+ ($this->maxDuration === null || $this->clock->now() < $batchTimeout)
+ ) {
+ continue;
+ }
+
+ yield $batch;
+
+ $batch = new MessagesBatch();
+ $batchTimeout = $this->getBatchTimeout();
+ }
+ }
+ } catch (Throwable $throwable) {
+ $this->close();
+
+ throw ConsumerClosed::error($throwable);
+ }
+ }
+
+ public function commit(MessagesBatch $messagesBatch) : void
+ {
+ if ($messagesBatch->isEmpty()) {
+ return;
+ }
+
+ $offsets = $offsetsMap = [];
+ $messages = $messagesBatch->getMessages();
+ for ($i = count($messages) - 1; $i >= 0; $i--) {
+ $message = $messages[$i];
+ if (isset($offsetsMap[$message->topic][$message->partition])) {
+ continue;
+ }
+
+ $offsetsMap[$message->topic][$message->partition] = $message->offset;
+ $offsets[] = new Offset($message->topic, $message->partition, $message->offset + 1);
+ }
+
+ $this->client->consumerCommitOffsets($this->consumer, $offsets);
+ }
+
+ private function getBatchTimeout() : ?DateTimeImmutable
+ {
+ if ($this->maxDuration !== null) {
+ return $this->clock->now()->add($this->maxDuration);
+ }
+
+ return null;
+ }
+
+ private function getTimeout(?DateTimeImmutable $batchTimeout) : int
+ {
+ if ($batchTimeout === null) {
+ return 10;
+ }
+
+ return $batchTimeout->getTimestamp() - $this->clock->now()->getTimestamp();
+ }
+
+ private function close() : void
+ {
+ $this->client->deleteConsumer($this->consumer);
+ unset($this->consumer);
+ }
+}
diff --git a/src/BatchConsumerFactory.php b/src/BatchConsumerFactory.php
new file mode 100644
index 0000000..ce6d5b9
--- /dev/null
+++ b/src/BatchConsumerFactory.php
@@ -0,0 +1,33 @@
+client = $client;
+ }
+
+ public function create(
+ string $group,
+ Subscription $subscription,
+ ?int $maxCount,
+ ?int $maxDuration = null,
+ ?ConsumerOptions $consumerOptions = null
+ ) : BatchConsumer {
+ $consumer = $this->client->createConsumer($group, $consumerOptions);
+ $this->client->consumerSubscribe($consumer, $subscription);
+
+ return new BatchConsumer($this->client, $consumer, $maxCount, $maxDuration);
+ }
+}
diff --git a/src/Consumer.php b/src/Consumer.php
new file mode 100644
index 0000000..7a97c11
--- /dev/null
+++ b/src/Consumer.php
@@ -0,0 +1,69 @@
+client = $client;
+ $this->consumer = $consumer;
+ }
+
+ public function __destruct()
+ {
+ if (! isset($this->consumer)) {
+ return;
+ }
+
+ $this->close();
+ }
+
+ /**
+ * @return Message[]
+ */
+ public function consume(?int $timeout = null, ?int $maxBytes = null) : iterable
+ {
+ if (! isset($this->consumer)) {
+ throw ConsumerClosed::new();
+ }
+
+ try {
+ while (true) {
+ yield from $this->client->getConsumerMessages($this->consumer, $timeout, $maxBytes);
+ }
+ } catch (Throwable $throwable) {
+ $this->close();
+
+ throw ConsumerClosed::error($throwable);
+ }
+ }
+
+ public function commit(Message $message) : void
+ {
+ $this->client->consumerCommitOffsets(
+ $this->consumer,
+ [new Offset($message->topic, $message->partition, $message->offset + 1)]
+ );
+ }
+
+ private function close() : void
+ {
+ $this->client->deleteConsumer($this->consumer);
+ unset($this->consumer);
+ }
+}
diff --git a/src/ConsumerFactory.php b/src/ConsumerFactory.php
new file mode 100644
index 0000000..8be744c
--- /dev/null
+++ b/src/ConsumerFactory.php
@@ -0,0 +1,31 @@
+client = $client;
+ }
+
+ public function create(
+ string $group,
+ Subscription $subscription,
+ ?ConsumerOptions $consumerOptions = null
+ ) : Consumer {
+ $consumer = $this->client->createConsumer($group, $consumerOptions);
+ $this->client->consumerSubscribe($consumer, $subscription);
+
+ return new Consumer($this->client, $consumer);
+ }
+}
diff --git a/src/Exception/ConsumerClosed.php b/src/Exception/ConsumerClosed.php
new file mode 100644
index 0000000..ed5b965
--- /dev/null
+++ b/src/Exception/ConsumerClosed.php
@@ -0,0 +1,21 @@
+retryable = $retryable;
+ $exception->nonRetryable = $nonRetryable;
+
+ return $exception;
+ }
+}
diff --git a/src/Producer.php b/src/Producer.php
new file mode 100644
index 0000000..7ed79fa
--- /dev/null
+++ b/src/Producer.php
@@ -0,0 +1,61 @@
+client = $client;
+ }
+
+ public function produce(string $topic, Message $message) : void
+ {
+ $this->produceBatch($topic, [$message]);
+ }
+
+ /**
+ * @param Message[] $messages
+ */
+ public function produceBatch(string $topic, iterable $messages) : void
+ {
+ $errors = [];
+ foreach ($this->client->produce($topic, $messages) as $i => $result) {
+ if (! $result->isError()) {
+ continue;
+ }
+
+ $errors[$i] = $result->getError();
+ }
+
+ if (count($errors) === 0) {
+ return;
+ }
+
+ $retryable = $nonRetryable = [];
+ foreach ($messages as $i => $message) {
+ if (! array_key_exists($i, $errors)) {
+ continue;
+ }
+
+ if ($errors[$i]->errorCode === 2) {
+ $retryable[] = ['error' => $errors[$i]->error, 'message' => $message];
+ } else {
+ $nonRetryable[] = ['error' => $errors[$i]->error, 'message' => $message];
+ }
+ }
+
+ throw FailedToProduceMessages::new($retryable, $nonRetryable);
+ }
+}
diff --git a/src/Value/MessagesBatch.php b/src/Value/MessagesBatch.php
new file mode 100644
index 0000000..c558c88
--- /dev/null
+++ b/src/Value/MessagesBatch.php
@@ -0,0 +1,37 @@
+ */
+ private $messages = [];
+
+ public function add(Message $message) : void
+ {
+ $this->messages[] = $message;
+ }
+
+ /**
+ * @return Message[]
+ */
+ public function getMessages() : array
+ {
+ return $this->messages;
+ }
+
+ public function count() : int
+ {
+ return count($this->messages);
+ }
+
+ public function isEmpty() : bool
+ {
+ return count($this->messages) === 0;
+ }
+}
diff --git a/tests/Api/HttpRestClientTest.php b/tests/Api/HttpRestClientTest.php
new file mode 100644
index 0000000..b901e3d
--- /dev/null
+++ b/tests/Api/HttpRestClientTest.php
@@ -0,0 +1,541 @@
+httpClient = Mockery::mock(ClientInterface::class);
+ $this->client = new HttpRestClient(
+ $this->httpClient,
+ new NullLogger(),
+ new RequestFactory(),
+ SerializerBuilder::create()->build(),
+ new StreamFactory(),
+ new UriFactory(),
+ 'http://api'
+ );
+ $this->response = (new ResponseFactory())->createResponse();
+ }
+
+ public function testListTopics() : void
+ {
+ $this->expectRequest('GET', '/topics', null, $this->responseWithBody('["lorem-ipsum", "dolor-sit-amet"]'));
+
+ $topics = $this->client->listTopics();
+
+ self::assertSame(['lorem-ipsum', 'dolor-sit-amet'], $topics);
+ }
+
+ public function testGetTopic() : void
+ {
+ $this->expectRequest('GET', '/topics/lorem-ipsum', null, $this->fileResponse('getTopic'));
+
+ $topic = $this->client->getTopic('lorem-ipsum');
+
+ self::assertSame('lorem-ipsum', $topic->name);
+ self::assertSame(['retention.bytes' => '-1', 'flush.ms' => '50000'], $topic->configs);
+ self::assertCount(2, $topic->partitions);
+
+ // partition 0
+ self::assertSame(0, $topic->partitions[0]->partition);
+ self::assertSame(0, $topic->partitions[0]->leader);
+ self::assertCount(2, $topic->partitions[0]->replicas);
+
+ // partition 0, replica 0
+ self::assertSame(0, $topic->partitions[0]->replicas[0]->broker);
+ self::assertTrue($topic->partitions[0]->replicas[0]->leader);
+ self::assertTrue($topic->partitions[0]->replicas[0]->inSync);
+
+ // partition 0, replica 1
+ self::assertSame(1, $topic->partitions[0]->replicas[1]->broker);
+ self::assertFalse($topic->partitions[0]->replicas[1]->leader);
+ self::assertTrue($topic->partitions[0]->replicas[1]->inSync);
+
+ // partition 1
+ self::assertSame(1, $topic->partitions[1]->partition);
+ self::assertSame(1, $topic->partitions[1]->leader);
+ self::assertCount(2, $topic->partitions[1]->replicas);
+
+ // partition 1, replica 0
+ self::assertSame(1, $topic->partitions[1]->replicas[0]->broker);
+ self::assertTrue($topic->partitions[1]->replicas[0]->leader);
+ self::assertTrue($topic->partitions[1]->replicas[0]->inSync);
+
+ // partition 1, replica 1
+ self::assertSame(0, $topic->partitions[1]->replicas[1]->broker);
+ self::assertFalse($topic->partitions[1]->replicas[1]->leader);
+ self::assertTrue($topic->partitions[1]->replicas[1]->inSync);
+ }
+
+ public function testProduce() : void
+ {
+ $this->expectRequest(
+ 'POST',
+ '/topics/lorem-ipsum',
+ '{"records":[{"value":"msg1"},{"value":"msg2"},{"value":"msg3"}]}',
+ $this->fileResponse('produce')
+ );
+
+ $results = $this->client->produce(
+ 'lorem-ipsum',
+ [new Message('msg1'), new Message('msg2'), new Message('msg3')]
+ );
+
+ self::assertCount(3, $results);
+
+ self::assertFalse($results[0]->isError());
+ $offset = $results[0]->getOffset();
+ self::assertSame(0, $offset->partition);
+ self::assertSame(8, $offset->offset);
+
+ self::assertFalse($results[1]->isError());
+ $offset = $results[1]->getOffset();
+ self::assertSame(1, $offset->partition);
+ self::assertSame(15, $offset->offset);
+
+ self::assertTrue($results[2]->isError());
+ $error = $results[2]->getError();
+ self::assertSame(1, $error->errorCode);
+ self::assertSame('some unexpected error', $error->error);
+ }
+
+ public function testListPartitions() : void
+ {
+ $this->expectRequest('GET', '/topics/lorem-ipsum/partitions', null, $this->fileResponse('listPartitions'));
+
+ $partitions = $this->client->listPartitions('lorem-ipsum');
+
+ self::assertCount(2, $partitions);
+
+ // partition 0
+ self::assertSame(0, $partitions[0]->partition);
+ self::assertSame(0, $partitions[0]->leader);
+ self::assertCount(2, $partitions[0]->replicas);
+
+ // partition 0, replica 0
+ self::assertSame(0, $partitions[0]->replicas[0]->broker);
+ self::assertTrue($partitions[0]->replicas[0]->leader);
+ self::assertTrue($partitions[0]->replicas[0]->inSync);
+
+ // partition 0, replica 1
+ self::assertSame(1, $partitions[0]->replicas[1]->broker);
+ self::assertFalse($partitions[0]->replicas[1]->leader);
+ self::assertTrue($partitions[0]->replicas[1]->inSync);
+
+ // partition 1
+ self::assertSame(1, $partitions[1]->partition);
+ self::assertSame(1, $partitions[1]->leader);
+ self::assertCount(2, $partitions[1]->replicas);
+
+ // partition 1, replica 0
+ self::assertSame(1, $partitions[1]->replicas[0]->broker);
+ self::assertTrue($partitions[1]->replicas[0]->leader);
+ self::assertTrue($partitions[1]->replicas[0]->inSync);
+
+ // partition 1, replica 1
+ self::assertSame(0, $partitions[1]->replicas[1]->broker);
+ self::assertFalse($partitions[1]->replicas[1]->leader);
+ self::assertTrue($partitions[1]->replicas[1]->inSync);
+ }
+
+ public function testGetPartition() : void
+ {
+ $this->expectRequest('GET', '/topics/lorem-ipsum/partitions/0', null, $this->fileResponse('getPartition'));
+
+ $partition = $this->client->getPartition('lorem-ipsum', 0);
+
+ // partition 0
+ self::assertSame(0, $partition->partition);
+ self::assertSame(0, $partition->leader);
+ self::assertCount(2, $partition->replicas);
+
+ // partition 0, replica 0
+ self::assertSame(0, $partition->replicas[0]->broker);
+ self::assertTrue($partition->replicas[0]->leader);
+ self::assertTrue($partition->replicas[0]->inSync);
+
+ // partition 0, replica 1
+ self::assertSame(1, $partition->replicas[1]->broker);
+ self::assertFalse($partition->replicas[1]->leader);
+ self::assertTrue($partition->replicas[1]->inSync);
+ }
+
+ public function testProduceToPartition() : void
+ {
+ $this->expectRequest(
+ 'POST',
+ '/topics/lorem-ipsum/partitions/0',
+ '{"records":[{"value":"msg1"},{"value":"msg2"}]}',
+ $this->fileResponse('produceToPartition')
+ );
+
+ $offsets = $this->client->produceToPartition('lorem-ipsum', 0, [new Message('msg1'), new Message('msg2')]);
+
+ self::assertCount(2, $offsets);
+
+ self::assertFalse($offsets[0]->isError());
+ $offset = $offsets[0]->getOffset();
+ self::assertSame(0, $offset->partition);
+ self::assertSame(8, $offset->offset);
+
+ self::assertTrue($offsets[1]->isError());
+ $error = $offsets[1]->getError();
+ self::assertSame(1, $error->errorCode);
+ self::assertSame('some unexpected error', $error->error);
+ }
+
+ public function testCreateConsumer() : void
+ {
+ $consumerOptions = new ConsumerOptions();
+ $consumerOptions->name = 'custom-consumer-name';
+
+ $this->expectRequest('POST', '/consumers/dolor-sit-amet', '{}', $this->fileResponse('createConsumer'));
+
+ $consumer = $this->client->createConsumer('dolor-sit-amet', new ConsumerOptions());
+
+ self::assertSame('custom-consumer-name', $consumer->instanceId);
+ self::assertSame('http://api/consumers/dolor-sit-amet/instances/custom-consumer-name', $consumer->baseUri);
+ }
+
+ public function testDeleteConsumer() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'DELETE',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name',
+ null,
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->deleteConsumer($consumer);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testConsumerCommitOffsets() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets',
+ null,
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerCommitOffsets($consumer);
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets',
+ '{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":25}]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerCommitOffsets($consumer, [new Offset('lorem-ipsum', 0, 25)]);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testGetConsumerCommittedOffsets() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'GET',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets',
+ '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}',
+ $this->responseWithBody('{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":25,"metadata":""}]}')
+ );
+
+ $offsets = $this->client->getConsumerCommittedOffsets($consumer, [new Partition('lorem-ipsum', 0)]);
+
+ self::assertCount(1, $offsets);
+ self::assertSame(0, $offsets[0]->partition);
+ self::assertSame(25, $offsets[0]->offset);
+ self::assertSame('', $offsets[0]->metadata);
+ }
+
+ public function testConsumerSubscribe() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription',
+ '{"topics":["lorem-ipsum"]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerSubscribe($consumer, Subscription::topic('lorem-ipsum'));
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription',
+ '{"topic_pattern":"lorem-*"}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerSubscribe($consumer, Subscription::pattern('lorem-*'));
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testGetConsumerSubscribedTopics() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'GET',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription',
+ null,
+ $this->responseWithBody('{"topics":["lorem-ipsum"]}')
+ );
+
+ $topics = $this->client->getConsumerSubscribedTopics($consumer);
+
+ self::assertCount(1, $topics);
+ self::assertSame('lorem-ipsum', $topics[0]);
+ }
+
+ public function testConsumerUnsubscribe() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'DELETE',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription',
+ null,
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerUnsubscribe($consumer);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testConsumerAssignPartitions() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/assignments',
+ '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerAssignPartitions($consumer, [new Partition('lorem-ipsum', 0)]);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testGetConsumerAssignedPartitions() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'GET',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/assignments',
+ null,
+ $this->responseWithBody('{"partitions":[{"topic":"lorem-ipsum","partition":0}]}')
+ );
+
+ $partitions = $this->client->getConsumerAssignedPartitions($consumer);
+
+ self::assertCount(1, $partitions);
+ self::assertSame('lorem-ipsum', $partitions[0]->topic);
+ self::assertSame(0, $partitions[0]->partition);
+ }
+
+ public function testConsumerSeek() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions',
+ '{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":12}]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerSeek($consumer, [new Offset('lorem-ipsum', 0, 12)]);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testConsumerSeekStart() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions/beginning',
+ '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerSeekStart($consumer, [new Partition('lorem-ipsum', 0)]);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testConsumerSeekEnd() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'POST',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions/end',
+ '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}',
+ $this->response->withStatus(Http::NO_CONTENT)
+ );
+
+ $this->client->consumerSeekEnd($consumer, [new Partition('lorem-ipsum', 0)]);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testGetConsumerMessages() : void
+ {
+ $consumer = new Consumer();
+ $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name';
+
+ $this->expectRequest(
+ 'GET',
+ '/consumers/dolor-sit-amet/instances/custom-consumer-name/records',
+ null,
+ $this->fileResponse('getConsumerMessages')
+ );
+
+ $messages = $this->client->getConsumerMessages($consumer);
+
+ self::assertCount(2, $messages);
+
+ self::assertSame('lorem-ipsum', $messages[0]->topic);
+ self::assertNull($messages[0]->key);
+ self::assertSame('msg1', $messages[0]->content);
+ self::assertSame(0, $messages[0]->partition);
+ self::assertSame(52, $messages[0]->offset);
+
+ self::assertSame('lorem-ipsum', $messages[1]->topic);
+ self::assertNull($messages[1]->key);
+ self::assertSame('msg2', $messages[1]->content);
+ self::assertSame(1, $messages[1]->partition);
+ self::assertSame(53, $messages[1]->offset);
+ }
+
+ public function testGetBrokers() : void
+ {
+ $this->expectRequest('GET', '/brokers', null, $this->responseWithBody('{"brokers":[1, 0]}'));
+
+ $brokers = $this->client->getBrokers();
+
+ self::assertCount(2, $brokers);
+ self::assertSame(1, $brokers[0]);
+ self::assertSame(0, $brokers[1]);
+ }
+
+ /**
+ * @dataProvider providerErrors
+ */
+ public function testErrors(?string $body, int $status, string $message) : void
+ {
+ if ($body === null) {
+ $response = $this->response->withStatus($status);
+ } else {
+ $response = $this->responseWithBody($body)->withStatus($status);
+ }
+
+ $this->expectRequest('GET', '/topics', null, $response);
+
+ $this->expectException(UnexpectedApiResponse::class);
+ $this->expectExceptionMessage($message);
+
+ $this->client->listTopics();
+ }
+
+ /**
+ * @return mixed[]
+ */
+ public function providerErrors() : iterable
+ {
+ yield [
+ 'body' => '{"error_code":40401,"message":"Topic not found"}',
+ 'status' => Http::NOT_FOUND,
+ 'message' => 'Unexpected response HTTP 404: [40401] Topic not found',
+ ];
+
+ yield [
+ 'body' => null,
+ 'status' => Http::INTERNAL_SERVER_ERROR,
+ 'message' => 'Unexpected HTTP status 500: Internal Server Error',
+ ];
+ }
+
+ private function expectRequest(string $method, string $path, ?string $body, ResponseInterface $response) : void
+ {
+ $this->httpClient->shouldReceive('sendRequest')
+ ->once()
+ ->with(Mockery::on(static function (RequestInterface $request) use ($method, $path, $body) {
+ return $request->getMethod() === $method &&
+ (string) $request->getUri() === 'http://api' . $path &&
+ (string) $request->getBody() === ($body ?? '');
+ }))
+ ->andReturn($response);
+ }
+
+ private function fileResponse(string $name) : ResponseInterface
+ {
+ $path = sprintf('%s/data/%s.json', __DIR__, $name);
+
+ return $this->responseWithBody(file_get_contents($path));
+ }
+
+ private function responseWithBody(string $body) : ResponseInterface
+ {
+ return $this->response->withBody((new StreamFactory())->createStream($body));
+ }
+}
diff --git a/tests/Api/data/createConsumer.json b/tests/Api/data/createConsumer.json
new file mode 100644
index 0000000..daa472c
--- /dev/null
+++ b/tests/Api/data/createConsumer.json
@@ -0,0 +1,4 @@
+{
+ "instance_id": "custom-consumer-name",
+ "base_uri": "http://api/consumers/dolor-sit-amet/instances/custom-consumer-name"
+}
diff --git a/tests/Api/data/getConsumerMessages.json b/tests/Api/data/getConsumerMessages.json
new file mode 100644
index 0000000..ebe88cb
--- /dev/null
+++ b/tests/Api/data/getConsumerMessages.json
@@ -0,0 +1,16 @@
+[
+ {
+ "topic": "lorem-ipsum",
+ "key": null,
+ "value": "msg1",
+ "partition": 0,
+ "offset": 52
+ },
+ {
+ "topic": "lorem-ipsum",
+ "key": null,
+ "value": "msg2",
+ "partition": 1,
+ "offset": 53
+ }
+]
diff --git a/tests/Api/data/getPartition.json b/tests/Api/data/getPartition.json
new file mode 100644
index 0000000..6b12c72
--- /dev/null
+++ b/tests/Api/data/getPartition.json
@@ -0,0 +1,16 @@
+{
+ "partition": 0,
+ "leader": 0,
+ "replicas": [
+ {
+ "broker": 0,
+ "leader": true,
+ "in_sync": true
+ },
+ {
+ "broker": 1,
+ "leader": false,
+ "in_sync": true
+ }
+ ]
+}
diff --git a/tests/Api/data/getTopic.json b/tests/Api/data/getTopic.json
new file mode 100644
index 0000000..50f4c05
--- /dev/null
+++ b/tests/Api/data/getTopic.json
@@ -0,0 +1,41 @@
+{
+ "name": "lorem-ipsum",
+ "configs": {
+ "retention.bytes": "-1",
+ "flush.ms": "50000"
+ },
+ "partitions": [
+ {
+ "partition": 0,
+ "leader": 0,
+ "replicas": [
+ {
+ "broker": 0,
+ "leader": true,
+ "in_sync": true
+ },
+ {
+ "broker": 1,
+ "leader": false,
+ "in_sync": true
+ }
+ ]
+ },
+ {
+ "partition": 1,
+ "leader": 1,
+ "replicas": [
+ {
+ "broker": 1,
+ "leader": true,
+ "in_sync": true
+ },
+ {
+ "broker": 0,
+ "leader": false,
+ "in_sync": true
+ }
+ ]
+ }
+ ]
+}
diff --git a/tests/Api/data/listPartitions.json b/tests/Api/data/listPartitions.json
new file mode 100644
index 0000000..9878e1c
--- /dev/null
+++ b/tests/Api/data/listPartitions.json
@@ -0,0 +1,34 @@
+[
+ {
+ "partition": 0,
+ "leader": 0,
+ "replicas": [
+ {
+ "broker": 0,
+ "leader": true,
+ "in_sync": true
+ },
+ {
+ "broker": 1,
+ "leader": false,
+ "in_sync": true
+ }
+ ]
+ },
+ {
+ "partition": 1,
+ "leader": 1,
+ "replicas": [
+ {
+ "broker": 1,
+ "leader": true,
+ "in_sync": true
+ },
+ {
+ "broker": 0,
+ "leader": false,
+ "in_sync": true
+ }
+ ]
+ }
+]
diff --git a/tests/Api/data/produce.json b/tests/Api/data/produce.json
new file mode 100644
index 0000000..3388bca
--- /dev/null
+++ b/tests/Api/data/produce.json
@@ -0,0 +1,22 @@
+{
+ "offsets": [
+ {
+ "partition": 0,
+ "offset": 8,
+ "error_code": null,
+ "error": null
+ },
+ {
+ "partition": 1,
+ "offset": 15,
+ "error_code": null,
+ "error": null
+ },
+ {
+ "error_code": 1,
+ "error": "some unexpected error"
+ }
+ ],
+ "key_schema_id": null,
+ "value_schema_id": null
+}
diff --git a/tests/Api/data/produceToPartition.json b/tests/Api/data/produceToPartition.json
new file mode 100644
index 0000000..c9f83f0
--- /dev/null
+++ b/tests/Api/data/produceToPartition.json
@@ -0,0 +1,16 @@
+{
+ "offsets": [
+ {
+ "partition": 0,
+ "offset": 8,
+ "error_code": null,
+ "error": null
+ },
+ {
+ "error_code": 1,
+ "error": "some unexpected error"
+ }
+ ],
+ "key_schema_id": null,
+ "value_schema_id": null
+}
diff --git a/tests/BatchConsumerFactoryTest.php b/tests/BatchConsumerFactoryTest.php
new file mode 100644
index 0000000..fd41cb5
--- /dev/null
+++ b/tests/BatchConsumerFactoryTest.php
@@ -0,0 +1,56 @@
+shouldReceive('createConsumer')
+ ->once()
+ ->with($group, $consumerOptions)
+ ->andReturn($consumer);
+
+ $client->shouldReceive('consumerSubscribe')
+ ->once()
+ ->with($consumer, $subscription);
+
+ $client->shouldReceive('deleteConsumer')
+ ->with($consumer);
+
+ $factory = new BatchConsumerFactory($client);
+ $factory->create($group, $subscription, $maxCount, $maxDuration, $consumerOptions);
+ self::assertThat(true, new IsTrue());
+ }
+
+ /**
+ * @return mixed[]
+ */
+ public function providerCreate() : iterable
+ {
+ yield [null, 10, null];
+ yield [1000, null, null];
+ yield [null, 10, new ConsumerOptions()];
+ yield [1000, null, new ConsumerOptions()];
+ }
+}
diff --git a/tests/BatchConsumerTest.php b/tests/BatchConsumerTest.php
new file mode 100644
index 0000000..3566e9c
--- /dev/null
+++ b/tests/BatchConsumerTest.php
@@ -0,0 +1,375 @@
+shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $firstMessages) {
+ $now = $now->add(new DateInterval('PT20S'));
+ $clock->setTo($now);
+
+ return $firstMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration - 20, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $secondMessages) {
+ $now = $now->add(new DateInterval('PT10S'));
+ $clock->setTo($now);
+
+ return $secondMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $thirdMessages) {
+ $now = $now->add(new DateInterval('PT15S'));
+ $clock->setTo($now);
+
+ return $thirdMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration - 15, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $forthMessages) {
+ $now = $now->add(new DateInterval('PT10S'));
+ $clock->setTo($now);
+
+ return $forthMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration - 25, $maxBytes)
+ ->andThrow(new Exception('some exception'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new BatchConsumer($client, $consumerVO, null, $maxDuration, $clock);
+ $messages = $consumer->consume($maxBytes);
+
+ self::assertInstanceOf(Generator::class, $messages);
+ self::assertTrue($messages->valid());
+
+ $batch = $messages->current();
+ self::assertInstanceOf(MessagesBatch::class, $batch);
+
+ self::assertSame(3, $batch->count());
+ self::assertCount(3, $batch->getMessages());
+ self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages());
+
+ try {
+ $messages->next();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertFalse($messages->valid());
+ }
+ }
+
+ /**
+ * @dataProvider providerMaxBytes
+ */
+ public function testConsumeWithMaxCount(?int $maxBytes) : void
+ {
+ $clock = new FrozenClock(new DateTimeImmutable());
+
+ $consumerVO = new Consumer();
+
+ $firstMessages = [new Message(), new Message()];
+ $secondMessages = [new Message()];
+ $thirdMessages = [];
+ $forthMessages = [new Message()];
+
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, $maxBytes)
+ ->andReturn($firstMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, $maxBytes)
+ ->andReturn($secondMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, $maxBytes)
+ ->andReturn($thirdMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, $maxBytes)
+ ->andReturn($forthMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, $maxBytes)
+ ->andThrow(new Exception('some exception'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new BatchConsumer($client, $consumerVO, 3, null, $clock);
+ $messages = $consumer->consume($maxBytes);
+
+ self::assertInstanceOf(Generator::class, $messages);
+ self::assertTrue($messages->valid());
+
+ $batch = $messages->current();
+ self::assertInstanceOf(MessagesBatch::class, $batch);
+
+ self::assertSame(3, $batch->count());
+ self::assertCount(3, $batch->getMessages());
+ self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages());
+
+ try {
+ $messages->next();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertFalse($messages->valid());
+ }
+ }
+
+ /**
+ * @dataProvider providerMaxBytes
+ */
+ public function testConsumeWithMaxCountAndMaxDuration(?int $maxBytes) : void
+ {
+ $maxDuration = 30;
+ $now = new DateTimeImmutable();
+ $clock = new FrozenClock($now);
+
+ $consumerVO = new Consumer();
+
+ $firstMessages = [new Message(), new Message()];
+ $secondMessages = [new Message()];
+ $thirdMessages = [new Message()];
+ $forthMessages = [new Message()];
+
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $firstMessages) {
+ $now = $now->add(new DateInterval('PT20S'));
+ $clock->setTo($now);
+
+ return $firstMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration - 20, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $secondMessages) {
+ $now = $now->add(new DateInterval('PT5S'));
+ $clock->setTo($now);
+
+ return $secondMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $thirdMessages) {
+ $now = $now->add(new DateInterval('PT40S'));
+ $clock->setTo($now);
+
+ return $thirdMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration, $maxBytes)
+ ->andReturnUsing(static function () use (&$now, $clock, $forthMessages) {
+ $now = $now->add(new DateInterval('PT10S'));
+ $clock->setTo($now);
+
+ return $forthMessages;
+ });
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $maxDuration - 10, $maxBytes)
+ ->andThrow(new Exception('some exception'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new BatchConsumer($client, $consumerVO, 3, $maxDuration, $clock);
+ $messages = $consumer->consume($maxBytes);
+
+ self::assertInstanceOf(Generator::class, $messages);
+ self::assertTrue($messages->valid());
+
+ $batch = $messages->current();
+ self::assertInstanceOf(MessagesBatch::class, $batch);
+
+ self::assertSame(3, $batch->count());
+ self::assertCount(3, $batch->getMessages());
+ self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages());
+
+ $messages->next();
+ self::assertTrue($messages->valid());
+ $batch = $messages->current();
+ self::assertInstanceOf(MessagesBatch::class, $batch);
+
+ self::assertSame(1, $batch->count());
+ self::assertCount(1, $batch->getMessages());
+ self::assertSame($thirdMessages, $batch->getMessages());
+
+ try {
+ $messages->next();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertFalse($messages->valid());
+ }
+ }
+
+ /**
+ * @return mixed[]
+ */
+ public function providerMaxBytes() : iterable
+ {
+ yield [null];
+ yield [10000];
+ }
+
+ public function testCantCreateBatchConsumerWithoutBatchLimitation() : void
+ {
+ $client = Mockery::mock(RestClient::class);
+ $consumerVO = new Consumer();
+
+ $this->expectException(AssertionFailedException::class);
+ $this->expectExceptionMessage('You must specify at least one of maxCount/maxDuration');
+
+ new BatchConsumer($client, $consumerVO, null, null);
+ }
+
+ public function testCommit() : void
+ {
+ $message1 = new Message();
+ $message1->topic = 'some-topic';
+ $message1->partition = 0;
+ $message1->offset = 12;
+
+ $message2 = new Message();
+ $message2->topic = 'some-topic';
+ $message2->partition = 1;
+ $message2->offset = 5;
+
+ $message3 = new Message();
+ $message3->topic = 'some-other-topic';
+ $message3->partition = 0;
+ $message3->offset = 31;
+
+ $message4 = new Message();
+ $message4->topic = 'some-topic';
+ $message4->partition = 0;
+ $message4->offset = 13;
+
+ $message5 = new Message();
+ $message5->topic = 'some-other-topic';
+ $message5->partition = 0;
+ $message5->offset = 32;
+
+ $consumerVO = new Consumer();
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('consumerCommitOffsets')
+ ->with($consumerVO, IsEqual::equalTo([
+ new Offset('some-other-topic', 0, 33),
+ new Offset('some-topic', 0, 14),
+ new Offset('some-topic', 1, 6),
+ ]));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new BatchConsumer($client, $consumerVO, 100);
+
+ $batch = new MessagesBatch();
+ $batch->add($message1);
+ $batch->add($message2);
+ $batch->add($message3);
+ $batch->add($message4);
+ $batch->add($message5);
+
+ $consumer->commit($batch);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testCloseAfterError() : void
+ {
+ $consumerVO = new Consumer();
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, 10, null)
+ ->andThrow($expectedException = new RuntimeException('some error'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new BatchConsumer($client, $consumerVO, 100);
+
+ $messages = $consumer->consume();
+ self::assertInstanceOf(Generator::class, $messages);
+
+ try {
+ $messages->current();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertSame('Failed to consume messages; consumer is now closed', $exception->getMessage());
+ self::assertSame($expectedException, $exception->getPrevious());
+ }
+
+ $messages = $consumer->consume();
+ self::assertInstanceOf(Generator::class, $messages);
+
+ try {
+ $messages->current();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertSame('Consumer is closed', $exception->getMessage());
+ self::assertNull($exception->getPrevious());
+ }
+ }
+}
diff --git a/tests/ConsumerFactoryTest.php b/tests/ConsumerFactoryTest.php
new file mode 100644
index 0000000..f1525d4
--- /dev/null
+++ b/tests/ConsumerFactoryTest.php
@@ -0,0 +1,54 @@
+shouldReceive('createConsumer')
+ ->once()
+ ->with($group, $consumerOptions)
+ ->andReturn($consumer);
+
+ $client->shouldReceive('consumerSubscribe')
+ ->once()
+ ->with($consumer, $subscription);
+
+ $client->shouldReceive('deleteConsumer')
+ ->with($consumer);
+
+ $factory = new ConsumerFactory($client);
+ $factory->create($group, $subscription, $consumerOptions);
+ self::assertThat(true, new IsTrue());
+ }
+
+ /**
+ * @return mixed[]
+ */
+ public function providerCreate() : iterable
+ {
+ yield [null];
+ yield [new ConsumerOptions()];
+ }
+}
diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php
new file mode 100644
index 0000000..3ad5579
--- /dev/null
+++ b/tests/ConsumerTest.php
@@ -0,0 +1,143 @@
+shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $timeout, $maxBytes)
+ ->andReturns($firstMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, $timeout, $maxBytes)
+ ->andReturns($secondMessages);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->andThrow(new Exception('some exception'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new Consumer($client, $consumerVO);
+ $messages = $consumer->consume($timeout, $maxBytes);
+
+ self::assertInstanceOf(Generator::class, $messages);
+
+ self::assertTrue($messages->valid());
+ self::assertSame($firstMessages[0], $messages->current());
+
+ $messages->next();
+ self::assertTrue($messages->valid());
+ self::assertSame($firstMessages[1], $messages->current());
+
+ $messages->next();
+ self::assertTrue($messages->valid());
+ self::assertSame($secondMessages[0], $messages->current());
+ self::assertTrue($messages->valid());
+
+ try {
+ $messages->next();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertFalse($messages->valid());
+ }
+ }
+
+ /**
+ * @return mixed[]
+ */
+ public function providerConsume() : iterable
+ {
+ yield [null, null];
+ yield [null, 10000];
+ yield [10, null];
+ }
+
+ public function testCommit() : void
+ {
+ $message = new Message();
+ $message->topic = 'some-topic';
+ $message->partition = 0;
+ $message->offset = 12;
+
+ $consumerVO = new \Grongor\KafkaRest\Api\Value\Response\Consumer();
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('consumerCommitOffsets')
+ ->once()
+ ->with($consumerVO, IsEqual::equalTo([new Offset('some-topic', 0, 13)]));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new Consumer($client, $consumerVO);
+ $consumer->commit($message);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testCloseAfterError() : void
+ {
+ $consumerVO = new \Grongor\KafkaRest\Api\Value\Response\Consumer();
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('getConsumerMessages')
+ ->once()
+ ->with($consumerVO, null, null)
+ ->andThrow($expectedException = new RuntimeException('some error'));
+
+ $client->shouldReceive('deleteConsumer')
+ ->once()
+ ->with($consumerVO);
+
+ $consumer = new Consumer($client, $consumerVO);
+
+ $messages = $consumer->consume();
+ self::assertInstanceOf(Generator::class, $messages);
+
+ try {
+ $messages->current();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertSame('Failed to consume messages; consumer is now closed', $exception->getMessage());
+ self::assertSame($expectedException, $exception->getPrevious());
+ }
+
+ $messages = $consumer->consume();
+ self::assertInstanceOf(Generator::class, $messages);
+
+ try {
+ $messages->current();
+ self::fail('Expected an exception');
+ } catch (ConsumerClosed $exception) {
+ self::assertSame('Consumer is closed', $exception->getMessage());
+ self::assertNull($exception->getPrevious());
+ }
+ }
+}
diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php
new file mode 100644
index 0000000..493db23
--- /dev/null
+++ b/tests/ProducerTest.php
@@ -0,0 +1,150 @@
+shouldReceive('produce')
+ ->once()
+ ->with('some-topic', [$message])
+ ->andReturn([$result]);
+
+ $producer = new Producer($client);
+ $producer->produce('some-topic', $message);
+ self::assertThat(true, new IsTrue());
+ }
+
+ public function testProduceWithNonRetryableError() : void
+ {
+ $message = new Message('msg1');
+
+ $result = ProduceResult::error(1, 'some non-retryable error');
+
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('produce')
+ ->once()
+ ->with('some-topic', [$message])
+ ->andReturn([$result]);
+
+ $producer = new Producer($client);
+
+ try {
+ $producer->produce('some-topic', $message);
+ self::fail('Expectec an FailedToProduceMessages exception');
+ } catch (FailedToProduceMessages $exception) {
+ self::assertSame(
+ [
+ [
+ 'error' => $result->error,
+ 'message' => $message,
+ ],
+ ],
+ $exception->nonRetryable
+ );
+ self::assertCount(0, $exception->retryable);
+ }
+ }
+
+ public function testProduceWithRetryableError() : void
+ {
+ $message = new Message('msg1');
+
+ $result = ProduceResult::error(2, 'some retryable error');
+
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('produce')
+ ->once()
+ ->with('some-topic', [$message])
+ ->andReturn([$result]);
+
+ $producer = new Producer($client);
+
+ try {
+ $producer->produce('some-topic', $message);
+ self::fail('Expectec an FailedToProduceMessages exception');
+ } catch (FailedToProduceMessages $exception) {
+ self::assertCount(0, $exception->nonRetryable);
+ self::assertSame(
+ [
+ [
+ 'error' => $result->error,
+ 'message' => $message,
+ ],
+ ],
+ $exception->retryable
+ );
+ }
+ }
+
+ public function testProduceBatch() : void
+ {
+ $message1 = new Message('msg1');
+ $message2 = new Message('msg2');
+ $message3 = new Message('msg3');
+ $message4 = new Message('msg3');
+ $message5 = new Message('msg3');
+ $messages = [$message1, $message2, $message3, $message4, $message5];
+
+ $result1 = ProduceResult::offset(1, 25);
+ $result2 = ProduceResult::error(1, 'some non-retryable error');
+ $result3 = ProduceResult::error(2, 'some retryable error');
+ $result4 = ProduceResult::error(1, 'some non-retryable error');
+ $result5 = ProduceResult::offset(0, 26);
+
+ $results = [$result1, $result2, $result3, $result4, $result5];
+
+ $client = Mockery::mock(RestClient::class);
+ $client->shouldReceive('produce')
+ ->once()
+ ->with('some-topic', $messages)
+ ->andReturn($results);
+
+ $producer = new Producer($client);
+
+ try {
+ $producer->produceBatch('some-topic', $messages);
+ self::fail('Expectec an FailedToProduceMessages exception');
+ } catch (FailedToProduceMessages $exception) {
+ self::assertSame(
+ [
+ [
+ 'error' => $result2->error,
+ 'message' => $message2,
+ ],
+ [
+ 'error' => $result4->error,
+ 'message' => $message4,
+ ],
+ ],
+ $exception->nonRetryable
+ );
+ self::assertSame(
+ [
+ [
+ 'error' => $result3->error,
+ 'message' => $message3,
+ ],
+ ],
+ $exception->retryable
+ );
+ }
+ }
+}
diff --git a/tests/bootstrap.php b/tests/bootstrap.php
new file mode 100644
index 0000000..3e7c0a7
--- /dev/null
+++ b/tests/bootstrap.php
@@ -0,0 +1,16 @@
+