From 6f8ac51a3db939d6b81ca33cd3c816504d89dae8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Tue, 5 Nov 2019 23:09:41 +0100 Subject: [PATCH] First version --- .gitignore | 7 + .travis.yml | 44 ++ Makefile | 72 +++ README.md | 117 +++- composer.json | 53 ++ phpcs.xml.dist | 12 + phpstan.neon.dist | 6 + phpunit.xml.dist | 22 + src/Api/Exception/UnexpectedApiResponse.php | 26 + src/Api/HttpRestClient.php | 380 +++++++++++++ src/Api/RestClient.php | 157 ++++++ src/Api/Value/Request/ConsumerOptions.php | 53 ++ src/Api/Value/Request/Message.php | 30 + src/Api/Value/Request/Offset.php | 35 ++ src/Api/Value/Request/Partition.php | 28 + src/Api/Value/Request/Subscription.php | 67 +++ src/Api/Value/Response/AssignedPartition.php | 22 + src/Api/Value/Response/Consumer.php | 22 + src/Api/Value/Response/Message.php | 64 +++ src/Api/Value/Response/Offset.php | 37 ++ src/Api/Value/Response/Partition.php | 28 + src/Api/Value/Response/ProduceError.php | 23 + src/Api/Value/Response/ProduceResult.php | 82 +++ src/Api/Value/Response/ProduceResults.php | 20 + src/Api/Value/Response/Replica.php | 28 + src/Api/Value/Response/Topic.php | 28 + src/BatchConsumer.php | 155 ++++++ src/BatchConsumerFactory.php | 33 ++ src/Consumer.php | 69 +++ src/ConsumerFactory.php | 31 ++ src/Exception/ConsumerClosed.php | 21 + src/Exception/FailedToProduceMessages.php | 29 + src/Producer.php | 61 +++ src/Value/MessagesBatch.php | 37 ++ tests/Api/HttpRestClientTest.php | 546 +++++++++++++++++++ tests/Api/data/createConsumer.json | 4 + tests/Api/data/getConsumerMessages.json | 16 + tests/Api/data/getPartition.json | 16 + tests/Api/data/getTopic.json | 41 ++ tests/Api/data/listPartitions.json | 34 ++ tests/Api/data/produce.json | 22 + tests/Api/data/produceToPartition.json | 16 + tests/BatchConsumerFactoryTest.php | 56 ++ tests/BatchConsumerTest.php | 375 +++++++++++++ tests/ConsumerFactoryTest.php | 54 ++ tests/ConsumerTest.php | 143 +++++ tests/ProducerTest.php | 150 +++++ tests/bootstrap.php | 16 + 48 files changed, 3387 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 Makefile create mode 100644 composer.json create mode 100644 phpcs.xml.dist create mode 100644 phpstan.neon.dist create mode 100644 phpunit.xml.dist create mode 100644 src/Api/Exception/UnexpectedApiResponse.php create mode 100644 src/Api/HttpRestClient.php create mode 100644 src/Api/RestClient.php create mode 100644 src/Api/Value/Request/ConsumerOptions.php create mode 100644 src/Api/Value/Request/Message.php create mode 100644 src/Api/Value/Request/Offset.php create mode 100644 src/Api/Value/Request/Partition.php create mode 100644 src/Api/Value/Request/Subscription.php create mode 100644 src/Api/Value/Response/AssignedPartition.php create mode 100644 src/Api/Value/Response/Consumer.php create mode 100644 src/Api/Value/Response/Message.php create mode 100644 src/Api/Value/Response/Offset.php create mode 100644 src/Api/Value/Response/Partition.php create mode 100644 src/Api/Value/Response/ProduceError.php create mode 100644 src/Api/Value/Response/ProduceResult.php create mode 100644 src/Api/Value/Response/ProduceResults.php create mode 100644 src/Api/Value/Response/Replica.php create mode 100644 src/Api/Value/Response/Topic.php create mode 100644 src/BatchConsumer.php create mode 100644 src/BatchConsumerFactory.php create mode 100644 src/Consumer.php create mode 100644 src/ConsumerFactory.php create mode 100644 src/Exception/ConsumerClosed.php create mode 100644 src/Exception/FailedToProduceMessages.php create mode 100644 src/Producer.php create mode 100644 src/Value/MessagesBatch.php create mode 100644 tests/Api/HttpRestClientTest.php create mode 100644 tests/Api/data/createConsumer.json create mode 100644 tests/Api/data/getConsumerMessages.json create mode 100644 tests/Api/data/getPartition.json create mode 100644 tests/Api/data/getTopic.json create mode 100644 tests/Api/data/listPartitions.json create mode 100644 tests/Api/data/produce.json create mode 100644 tests/Api/data/produceToPartition.json create mode 100644 tests/BatchConsumerFactoryTest.php create mode 100644 tests/BatchConsumerTest.php create mode 100644 tests/ConsumerFactoryTest.php create mode 100644 tests/ConsumerTest.php create mode 100644 tests/ProducerTest.php create mode 100644 tests/bootstrap.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..26d931c --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +/vendor +/.phpcs-cache +/.phpunit.result.cache +/composer.lock +/phpcs.xml +/phpstan.neon +/phpunit.xml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..a8a7e0c --- /dev/null +++ b/.travis.yml @@ -0,0 +1,44 @@ +language: php +php: + - 7.2 + - 7.3 + - 7.4snapshot + - nightly + +env: + - DEPENDENCIES= + - DEPENDENCIES=--prefer-lowest + +install: + composer update --no-interaction --prefer-stable --prefer-dist $DEPENDENCIES + +script: make test + +stages: + - Validate composer.json + - Code Quality + - Test + +jobs: + allow_failures: + - php: 7.4snapshot + - php: nightly + + include: + - stage: Validate composer.json + name: Validate composer.json + php: 7.2 + script: + - make validate-composer + - curl -O https://github.com/maglnet/ComposerRequireChecker/releases/latest/download/composer-require-checker.phar + - php composer-require-checker.phar check composer.json + - stage: Code Quality + name: Lint + php: 7.2 + script: make lint + - name: Coding Standard + php: 7.2 + script: make cs + - name: PHPStan + php: 7.2 + script: make phpstan diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c5ca7d2 --- /dev/null +++ b/Makefile @@ -0,0 +1,72 @@ +### BEGIN main targets + +.PHONY: build +build: vendor + +.PHONY: list +list: + @$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$' + +### END + +### BEGIN secondary targets + +.PHONY: vendor +vendor: vendor/lock + +vendor/lock: composer.json + $(MAKE) validate-composer + $(MAKE) update + touch vendor/lock + +.PHONY: update +update: + composer update + +### END + +### BEGIN tests + +.PHONY: test +test: + vendor/bin/phpunit $(PHPUNIT_ARGS) + +.PHONY: lint +lint: + vendor/bin/parallel-lint src/ tests/ + +.PHONY: cs +cs: vendor + vendor/bin/phpcs $(PHPCS_ARGS) + +.PHONY: cs-fix +cs-fix: vendor + vendor/bin/phpcbf + +.PHONY: phpstan +phpstan: vendor + vendor/bin/phpstan analyse + +.PHONY: validate-composer +validate-composer: + composer validate --strict + +.PHONY: check +check: build validate-composer lint cs phpstan test + +### END + +### BEGIN cleaning + +.PHONY: clean +clean: clean-cache clean-vendor + +.PHONY: clean-cache +clean-cache: + rm -rf var/cache/* + +.PHONY: clean-vendor +clean-vendor: + rm -rf vendor + +### END diff --git a/README.md b/README.md index f988f0f..11c20b7 100644 --- a/README.md +++ b/README.md @@ -1 +1,116 @@ -# php-kafka-rest-client \ No newline at end of file +kafka-rest-client [![Build Status](https://travis-ci.com/grongor/php-kafka-rest-client.svg?branch=master)](https://travis-ci.com/grongor/php-kafka-rest-client) +================= + +This is a modern PHP client for [Confluent REST Proxy](https://docs.confluent.io/current/kafka-rest/) (version 2). + +HTTP client used for communication with API adheres to the modern [HTTP standards](http://php-http.org) like +[PSR-7](https://www.php-fig.org/psr/psr-7/), [PSR-17](https://www.php-fig.org/psr/psr-17/) +and [PSR-18](https://www.php-fig.org/psr/psr-18/). + +Aside from implementing the REST Proxy API this library adds some convenient classes useful for the most interactions +with Apache Kafka - see [examples](#Examples) for the whole list and a simple tutorial on how to use them. + +Missing features +---------------- + +Some features were deliberately skipped to simplify the implementation/shorten the development time. +If you need anything and you're willing create a pull request, I'd be happy to check it out +and (if it makes sense) merge it. You can start the discussion by opening an issue. + +- version 1 of the REST Proxy API + - It's easy to upgrade the REST Proxy, so I don't see any point in implementing the first version. +- AVRO embedded format +- JSON embedded format + - I think that the binary format is sufficient. You can always serialize/deserialize your objects before/after + they interact with this library, therefore direct integration here is just a complication. +- async operations + - To simplify the library all the methods are synchronous. That might definitely change if the need arises. +- all not-yet-implemented features mentioned [here](https://docs.confluent.io/current/kafka-rest/#features) + - When these features are implemented, they will be added here ASAP. + +Examples +-------- + +### Producer + +Producer allows you to produce messages, either one by one or in batches. Both use the same underlying API method +so it's more efficient to use the batch one. + +Both `produce` and `produceBatch` return nothing on success and throw an exception on failure. +There might be partial success/failure so the thrown exception `FailedToProduceMessages` contains two public properties, +`$retryable` and `$nonRetryable`, where each contains an array of +`['error' => 'error provided by Kafka', message => (Message object given to produce())]`. +Whether the error is [non-]retryable is based on the `error_code` as +[documented](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name). +It's up to you what you do with those. + +```php +$producer = new Producer($restClient); +$producer->produce('your-topic', new Message('some-message')); + +$messages = [new Message('some-message'), new Message('and-some-other-message')]; +$producer->produceBatch('your-topic', $messages); +``` + +### Consumer + +Consumer allows you to consume messages one by one until you return from the loop, application throws exception +or is otherwise forced to exit. +The `consume` method returns a [Generator](https://www.php.net/manual/en/class.generator.php) and loops indefinitely, +yielding messages as they are available. `consume` method accepts two parameters: `timeout` and `maxBytes`. +`timeout` is the maximum time the consumer will wait for the messages. `maxBytes` is the maximum size of the messages +to fetch in a single request. Both of these settings are complementary to the settings in `ConsumerOptions` and to the +server settings (check the Kafka documentation for more information). + +If you set the consumer option `autoCommitEnable` to `false` then you may use consumer's `commit` method to commit +messages. Simply pass it a message you wish to commit. For most cases it's recommended to turn off the auto-commit +and manually commit each message, so that you don't ever "lose" a message if your application dies +in the middle of the processing. + +```php +$consumerFactory = new ConsumerFactory($restClient); +$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic')); +foreach ($consumer->consume() as $message) { + // Do your magic + $logger->info('Got new message', $message->content); + + // ... and when you are done, commit the message (if you turned off auto-committing). + $consumer->commit($message); +} +``` + +### BatchConsumer + +BatchConsumer works the same way as Consumer does; the difference is that BatchConsumer doesn't yield each message +separately but first puts them in batches (`MessagesBatch`). These batches can be configured to be "limited" by either +count of messages in the batch (`maxCount`), by time (`maxDuration`) or by both (setting both `maxCount` and +`maxDuration`). If you set `maxDuration` then the batch won't ever take longer than that (+ few ms for processing) as +it changes the `timeout` parameter of the consumer (consumer won't get stuck on network waiting for more messages). + +BatchConsumer can be quite useful for processing of "large" data sets, where you would have to otherwise batch +the messages yourself (eg. for inserting into database, where batch operations are always better). + +As mentioned in the Consumer example, you may need to commit the messages. For that there is a `commit` method, +which accepts the yielded `MessagesBatch` and commits all the messages inside it in one request. + +```php +$batchConsumerFactory = new BatchConsumerFactory($restClient); +$batchConsumer = $batchConsumerFactory->create( + 'your-consumer-group', + Subscription::topic('your-topic'), + $maxCount = 10000, // Yield the batch when there is 10 000 messages in it + $maxDuration = 60 // or when 60 seconds passes, whichever comes first. +); +foreach ($batchConsumer->consume() as $messagesBatch) { + // The batch might be empty if you specified the maxDuration. + if ($messagesBatch->isEmpty()) { + continue; + } + + // Do your magic + $database->insert($messagesBatch->getMessages()); + + // ... and when you are done, commit the batch. + $batchConsumer->commit($messagesBatch); +} +``` diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..56b1558 --- /dev/null +++ b/composer.json @@ -0,0 +1,53 @@ +{ + "name": "grongor/kafka-rest-client", + "description": "Client for Confluent REST Proxy of Apache Kafka", + "type": "library", + "license": "MIT", + "keywords": ["kafka", "confluent", "proxy", "rest", "client", "librdkafka", "rdkafka"], + "config": { + "sort-packages": true + }, + "require": { + "php": "^7.2|^8.0", + "beberlei/assert": "^3.2", + "jms/serializer": "^3.2", + "lcobucci/clock": "^1.2", + "psr/http-client": "^1.0", + "psr/http-factory": "^1.0", + "psr/http-message": "^1.0.1", + "psr/log": "^1.1", + "teapot/status-code": "^1.1", + "thecodingmachine/safe": "^0.1.16" + }, + "require-dev": { + "cdn77/coding-standard": "^2.0", + "guzzlehttp/psr7": "^1.6", + "http-interop/http-factory-guzzle": "^1.0", + "jakub-onderka/php-parallel-lint": "^1.0.0", + "mockery/mockery": "^1.2.3", + "php-http/message": "^1.0", + "phpstan/extension-installer": "^1.0", + "phpstan/phpstan": "^0.11.19", + "phpstan/phpstan-beberlei-assert": "^0.11.2", + "phpstan/phpstan-mockery": "^0.11.3", + "phpstan/phpstan-phpunit": "^0.11.2", + "phpstan/phpstan-strict-rules": "^0.11.1", + "phpunit/phpunit": "^8.0", + "roave/security-advisories": "dev-master", + "slevomat/coding-standard": "dev-master as 5.0.4", + "thecodingmachine/phpstan-safe-rule": "^0.1.4" + }, + "conflict": { + "doctrine/annotations": "<1.7.0" + }, + "autoload": { + "psr-4": { + "Grongor\\KafkaRest\\": "src/" + } + }, + "autoload-dev": { + "psr-4": { + "Grongor\\KafkaRest\\Tests\\": "tests/" + } + } +} diff --git a/phpcs.xml.dist b/phpcs.xml.dist new file mode 100644 index 0000000..b606c4f --- /dev/null +++ b/phpcs.xml.dist @@ -0,0 +1,12 @@ + + + + + + + src/ + tests/ + diff --git a/phpstan.neon.dist b/phpstan.neon.dist new file mode 100644 index 0000000..ff1ee6a --- /dev/null +++ b/phpstan.neon.dist @@ -0,0 +1,6 @@ +parameters: + memory-limit: -1 + level: max + paths: + - %currentWorkingDirectory%/src + - %currentWorkingDirectory%/tests diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..025b9cd --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,22 @@ + + + + + + tests + + + + + src/ + + + diff --git a/src/Api/Exception/UnexpectedApiResponse.php b/src/Api/Exception/UnexpectedApiResponse.php new file mode 100644 index 0000000..3a37d29 --- /dev/null +++ b/src/Api/Exception/UnexpectedApiResponse.php @@ -0,0 +1,26 @@ + $response + */ + public static function fromResponse(int $httpCode, array $response) : self + { + return new self( + sprintf('Unexpected response HTTP %d: [%d] %s', $httpCode, $response['error_code'], $response['message']) + ); + } +} diff --git a/src/Api/HttpRestClient.php b/src/Api/HttpRestClient.php new file mode 100644 index 0000000..bc60775 --- /dev/null +++ b/src/Api/HttpRestClient.php @@ -0,0 +1,380 @@ +client = $client; + $this->logger = $logger; + $this->requestFactory = $requestFactory; + $this->serializer = $serializer; + $this->streamFactory = $streamFactory; + $this->uriFactory = $uriFactory; + + $this->apiUri = $this->uriFactory->createUri($apiUri); + } + + /** {@inheritDoc} */ + public function listTopics() : array + { + /** @var array $topics */ + $topics = $this->execute($this->get('/topics'), 'array'); + + return $topics; + } + + /** {@inheritDoc} */ + public function getTopic(string $topic) : Topic + { + /** @var Topic $result */ + $result = $this->execute($this->get(sprintf('/topics/%s', $topic)), Topic::class); + + return $result; + } + + /** {@inheritDoc} */ + public function produce(string $topic, iterable $messages) : array + { + /** @var ProduceResults $result */ + $result = $this->execute( + $this->post(sprintf('/topics/%s', $topic), $this->serialize(['records' => $messages])), + ProduceResults::class + ); + + return $result->results; + } + + /** {@inheritDoc} */ + public function listPartitions(string $topic) : array + { + /** @var Partition[] $partitions */ + $partitions = $this->execute( + $this->get(sprintf('/topics/%s/partitions', $topic)), + sprintf('array<%s>', Partition::class) + ); + + return $partitions; + } + + /** {@inheritDoc} */ + public function getPartition(string $topic, int $partition) : Partition + { + /** @var Partition $result */ + $result = $this->execute( + $this->get(sprintf('/topics/%s/partitions/%d', $topic, $partition)), + Partition::class + ); + + return $result; + } + + /** {@inheritDoc} */ + public function produceToPartition(string $topic, int $partition, iterable $messages) : array + { + /** @var ProduceResults $result */ + $result = $this->execute( + $this->post( + sprintf('/topics/%s/partitions/%d', $topic, $partition), + $this->serialize(['records' => $messages]) + ), + ProduceResults::class + ); + + return $result->results; + } + + /** {@inheritDoc} */ + public function createConsumer(string $group, ?ConsumerOptions $consumerOptions = null) : Consumer + { + $consumerOptions = $consumerOptions ?? new ConsumerOptions(); + + /** @var Consumer $consumer */ + $consumer = $this->execute( + $this->post(sprintf('/consumers/%s', $group), $this->serialize($consumerOptions)), + Consumer::class + ); + + if ($this->apiUri->getUserInfo() !== '') { + $userAndPassword = explode(':', $this->apiUri->getUserInfo(), 2); + $consumerUri = $this->uriFactory->createUri($consumer->baseUri); + $consumer->baseUri = (string) $consumerUri->withUserInfo($userAndPassword[0], $userAndPassword[1] ?? null); + } + + return $consumer; + } + + /** {@inheritDoc} */ + public function deleteConsumer(Consumer $consumer) : void + { + $this->execute($this->consumerDelete($consumer, '')); + } + + /** {@inheritDoc} */ + public function consumerCommitOffsets(Consumer $consumer, ?iterable $offsets = null) : void + { + $body = $offsets === null ? '' : $this->serialize(['offsets' => $offsets]); + + // For some reason the API always returns empty array + // @see https://github.com/confluentinc/kafka-rest/blob/3473f3f/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerState.java#L136 + $this->execute($this->consumerPost($consumer, '/offsets', $body), 'array'); + } + + /** {@inheritDoc} */ + public function getConsumerCommittedOffsets(Consumer $consumer, iterable $partitions) : array + { + /** @var array> $result */ + $result = $this->execute( + $this->consumerGet($consumer, '/offsets', $this->serialize(['partitions' => $partitions])), + sprintf('array>', Offset::class) + ); + + return $result['offsets']; + } + + /** {@inheritDoc} */ + public function consumerSubscribe(Consumer $consumer, Subscription $subscription) : void + { + $this->execute($this->consumerPost($consumer, '/subscription', $this->serialize($subscription))); + } + + /** {@inheritDoc} */ + public function getConsumerSubscribedTopics(Consumer $consumer) : array + { + /** @var array> $result */ + $result = $this->execute($this->consumerGet($consumer, '/subscription'), 'array>'); + + return $result['topics']; + } + + /** {@inheritDoc} */ + public function consumerUnsubscribe(Consumer $consumer) : void + { + $this->execute($this->consumerDelete($consumer, '/subscription')); + } + + /** {@inheritDoc} */ + public function consumerAssignPartitions(Consumer $consumer, iterable $partitions) : void + { + $this->execute($this->consumerPost($consumer, '/assignments', $this->serialize(['partitions' => $partitions]))); + } + + /** {@inheritDoc} */ + public function getConsumerAssignedPartitions(Consumer $consumer) : array + { + /** @var array> $result */ + $result = $this->execute( + $this->consumerGet($consumer, '/assignments'), + sprintf('array>', AssignedPartition::class) + ); + + return $result['partitions']; + } + + /** {@inheritDoc} */ + public function consumerSeek(Consumer $consumer, iterable $offsets) : void + { + $this->execute($this->consumerPost($consumer, '/positions', $this->serialize(['offsets' => $offsets]))); + } + + /** {@inheritDoc} */ + public function consumerSeekStart(Consumer $consumer, iterable $partitions) : void + { + $this->execute( + $this->consumerPost($consumer, '/positions/beginning', $this->serialize(['partitions' => $partitions])) + ); + } + + /** {@inheritDoc} */ + public function consumerSeekEnd(Consumer $consumer, iterable $partitions) : void + { + $this->execute( + $this->consumerPost($consumer, '/positions/end', $this->serialize(['partitions' => $partitions])) + ); + } + + /** {@inheritDoc} */ + public function getConsumerMessages(Consumer $consumer, ?int $timeout = null, ?int $maxBytes = null) : array + { + $parameters = []; + if ($timeout !== null) { + $parameters['timeout'] = $timeout; + } + + if ($maxBytes !== null) { + $parameters['maxBytes'] = $maxBytes; + } + + $request = $this->consumerGet($consumer, '/records'); + $request = $request->withUri($request->getUri()->withQuery(http_build_query($parameters)), true); + + /** @var array $messages */ + $messages = $this->execute($request, sprintf('array<%s>', Message::class)); + + return $messages; + } + + /** {@inheritDoc} */ + public function getBrokers() : array + { + /** @var array> $result */ + $result = $this->execute($this->get('/brokers'), 'array>'); + + return $result['brokers']; + } + + private function request(string $method, string $path, ?UriInterface $baseUri = null) : RequestInterface + { + $request = $this->requestFactory->createRequest($method, ($baseUri ?? $this->apiUri)->withPath($path)); + + return $request->withHeader('Accept', 'application/vnd.kafka.v2+json'); + } + + private function get(string $path, ?UriInterface $baseUri = null) : RequestInterface + { + return $this->request('GET', $path, $baseUri); + } + + private function consumerGet(Consumer $consumer, string $path, ?string $body = null) : RequestInterface + { + $consumerUri = $this->uriFactory->createUri($consumer->baseUri); + $request = $this->get($consumerUri->getPath() . $path, $consumerUri); + + return $body === null ? $request : $this->requestWithBody($request, $body); + } + + private function post(string $path, string $body, ?UriInterface $baseUri = null) : RequestInterface + { + return $this->requestWithBody($this->request('POST', $path, $baseUri), $body); + } + + private function consumerPost(Consumer $consumer, string $path, string $body) : RequestInterface + { + $consumerUri = $this->uriFactory->createUri($consumer->baseUri); + + return $this->post($consumerUri->getPath() . $path, $body, $consumerUri); + } + + private function consumerDelete(Consumer $consumer, string $path) : RequestInterface + { + $consumerUri = $this->uriFactory->createUri($consumer->baseUri); + + return $this->request('DELETE', $consumerUri->getPath() . $path, $consumerUri); + } + + private function requestWithBody(RequestInterface $request, string $body) : RequestInterface + { + $request = $request->withHeader('Content-Type', 'application/vnd.kafka.binary.v2+json'); + $request = $request->withBody($this->streamFactory->createStream($body)); + + return $request; + } + + /** + * @return mixed + */ + private function execute(RequestInterface $request, ?string $resultType = null) + { + $this->logger->debug( + 'Sending request', + [ + 'method' => $request->getMethod(), + 'uri' => (string) $request->getUri(), + 'body' => (string) $request->getBody(), + ] + ); + + $response = $this->client->sendRequest($request); + + $this->logger->debug( + 'Received response', + [ + 'statusCode' => $response->getStatusCode(), + 'statusReason' => $response->getReasonPhrase(), + 'body' => (string) $response->getBody(), + ] + ); + + if ($response->getStatusCode() === Http::OK) { + Assertion::notNull($resultType); + + return $this->serializer->deserialize((string) $response->getBody(), $resultType, 'json'); + } + + if ($response->getStatusCode() === Http::NO_CONTENT) { + return null; + } + + $body = (string) $response->getBody(); + if ($body === '') { + throw UnexpectedApiResponse::fromStatus($response->getStatusCode(), $response->getReasonPhrase()); + } + + throw UnexpectedApiResponse::fromResponse($response->getStatusCode(), json_decode($body, true)); + } + + /** + * @param mixed $data + */ + private function serialize($data) : string + { + return $this->serializer->serialize($data, 'json'); + } +} diff --git a/src/Api/RestClient.php b/src/Api/RestClient.php new file mode 100644 index 0000000..cb855c5 --- /dev/null +++ b/src/Api/RestClient.php @@ -0,0 +1,157 @@ + + */ + public function listTopics() : array; + + /** + * GET /topics/(string:topic_name) + */ + public function getTopic(string $topic) : Topic; + + /** + * POST /topics/(string:topic_name) + * + * @param Value\Request\Message[] $messages + * + * @return ProduceResult[] + */ + public function produce(string $topic, iterable $messages) : array; + + /** + * GET /topics/(string:topic_name)/partitions + * + * @return Partition[] + */ + public function listPartitions(string $topic) : array; + + /** + * GET /topics/(string:topic_name)/partitions/(int:partition_id) + */ + public function getPartition(string $topic, int $partition) : Partition; + + /** + * POST /topics/(string:topic_name)/partitions/(int:partition_id) + * + * @param Value\Request\Message[] $messages + * + * @return ProduceResult[] + */ + public function produceToPartition(string $topic, int $partition, iterable $messages) : array; + + /** + * POST /consumers/(string:group_name) + */ + public function createConsumer(string $group, ?ConsumerOptions $consumerOptions = null) : Consumer; + + /** + * DELETE /consumers/(string:group_name)/instances/(string:instance) + */ + public function deleteConsumer(Consumer $consumer) : void; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/offsets + * + * Be aware that REST Proxy increments all the offsets for some reason + * + * @see https://github.com/confluentinc/kafka-rest/blob/3473f3f/kafka-rest/src/main/java/io/confluent/kafkarest/v2/KafkaConsumerState.java#L123 + * @see https://kafka.apache.org/22/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync + * + * @param Value\Request\Offset[]|null $offsets + */ + public function consumerCommitOffsets(Consumer $consumer, ?iterable $offsets = null) : void; + + /** + * GET /consumers/(string:group_name)/instances/(string:instance)/offsets + * + * @param Value\Request\Partition[] $partitions + * + * @return Offset[] + */ + public function getConsumerCommittedOffsets(Consumer $consumer, iterable $partitions) : array; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/subscription + */ + public function consumerSubscribe(Consumer $consumer, Subscription $subscription) : void; + + /** + * GET /consumers/(string:group_name)/instances/(string:instance)/subscription + * + * @return array + */ + public function getConsumerSubscribedTopics(Consumer $consumer) : array; + + /** + * DELETE /consumers/(string:group_name)/instances/(string:instance)/subscription + */ + public function consumerUnsubscribe(Consumer $consumer) : void; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/assignments + * + * @param Value\Request\Partition[] $partitions + */ + public function consumerAssignPartitions(Consumer $consumer, iterable $partitions) : void; + + /** + * GET /consumers/(string:group_name)/instances/(string:instance)/assignments + * + * @return AssignedPartition[] + */ + public function getConsumerAssignedPartitions(Consumer $consumer) : array; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/positions + * + * @param Value\Request\Offset[] $offsets + */ + public function consumerSeek(Consumer $consumer, iterable $offsets) : void; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/positions/beginning + * + * @param Value\Request\Partition[] $partitions + */ + public function consumerSeekStart(Consumer $consumer, iterable $partitions) : void; + + /** + * POST /consumers/(string:group_name)/instances/(string:instance)/positions/end + * + * @param Value\Request\Partition[] $partitions + */ + public function consumerSeekEnd(Consumer $consumer, iterable $partitions) : void; + + /** + * GET /consumers/(string:group_name)/instances/(string:instance)/records + * + * @return Message[] + */ + public function getConsumerMessages(Consumer $consumer, ?int $timeout = null, ?int $maxBytes = null) : array; + + /** + * GET /brokers + * + * @return array + */ + public function getBrokers() : array; +} diff --git a/src/Api/Value/Request/ConsumerOptions.php b/src/Api/Value/Request/ConsumerOptions.php new file mode 100644 index 0000000..1384dfa --- /dev/null +++ b/src/Api/Value/Request/ConsumerOptions.php @@ -0,0 +1,53 @@ +content = base64_encode($content); + $this->key = $key === null ? null : base64_encode($key); + } +} diff --git a/src/Api/Value/Request/Offset.php b/src/Api/Value/Request/Offset.php new file mode 100644 index 0000000..ab57fa6 --- /dev/null +++ b/src/Api/Value/Request/Offset.php @@ -0,0 +1,35 @@ +topic = $topic; + $this->partition = $partition; + $this->offset = $offset; + } +} diff --git a/src/Api/Value/Request/Partition.php b/src/Api/Value/Request/Partition.php new file mode 100644 index 0000000..6722e94 --- /dev/null +++ b/src/Api/Value/Request/Partition.php @@ -0,0 +1,28 @@ +topic = $topic; + $this->partition = $partition; + } +} diff --git a/src/Api/Value/Request/Subscription.php b/src/Api/Value/Request/Subscription.php new file mode 100644 index 0000000..8932f24 --- /dev/null +++ b/src/Api/Value/Request/Subscription.php @@ -0,0 +1,67 @@ +") + * @Serializer\Accessor(getter="getTopics") + * @Serializer\SkipWhenEmpty() + * @var iterable|null + */ + private $topics; + + /** + * @Serializer\Type("string") + * @Serializer\SerializedName("topic_pattern") + * @Serializer\Accessor(getter="getPattern") + * @Serializer\SkipWhenEmpty() + * @var string|null + */ + private $pattern; + + /** + * @param iterable|null $topics + */ + private function __construct(?iterable $topics, ?string $pattern) + { + $this->topics = $topics; + $this->pattern = $pattern; + } + + public static function topic(string $topic) : self + { + return new self([$topic], null); + } + + /** + * @param iterable $topics + */ + public static function topics(iterable $topics) : self + { + return new self($topics, null); + } + + public static function pattern(string $pattern) : self + { + return new self(null, $pattern); + } + + /** + * @return string[]|null + */ + public function getTopics() : ?iterable + { + return $this->topics; + } + + public function getPattern() : ?string + { + return $this->pattern; + } +} diff --git a/src/Api/Value/Response/AssignedPartition.php b/src/Api/Value/Response/AssignedPartition.php new file mode 100644 index 0000000..596177e --- /dev/null +++ b/src/Api/Value/Response/AssignedPartition.php @@ -0,0 +1,22 @@ +content = base64_decode($this->content, true); + + if ($this->key === null) { + return; + } + + $this->key = base64_decode($this->key, true); + } +} diff --git a/src/Api/Value/Response/Offset.php b/src/Api/Value/Response/Offset.php new file mode 100644 index 0000000..81a6d73 --- /dev/null +++ b/src/Api/Value/Response/Offset.php @@ -0,0 +1,37 @@ +partition = $partition; + $this->offset = $offset; + } +} diff --git a/src/Api/Value/Response/Partition.php b/src/Api/Value/Response/Partition.php new file mode 100644 index 0000000..f861c15 --- /dev/null +++ b/src/Api/Value/Response/Partition.php @@ -0,0 +1,28 @@ +") + * @var Replica[] + */ + public $replicas; +} diff --git a/src/Api/Value/Response/ProduceError.php b/src/Api/Value/Response/ProduceError.php new file mode 100644 index 0000000..0ef2eae --- /dev/null +++ b/src/Api/Value/Response/ProduceError.php @@ -0,0 +1,23 @@ +errorCode = $errorCode; + $this->error = $error; + } +} diff --git a/src/Api/Value/Response/ProduceResult.php b/src/Api/Value/Response/ProduceResult.php new file mode 100644 index 0000000..67b2922 --- /dev/null +++ b/src/Api/Value/Response/ProduceResult.php @@ -0,0 +1,82 @@ +partition = $partition; + $result->offset = $offset; + + return $result; + } + + /** + * @internal + */ + public static function error(int $errorCode, string $error) : self + { + $result = new self(); + $result->errorCode = $errorCode; + $result->error = $error; + + return $result; + } + + public function isError() : bool + { + return $this->errorCode !== null; + } + + public function getOffset() : Offset + { + Assertion::false($this->isError()); + + return new Offset((int) $this->partition, (int) $this->offset); + } + + public function getError() : ProduceError + { + Assertion::true($this->isError()); + + return new ProduceError((int) $this->errorCode, (string) $this->error); + } +} diff --git a/src/Api/Value/Response/ProduceResults.php b/src/Api/Value/Response/ProduceResults.php new file mode 100644 index 0000000..10e05be --- /dev/null +++ b/src/Api/Value/Response/ProduceResults.php @@ -0,0 +1,20 @@ +") + * @Serializer\SerializedName("offsets") + * @var ProduceResult[] + */ + public $results; +} diff --git a/src/Api/Value/Response/Replica.php b/src/Api/Value/Response/Replica.php new file mode 100644 index 0000000..72ccc49 --- /dev/null +++ b/src/Api/Value/Response/Replica.php @@ -0,0 +1,28 @@ +") + * @var array + */ + public $configs; + + /** + * @Serializer\Type("array") + * @var Partition[] + */ + public $partitions; +} diff --git a/src/BatchConsumer.php b/src/BatchConsumer.php new file mode 100644 index 0000000..73006b1 --- /dev/null +++ b/src/BatchConsumer.php @@ -0,0 +1,155 @@ +client = $client; + $this->consumer = $consumer; + $this->maxCount = $maxCount; + + if ($maxDuration !== null) { + $this->maxDuration = new DateInterval(sprintf('PT%dS', $maxDuration)); + } + + $this->clock = $clock ?? new SystemClock(); + } + + public function __destruct() + { + if (! isset($this->consumer)) { + return; + } + + $this->close(); + } + + /** + * @return Message[] + */ + public function consume(?int $maxBytes = null) : iterable + { + if (! isset($this->consumer)) { + throw ConsumerClosed::new(); + } + + $batch = new MessagesBatch(); + $batchTimeout = $this->getBatchTimeout(); + + try { + while (true) { + $messages = $this->client->getConsumerMessages( + $this->consumer, + $this->getTimeout($batchTimeout), + $maxBytes + ); + foreach ($messages as $message) { + $batch->add($message); + + if (($this->maxCount === null || $batch->count() !== $this->maxCount) && + ($this->maxDuration === null || $this->clock->now() < $batchTimeout) + ) { + continue; + } + + yield $batch; + + $batch = new MessagesBatch(); + $batchTimeout = $this->getBatchTimeout(); + } + } + } catch (Throwable $throwable) { + $this->close(); + + throw ConsumerClosed::error($throwable); + } + } + + public function commit(MessagesBatch $messagesBatch) : void + { + if ($messagesBatch->isEmpty()) { + return; + } + + $offsets = $offsetsMap = []; + $messages = $messagesBatch->getMessages(); + for ($i = count($messages) - 1; $i >= 0; $i--) { + $message = $messages[$i]; + if (isset($offsetsMap[$message->topic][$message->partition])) { + continue; + } + + $offsetsMap[$message->topic][$message->partition] = $message->offset; + $offsets[] = new Offset($message->topic, $message->partition, $message->offset); + } + + $this->client->consumerCommitOffsets($this->consumer, $offsets); + } + + private function getBatchTimeout() : ?DateTimeImmutable + { + if ($this->maxDuration !== null) { + return $this->clock->now()->add($this->maxDuration); + } + + return null; + } + + private function getTimeout(?DateTimeImmutable $batchTimeout) : int + { + if ($batchTimeout === null) { + return 10; + } + + return $batchTimeout->getTimestamp() - $this->clock->now()->getTimestamp(); + } + + private function close() : void + { + $this->client->deleteConsumer($this->consumer); + unset($this->consumer); + } +} diff --git a/src/BatchConsumerFactory.php b/src/BatchConsumerFactory.php new file mode 100644 index 0000000..ce6d5b9 --- /dev/null +++ b/src/BatchConsumerFactory.php @@ -0,0 +1,33 @@ +client = $client; + } + + public function create( + string $group, + Subscription $subscription, + ?int $maxCount, + ?int $maxDuration = null, + ?ConsumerOptions $consumerOptions = null + ) : BatchConsumer { + $consumer = $this->client->createConsumer($group, $consumerOptions); + $this->client->consumerSubscribe($consumer, $subscription); + + return new BatchConsumer($this->client, $consumer, $maxCount, $maxDuration); + } +} diff --git a/src/Consumer.php b/src/Consumer.php new file mode 100644 index 0000000..2d66e75 --- /dev/null +++ b/src/Consumer.php @@ -0,0 +1,69 @@ +client = $client; + $this->consumer = $consumer; + } + + public function __destruct() + { + if (! isset($this->consumer)) { + return; + } + + $this->close(); + } + + /** + * @return Message[] + */ + public function consume(?int $timeout = null, ?int $maxBytes = null) : iterable + { + if (! isset($this->consumer)) { + throw ConsumerClosed::new(); + } + + try { + while (true) { + yield from $this->client->getConsumerMessages($this->consumer, $timeout, $maxBytes); + } + } catch (Throwable $throwable) { + $this->close(); + + throw ConsumerClosed::error($throwable); + } + } + + public function commit(Message $message) : void + { + $this->client->consumerCommitOffsets( + $this->consumer, + [new Offset($message->topic, $message->partition, $message->offset)] + ); + } + + private function close() : void + { + $this->client->deleteConsumer($this->consumer); + unset($this->consumer); + } +} diff --git a/src/ConsumerFactory.php b/src/ConsumerFactory.php new file mode 100644 index 0000000..8be744c --- /dev/null +++ b/src/ConsumerFactory.php @@ -0,0 +1,31 @@ +client = $client; + } + + public function create( + string $group, + Subscription $subscription, + ?ConsumerOptions $consumerOptions = null + ) : Consumer { + $consumer = $this->client->createConsumer($group, $consumerOptions); + $this->client->consumerSubscribe($consumer, $subscription); + + return new Consumer($this->client, $consumer); + } +} diff --git a/src/Exception/ConsumerClosed.php b/src/Exception/ConsumerClosed.php new file mode 100644 index 0000000..ed5b965 --- /dev/null +++ b/src/Exception/ConsumerClosed.php @@ -0,0 +1,21 @@ +retryable = $retryable; + $exception->nonRetryable = $nonRetryable; + + return $exception; + } +} diff --git a/src/Producer.php b/src/Producer.php new file mode 100644 index 0000000..7ed79fa --- /dev/null +++ b/src/Producer.php @@ -0,0 +1,61 @@ +client = $client; + } + + public function produce(string $topic, Message $message) : void + { + $this->produceBatch($topic, [$message]); + } + + /** + * @param Message[] $messages + */ + public function produceBatch(string $topic, iterable $messages) : void + { + $errors = []; + foreach ($this->client->produce($topic, $messages) as $i => $result) { + if (! $result->isError()) { + continue; + } + + $errors[$i] = $result->getError(); + } + + if (count($errors) === 0) { + return; + } + + $retryable = $nonRetryable = []; + foreach ($messages as $i => $message) { + if (! array_key_exists($i, $errors)) { + continue; + } + + if ($errors[$i]->errorCode === 2) { + $retryable[] = ['error' => $errors[$i]->error, 'message' => $message]; + } else { + $nonRetryable[] = ['error' => $errors[$i]->error, 'message' => $message]; + } + } + + throw FailedToProduceMessages::new($retryable, $nonRetryable); + } +} diff --git a/src/Value/MessagesBatch.php b/src/Value/MessagesBatch.php new file mode 100644 index 0000000..c558c88 --- /dev/null +++ b/src/Value/MessagesBatch.php @@ -0,0 +1,37 @@ + */ + private $messages = []; + + public function add(Message $message) : void + { + $this->messages[] = $message; + } + + /** + * @return Message[] + */ + public function getMessages() : array + { + return $this->messages; + } + + public function count() : int + { + return count($this->messages); + } + + public function isEmpty() : bool + { + return count($this->messages) === 0; + } +} diff --git a/tests/Api/HttpRestClientTest.php b/tests/Api/HttpRestClientTest.php new file mode 100644 index 0000000..4ce721f --- /dev/null +++ b/tests/Api/HttpRestClientTest.php @@ -0,0 +1,546 @@ +httpClient = Mockery::mock(ClientInterface::class); + $this->client = new HttpRestClient( + $this->httpClient, + new NullLogger(), + new RequestFactory(), + SerializerBuilder::create()->build(), + new StreamFactory(), + new UriFactory(), + 'http://api' + ); + $this->response = (new ResponseFactory())->createResponse(); + } + + public function testListTopics() : void + { + $this->expectRequest('GET', '/topics', null, $this->responseWithBody('["lorem-ipsum", "dolor-sit-amet"]')); + + $topics = $this->client->listTopics(); + + self::assertSame(['lorem-ipsum', 'dolor-sit-amet'], $topics); + } + + public function testGetTopic() : void + { + $this->expectRequest('GET', '/topics/lorem-ipsum', null, $this->fileResponse('getTopic')); + + $topic = $this->client->getTopic('lorem-ipsum'); + + self::assertSame('lorem-ipsum', $topic->name); + self::assertSame(['retention.bytes' => '-1', 'flush.ms' => '50000'], $topic->configs); + self::assertCount(2, $topic->partitions); + + // partition 0 + self::assertSame(0, $topic->partitions[0]->partition); + self::assertSame(0, $topic->partitions[0]->leader); + self::assertCount(2, $topic->partitions[0]->replicas); + + // partition 0, replica 0 + self::assertSame(0, $topic->partitions[0]->replicas[0]->broker); + self::assertTrue($topic->partitions[0]->replicas[0]->leader); + self::assertTrue($topic->partitions[0]->replicas[0]->inSync); + + // partition 0, replica 1 + self::assertSame(1, $topic->partitions[0]->replicas[1]->broker); + self::assertFalse($topic->partitions[0]->replicas[1]->leader); + self::assertTrue($topic->partitions[0]->replicas[1]->inSync); + + // partition 1 + self::assertSame(1, $topic->partitions[1]->partition); + self::assertSame(1, $topic->partitions[1]->leader); + self::assertCount(2, $topic->partitions[1]->replicas); + + // partition 1, replica 0 + self::assertSame(1, $topic->partitions[1]->replicas[0]->broker); + self::assertTrue($topic->partitions[1]->replicas[0]->leader); + self::assertTrue($topic->partitions[1]->replicas[0]->inSync); + + // partition 1, replica 1 + self::assertSame(0, $topic->partitions[1]->replicas[1]->broker); + self::assertFalse($topic->partitions[1]->replicas[1]->leader); + self::assertTrue($topic->partitions[1]->replicas[1]->inSync); + } + + public function testProduce() : void + { + $this->expectRequest( + 'POST', + '/topics/lorem-ipsum', + '{"records":[{"value":"bXNnMQ=="},{"key":"c29tZSBrZXk=","value":"bXNnMg=="},{"value":"bXNnMw=="}]}', + $this->fileResponse('produce') + ); + + $results = $this->client->produce( + 'lorem-ipsum', + [new Message('msg1'), new Message('msg2', 'some key'), new Message('msg3')] + ); + + self::assertCount(3, $results); + + self::assertFalse($results[0]->isError()); + $offset = $results[0]->getOffset(); + self::assertSame(0, $offset->partition); + self::assertSame(8, $offset->offset); + + self::assertFalse($results[1]->isError()); + $offset = $results[1]->getOffset(); + self::assertSame(1, $offset->partition); + self::assertSame(15, $offset->offset); + + self::assertTrue($results[2]->isError()); + $error = $results[2]->getError(); + self::assertSame(1, $error->errorCode); + self::assertSame('some unexpected error', $error->error); + } + + public function testListPartitions() : void + { + $this->expectRequest('GET', '/topics/lorem-ipsum/partitions', null, $this->fileResponse('listPartitions')); + + $partitions = $this->client->listPartitions('lorem-ipsum'); + + self::assertCount(2, $partitions); + + // partition 0 + self::assertSame(0, $partitions[0]->partition); + self::assertSame(0, $partitions[0]->leader); + self::assertCount(2, $partitions[0]->replicas); + + // partition 0, replica 0 + self::assertSame(0, $partitions[0]->replicas[0]->broker); + self::assertTrue($partitions[0]->replicas[0]->leader); + self::assertTrue($partitions[0]->replicas[0]->inSync); + + // partition 0, replica 1 + self::assertSame(1, $partitions[0]->replicas[1]->broker); + self::assertFalse($partitions[0]->replicas[1]->leader); + self::assertTrue($partitions[0]->replicas[1]->inSync); + + // partition 1 + self::assertSame(1, $partitions[1]->partition); + self::assertSame(1, $partitions[1]->leader); + self::assertCount(2, $partitions[1]->replicas); + + // partition 1, replica 0 + self::assertSame(1, $partitions[1]->replicas[0]->broker); + self::assertTrue($partitions[1]->replicas[0]->leader); + self::assertTrue($partitions[1]->replicas[0]->inSync); + + // partition 1, replica 1 + self::assertSame(0, $partitions[1]->replicas[1]->broker); + self::assertFalse($partitions[1]->replicas[1]->leader); + self::assertTrue($partitions[1]->replicas[1]->inSync); + } + + public function testGetPartition() : void + { + $this->expectRequest('GET', '/topics/lorem-ipsum/partitions/0', null, $this->fileResponse('getPartition')); + + $partition = $this->client->getPartition('lorem-ipsum', 0); + + // partition 0 + self::assertSame(0, $partition->partition); + self::assertSame(0, $partition->leader); + self::assertCount(2, $partition->replicas); + + // partition 0, replica 0 + self::assertSame(0, $partition->replicas[0]->broker); + self::assertTrue($partition->replicas[0]->leader); + self::assertTrue($partition->replicas[0]->inSync); + + // partition 0, replica 1 + self::assertSame(1, $partition->replicas[1]->broker); + self::assertFalse($partition->replicas[1]->leader); + self::assertTrue($partition->replicas[1]->inSync); + } + + public function testProduceToPartition() : void + { + $this->expectRequest( + 'POST', + '/topics/lorem-ipsum/partitions/0', + '{"records":[{"value":"bXNnMQ=="},{"key":"c29tZSBrZXk=","value":"bXNnMg=="}]}', + $this->fileResponse('produceToPartition') + ); + + $offsets = $this->client->produceToPartition( + 'lorem-ipsum', + 0, + [new Message('msg1'), new Message('msg2', 'some key')] + ); + + self::assertCount(2, $offsets); + + self::assertFalse($offsets[0]->isError()); + $offset = $offsets[0]->getOffset(); + self::assertSame(0, $offset->partition); + self::assertSame(8, $offset->offset); + + self::assertTrue($offsets[1]->isError()); + $error = $offsets[1]->getError(); + self::assertSame(1, $error->errorCode); + self::assertSame('some unexpected error', $error->error); + } + + public function testCreateConsumer() : void + { + $consumerOptions = new ConsumerOptions(); + $consumerOptions->name = 'custom-consumer-name'; + + $this->expectRequest('POST', '/consumers/dolor-sit-amet', '{}', $this->fileResponse('createConsumer')); + + $consumer = $this->client->createConsumer('dolor-sit-amet', new ConsumerOptions()); + + self::assertSame('custom-consumer-name', $consumer->instanceId); + self::assertSame('http://api/consumers/dolor-sit-amet/instances/custom-consumer-name', $consumer->baseUri); + } + + public function testDeleteConsumer() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'DELETE', + '/consumers/dolor-sit-amet/instances/custom-consumer-name', + null, + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->deleteConsumer($consumer); + self::assertThat(true, new IsTrue()); + } + + public function testConsumerCommitOffsets() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets', + null, + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerCommitOffsets($consumer); + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets', + '{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":25}]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerCommitOffsets($consumer, [new Offset('lorem-ipsum', 0, 25)]); + self::assertThat(true, new IsTrue()); + } + + public function testGetConsumerCommittedOffsets() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'GET', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/offsets', + '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}', + $this->responseWithBody('{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":25,"metadata":""}]}') + ); + + $offsets = $this->client->getConsumerCommittedOffsets($consumer, [new Partition('lorem-ipsum', 0)]); + + self::assertCount(1, $offsets); + self::assertSame(0, $offsets[0]->partition); + self::assertSame(25, $offsets[0]->offset); + self::assertSame('', $offsets[0]->metadata); + } + + public function testConsumerSubscribe() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription', + '{"topics":["lorem-ipsum"]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerSubscribe($consumer, Subscription::topic('lorem-ipsum')); + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription', + '{"topic_pattern":"lorem-*"}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerSubscribe($consumer, Subscription::pattern('lorem-*')); + self::assertThat(true, new IsTrue()); + } + + public function testGetConsumerSubscribedTopics() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'GET', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription', + null, + $this->responseWithBody('{"topics":["lorem-ipsum"]}') + ); + + $topics = $this->client->getConsumerSubscribedTopics($consumer); + + self::assertCount(1, $topics); + self::assertSame('lorem-ipsum', $topics[0]); + } + + public function testConsumerUnsubscribe() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'DELETE', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/subscription', + null, + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerUnsubscribe($consumer); + self::assertThat(true, new IsTrue()); + } + + public function testConsumerAssignPartitions() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/assignments', + '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerAssignPartitions($consumer, [new Partition('lorem-ipsum', 0)]); + self::assertThat(true, new IsTrue()); + } + + public function testGetConsumerAssignedPartitions() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'GET', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/assignments', + null, + $this->responseWithBody('{"partitions":[{"topic":"lorem-ipsum","partition":0}]}') + ); + + $partitions = $this->client->getConsumerAssignedPartitions($consumer); + + self::assertCount(1, $partitions); + self::assertSame('lorem-ipsum', $partitions[0]->topic); + self::assertSame(0, $partitions[0]->partition); + } + + public function testConsumerSeek() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions', + '{"offsets":[{"topic":"lorem-ipsum","partition":0,"offset":12}]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerSeek($consumer, [new Offset('lorem-ipsum', 0, 12)]); + self::assertThat(true, new IsTrue()); + } + + public function testConsumerSeekStart() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions/beginning', + '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerSeekStart($consumer, [new Partition('lorem-ipsum', 0)]); + self::assertThat(true, new IsTrue()); + } + + public function testConsumerSeekEnd() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'POST', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/positions/end', + '{"partitions":[{"topic":"lorem-ipsum","partition":0}]}', + $this->response->withStatus(Http::NO_CONTENT) + ); + + $this->client->consumerSeekEnd($consumer, [new Partition('lorem-ipsum', 0)]); + self::assertThat(true, new IsTrue()); + } + + public function testGetConsumerMessages() : void + { + $consumer = new Consumer(); + $consumer->baseUri = 'http://api/consumers/dolor-sit-amet/instances/custom-consumer-name'; + + $this->expectRequest( + 'GET', + '/consumers/dolor-sit-amet/instances/custom-consumer-name/records', + null, + $this->fileResponse('getConsumerMessages') + ); + + $messages = $this->client->getConsumerMessages($consumer); + + self::assertCount(2, $messages); + + self::assertSame('lorem-ipsum', $messages[0]->topic); + self::assertNull($messages[0]->key); + self::assertSame('msg1', $messages[0]->content); + self::assertSame(0, $messages[0]->partition); + self::assertSame(52, $messages[0]->offset); + + self::assertSame('lorem-ipsum', $messages[1]->topic); + self::assertSame('some key', $messages[1]->key); + self::assertSame('msg2', $messages[1]->content); + self::assertSame(1, $messages[1]->partition); + self::assertSame(53, $messages[1]->offset); + } + + public function testGetBrokers() : void + { + $this->expectRequest('GET', '/brokers', null, $this->responseWithBody('{"brokers":[1, 0]}')); + + $brokers = $this->client->getBrokers(); + + self::assertCount(2, $brokers); + self::assertSame(1, $brokers[0]); + self::assertSame(0, $brokers[1]); + } + + /** + * @dataProvider providerErrors + */ + public function testErrors(?string $body, int $status, string $message) : void + { + if ($body === null) { + $response = $this->response->withStatus($status); + } else { + $response = $this->responseWithBody($body)->withStatus($status); + } + + $this->expectRequest('GET', '/topics', null, $response); + + $this->expectException(UnexpectedApiResponse::class); + $this->expectExceptionMessage($message); + + $this->client->listTopics(); + } + + /** + * @return iterable> + */ + public function providerErrors() : iterable + { + yield [ + 'body' => '{"error_code":40401,"message":"Topic not found"}', + 'status' => Http::NOT_FOUND, + 'message' => 'Unexpected response HTTP 404: [40401] Topic not found', + ]; + + yield [ + 'body' => null, + 'status' => Http::INTERNAL_SERVER_ERROR, + 'message' => 'Unexpected HTTP status 500: Internal Server Error', + ]; + } + + private function expectRequest(string $method, string $path, ?string $body, ResponseInterface $response) : void + { + $this->httpClient->shouldReceive('sendRequest') + ->once() + ->with(Mockery::on(static function (RequestInterface $request) use ($method, $path, $body) { + return $request->getMethod() === $method && + (string) $request->getUri() === 'http://api' . $path && + $request->getHeader('Accept') === ['application/vnd.kafka.v2+json'] && + (string) $request->getBody() === ($body ?? ''); + })) + ->andReturn($response); + } + + private function fileResponse(string $name) : ResponseInterface + { + $path = sprintf('%s/data/%s.json', __DIR__, $name); + + return $this->responseWithBody(file_get_contents($path)); + } + + private function responseWithBody(string $body) : ResponseInterface + { + return $this->response->withBody((new StreamFactory())->createStream($body)); + } +} diff --git a/tests/Api/data/createConsumer.json b/tests/Api/data/createConsumer.json new file mode 100644 index 0000000..daa472c --- /dev/null +++ b/tests/Api/data/createConsumer.json @@ -0,0 +1,4 @@ +{ + "instance_id": "custom-consumer-name", + "base_uri": "http://api/consumers/dolor-sit-amet/instances/custom-consumer-name" +} diff --git a/tests/Api/data/getConsumerMessages.json b/tests/Api/data/getConsumerMessages.json new file mode 100644 index 0000000..9090319 --- /dev/null +++ b/tests/Api/data/getConsumerMessages.json @@ -0,0 +1,16 @@ +[ + { + "topic": "lorem-ipsum", + "key": null, + "value": "bXNnMQ==", + "partition": 0, + "offset": 52 + }, + { + "topic": "lorem-ipsum", + "key": "c29tZSBrZXk=", + "value": "bXNnMg==", + "partition": 1, + "offset": 53 + } +] diff --git a/tests/Api/data/getPartition.json b/tests/Api/data/getPartition.json new file mode 100644 index 0000000..6b12c72 --- /dev/null +++ b/tests/Api/data/getPartition.json @@ -0,0 +1,16 @@ +{ + "partition": 0, + "leader": 0, + "replicas": [ + { + "broker": 0, + "leader": true, + "in_sync": true + }, + { + "broker": 1, + "leader": false, + "in_sync": true + } + ] +} diff --git a/tests/Api/data/getTopic.json b/tests/Api/data/getTopic.json new file mode 100644 index 0000000..50f4c05 --- /dev/null +++ b/tests/Api/data/getTopic.json @@ -0,0 +1,41 @@ +{ + "name": "lorem-ipsum", + "configs": { + "retention.bytes": "-1", + "flush.ms": "50000" + }, + "partitions": [ + { + "partition": 0, + "leader": 0, + "replicas": [ + { + "broker": 0, + "leader": true, + "in_sync": true + }, + { + "broker": 1, + "leader": false, + "in_sync": true + } + ] + }, + { + "partition": 1, + "leader": 1, + "replicas": [ + { + "broker": 1, + "leader": true, + "in_sync": true + }, + { + "broker": 0, + "leader": false, + "in_sync": true + } + ] + } + ] +} diff --git a/tests/Api/data/listPartitions.json b/tests/Api/data/listPartitions.json new file mode 100644 index 0000000..9878e1c --- /dev/null +++ b/tests/Api/data/listPartitions.json @@ -0,0 +1,34 @@ +[ + { + "partition": 0, + "leader": 0, + "replicas": [ + { + "broker": 0, + "leader": true, + "in_sync": true + }, + { + "broker": 1, + "leader": false, + "in_sync": true + } + ] + }, + { + "partition": 1, + "leader": 1, + "replicas": [ + { + "broker": 1, + "leader": true, + "in_sync": true + }, + { + "broker": 0, + "leader": false, + "in_sync": true + } + ] + } +] diff --git a/tests/Api/data/produce.json b/tests/Api/data/produce.json new file mode 100644 index 0000000..3388bca --- /dev/null +++ b/tests/Api/data/produce.json @@ -0,0 +1,22 @@ +{ + "offsets": [ + { + "partition": 0, + "offset": 8, + "error_code": null, + "error": null + }, + { + "partition": 1, + "offset": 15, + "error_code": null, + "error": null + }, + { + "error_code": 1, + "error": "some unexpected error" + } + ], + "key_schema_id": null, + "value_schema_id": null +} diff --git a/tests/Api/data/produceToPartition.json b/tests/Api/data/produceToPartition.json new file mode 100644 index 0000000..c9f83f0 --- /dev/null +++ b/tests/Api/data/produceToPartition.json @@ -0,0 +1,16 @@ +{ + "offsets": [ + { + "partition": 0, + "offset": 8, + "error_code": null, + "error": null + }, + { + "error_code": 1, + "error": "some unexpected error" + } + ], + "key_schema_id": null, + "value_schema_id": null +} diff --git a/tests/BatchConsumerFactoryTest.php b/tests/BatchConsumerFactoryTest.php new file mode 100644 index 0000000..8e6588f --- /dev/null +++ b/tests/BatchConsumerFactoryTest.php @@ -0,0 +1,56 @@ +shouldReceive('createConsumer') + ->once() + ->with($group, $consumerOptions) + ->andReturn($consumer); + + $client->shouldReceive('consumerSubscribe') + ->once() + ->with($consumer, $subscription); + + $client->shouldReceive('deleteConsumer') + ->with($consumer); + + $factory = new BatchConsumerFactory($client); + $factory->create($group, $subscription, $maxCount, $maxDuration, $consumerOptions); + self::assertThat(true, new IsTrue()); + } + + /** + * @return iterable> + */ + public function providerCreate() : iterable + { + yield [null, 10, null]; + yield [1000, null, null]; + yield [null, 10, new ConsumerOptions()]; + yield [1000, null, new ConsumerOptions()]; + } +} diff --git a/tests/BatchConsumerTest.php b/tests/BatchConsumerTest.php new file mode 100644 index 0000000..ff7f7fa --- /dev/null +++ b/tests/BatchConsumerTest.php @@ -0,0 +1,375 @@ +shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $firstMessages) { + $now = $now->add(new DateInterval('PT20S')); + $clock->setTo($now); + + return $firstMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration - 20, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $secondMessages) { + $now = $now->add(new DateInterval('PT10S')); + $clock->setTo($now); + + return $secondMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $thirdMessages) { + $now = $now->add(new DateInterval('PT15S')); + $clock->setTo($now); + + return $thirdMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration - 15, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $forthMessages) { + $now = $now->add(new DateInterval('PT10S')); + $clock->setTo($now); + + return $forthMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration - 25, $maxBytes) + ->andThrow(new Exception('some exception')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new BatchConsumer($client, $consumerVO, null, $maxDuration, $clock); + $messages = $consumer->consume($maxBytes); + + self::assertInstanceOf(Generator::class, $messages); + self::assertTrue($messages->valid()); + + $batch = $messages->current(); + self::assertInstanceOf(MessagesBatch::class, $batch); + + self::assertSame(3, $batch->count()); + self::assertCount(3, $batch->getMessages()); + self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages()); + + try { + $messages->next(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertFalse($messages->valid()); + } + } + + /** + * @dataProvider providerMaxBytes + */ + public function testConsumeWithMaxCount(?int $maxBytes) : void + { + $clock = new FrozenClock(new DateTimeImmutable()); + + $consumerVO = new Consumer(); + + $firstMessages = [new Message(), new Message()]; + $secondMessages = [new Message()]; + $thirdMessages = []; + $forthMessages = [new Message()]; + + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, $maxBytes) + ->andReturn($firstMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, $maxBytes) + ->andReturn($secondMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, $maxBytes) + ->andReturn($thirdMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, $maxBytes) + ->andReturn($forthMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, $maxBytes) + ->andThrow(new Exception('some exception')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new BatchConsumer($client, $consumerVO, 3, null, $clock); + $messages = $consumer->consume($maxBytes); + + self::assertInstanceOf(Generator::class, $messages); + self::assertTrue($messages->valid()); + + $batch = $messages->current(); + self::assertInstanceOf(MessagesBatch::class, $batch); + + self::assertSame(3, $batch->count()); + self::assertCount(3, $batch->getMessages()); + self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages()); + + try { + $messages->next(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertFalse($messages->valid()); + } + } + + /** + * @dataProvider providerMaxBytes + */ + public function testConsumeWithMaxCountAndMaxDuration(?int $maxBytes) : void + { + $maxDuration = 30; + $now = new DateTimeImmutable(); + $clock = new FrozenClock($now); + + $consumerVO = new Consumer(); + + $firstMessages = [new Message(), new Message()]; + $secondMessages = [new Message()]; + $thirdMessages = [new Message()]; + $forthMessages = [new Message()]; + + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $firstMessages) { + $now = $now->add(new DateInterval('PT20S')); + $clock->setTo($now); + + return $firstMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration - 20, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $secondMessages) { + $now = $now->add(new DateInterval('PT5S')); + $clock->setTo($now); + + return $secondMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $thirdMessages) { + $now = $now->add(new DateInterval('PT40S')); + $clock->setTo($now); + + return $thirdMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration, $maxBytes) + ->andReturnUsing(static function () use (&$now, $clock, $forthMessages) { + $now = $now->add(new DateInterval('PT10S')); + $clock->setTo($now); + + return $forthMessages; + }); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $maxDuration - 10, $maxBytes) + ->andThrow(new Exception('some exception')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new BatchConsumer($client, $consumerVO, 3, $maxDuration, $clock); + $messages = $consumer->consume($maxBytes); + + self::assertInstanceOf(Generator::class, $messages); + self::assertTrue($messages->valid()); + + $batch = $messages->current(); + self::assertInstanceOf(MessagesBatch::class, $batch); + + self::assertSame(3, $batch->count()); + self::assertCount(3, $batch->getMessages()); + self::assertSame(array_merge($firstMessages, $secondMessages), $batch->getMessages()); + + $messages->next(); + self::assertTrue($messages->valid()); + $batch = $messages->current(); + self::assertInstanceOf(MessagesBatch::class, $batch); + + self::assertSame(1, $batch->count()); + self::assertCount(1, $batch->getMessages()); + self::assertSame($thirdMessages, $batch->getMessages()); + + try { + $messages->next(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertFalse($messages->valid()); + } + } + + /** + * @return iterable> + */ + public function providerMaxBytes() : iterable + { + yield [null]; + yield [10000]; + } + + public function testCantCreateBatchConsumerWithoutBatchLimitation() : void + { + $client = Mockery::mock(RestClient::class); + $consumerVO = new Consumer(); + + $this->expectException(AssertionFailedException::class); + $this->expectExceptionMessage('You must specify at least one of maxCount/maxDuration'); + + new BatchConsumer($client, $consumerVO, null, null); + } + + public function testCommit() : void + { + $message1 = new Message(); + $message1->topic = 'some-topic'; + $message1->partition = 0; + $message1->offset = 12; + + $message2 = new Message(); + $message2->topic = 'some-topic'; + $message2->partition = 1; + $message2->offset = 5; + + $message3 = new Message(); + $message3->topic = 'some-other-topic'; + $message3->partition = 0; + $message3->offset = 31; + + $message4 = new Message(); + $message4->topic = 'some-topic'; + $message4->partition = 0; + $message4->offset = 13; + + $message5 = new Message(); + $message5->topic = 'some-other-topic'; + $message5->partition = 0; + $message5->offset = 32; + + $consumerVO = new Consumer(); + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('consumerCommitOffsets') + ->with($consumerVO, IsEqual::equalTo([ + new Offset('some-other-topic', 0, 32), + new Offset('some-topic', 0, 13), + new Offset('some-topic', 1, 5), + ])); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new BatchConsumer($client, $consumerVO, 100); + + $batch = new MessagesBatch(); + $batch->add($message1); + $batch->add($message2); + $batch->add($message3); + $batch->add($message4); + $batch->add($message5); + + $consumer->commit($batch); + self::assertThat(true, new IsTrue()); + } + + public function testCloseAfterError() : void + { + $consumerVO = new Consumer(); + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, 10, null) + ->andThrow($expectedException = new RuntimeException('some error')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new BatchConsumer($client, $consumerVO, 100); + + $messages = $consumer->consume(); + self::assertInstanceOf(Generator::class, $messages); + + try { + $messages->current(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertSame('Failed to consume messages; consumer is now closed', $exception->getMessage()); + self::assertSame($expectedException, $exception->getPrevious()); + } + + $messages = $consumer->consume(); + self::assertInstanceOf(Generator::class, $messages); + + try { + $messages->current(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertSame('Consumer is closed', $exception->getMessage()); + self::assertNull($exception->getPrevious()); + } + } +} diff --git a/tests/ConsumerFactoryTest.php b/tests/ConsumerFactoryTest.php new file mode 100644 index 0000000..d790480 --- /dev/null +++ b/tests/ConsumerFactoryTest.php @@ -0,0 +1,54 @@ +shouldReceive('createConsumer') + ->once() + ->with($group, $consumerOptions) + ->andReturn($consumer); + + $client->shouldReceive('consumerSubscribe') + ->once() + ->with($consumer, $subscription); + + $client->shouldReceive('deleteConsumer') + ->with($consumer); + + $factory = new ConsumerFactory($client); + $factory->create($group, $subscription, $consumerOptions); + self::assertThat(true, new IsTrue()); + } + + /** + * @return iterable> + */ + public function providerCreate() : iterable + { + yield [null]; + yield [new ConsumerOptions()]; + } +} diff --git a/tests/ConsumerTest.php b/tests/ConsumerTest.php new file mode 100644 index 0000000..4db56bf --- /dev/null +++ b/tests/ConsumerTest.php @@ -0,0 +1,143 @@ +shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $timeout, $maxBytes) + ->andReturns($firstMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, $timeout, $maxBytes) + ->andReturns($secondMessages); + $client->shouldReceive('getConsumerMessages') + ->once() + ->andThrow(new Exception('some exception')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new Consumer($client, $consumerVO); + $messages = $consumer->consume($timeout, $maxBytes); + + self::assertInstanceOf(Generator::class, $messages); + + self::assertTrue($messages->valid()); + self::assertSame($firstMessages[0], $messages->current()); + + $messages->next(); + self::assertTrue($messages->valid()); + self::assertSame($firstMessages[1], $messages->current()); + + $messages->next(); + self::assertTrue($messages->valid()); + self::assertSame($secondMessages[0], $messages->current()); + self::assertTrue($messages->valid()); + + try { + $messages->next(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertFalse($messages->valid()); + } + } + + /** + * @return iterable> + */ + public function providerConsume() : iterable + { + yield [null, null]; + yield [null, 10000]; + yield [10, null]; + } + + public function testCommit() : void + { + $message = new Message(); + $message->topic = 'some-topic'; + $message->partition = 0; + $message->offset = 12; + + $consumerVO = new \Grongor\KafkaRest\Api\Value\Response\Consumer(); + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('consumerCommitOffsets') + ->once() + ->with($consumerVO, IsEqual::equalTo([new Offset('some-topic', 0, 12)])); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new Consumer($client, $consumerVO); + $consumer->commit($message); + self::assertThat(true, new IsTrue()); + } + + public function testCloseAfterError() : void + { + $consumerVO = new \Grongor\KafkaRest\Api\Value\Response\Consumer(); + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('getConsumerMessages') + ->once() + ->with($consumerVO, null, null) + ->andThrow($expectedException = new RuntimeException('some error')); + + $client->shouldReceive('deleteConsumer') + ->once() + ->with($consumerVO); + + $consumer = new Consumer($client, $consumerVO); + + $messages = $consumer->consume(); + self::assertInstanceOf(Generator::class, $messages); + + try { + $messages->current(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertSame('Failed to consume messages; consumer is now closed', $exception->getMessage()); + self::assertSame($expectedException, $exception->getPrevious()); + } + + $messages = $consumer->consume(); + self::assertInstanceOf(Generator::class, $messages); + + try { + $messages->current(); + self::fail('Expected an exception'); + } catch (ConsumerClosed $exception) { + self::assertSame('Consumer is closed', $exception->getMessage()); + self::assertNull($exception->getPrevious()); + } + } +} diff --git a/tests/ProducerTest.php b/tests/ProducerTest.php new file mode 100644 index 0000000..493db23 --- /dev/null +++ b/tests/ProducerTest.php @@ -0,0 +1,150 @@ +shouldReceive('produce') + ->once() + ->with('some-topic', [$message]) + ->andReturn([$result]); + + $producer = new Producer($client); + $producer->produce('some-topic', $message); + self::assertThat(true, new IsTrue()); + } + + public function testProduceWithNonRetryableError() : void + { + $message = new Message('msg1'); + + $result = ProduceResult::error(1, 'some non-retryable error'); + + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('produce') + ->once() + ->with('some-topic', [$message]) + ->andReturn([$result]); + + $producer = new Producer($client); + + try { + $producer->produce('some-topic', $message); + self::fail('Expectec an FailedToProduceMessages exception'); + } catch (FailedToProduceMessages $exception) { + self::assertSame( + [ + [ + 'error' => $result->error, + 'message' => $message, + ], + ], + $exception->nonRetryable + ); + self::assertCount(0, $exception->retryable); + } + } + + public function testProduceWithRetryableError() : void + { + $message = new Message('msg1'); + + $result = ProduceResult::error(2, 'some retryable error'); + + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('produce') + ->once() + ->with('some-topic', [$message]) + ->andReturn([$result]); + + $producer = new Producer($client); + + try { + $producer->produce('some-topic', $message); + self::fail('Expectec an FailedToProduceMessages exception'); + } catch (FailedToProduceMessages $exception) { + self::assertCount(0, $exception->nonRetryable); + self::assertSame( + [ + [ + 'error' => $result->error, + 'message' => $message, + ], + ], + $exception->retryable + ); + } + } + + public function testProduceBatch() : void + { + $message1 = new Message('msg1'); + $message2 = new Message('msg2'); + $message3 = new Message('msg3'); + $message4 = new Message('msg3'); + $message5 = new Message('msg3'); + $messages = [$message1, $message2, $message3, $message4, $message5]; + + $result1 = ProduceResult::offset(1, 25); + $result2 = ProduceResult::error(1, 'some non-retryable error'); + $result3 = ProduceResult::error(2, 'some retryable error'); + $result4 = ProduceResult::error(1, 'some non-retryable error'); + $result5 = ProduceResult::offset(0, 26); + + $results = [$result1, $result2, $result3, $result4, $result5]; + + $client = Mockery::mock(RestClient::class); + $client->shouldReceive('produce') + ->once() + ->with('some-topic', $messages) + ->andReturn($results); + + $producer = new Producer($client); + + try { + $producer->produceBatch('some-topic', $messages); + self::fail('Expectec an FailedToProduceMessages exception'); + } catch (FailedToProduceMessages $exception) { + self::assertSame( + [ + [ + 'error' => $result2->error, + 'message' => $message2, + ], + [ + 'error' => $result4->error, + 'message' => $message4, + ], + ], + $exception->nonRetryable + ); + self::assertSame( + [ + [ + 'error' => $result3->error, + 'message' => $message3, + ], + ], + $exception->retryable + ); + } + } +} diff --git a/tests/bootstrap.php b/tests/bootstrap.php new file mode 100644 index 0000000..3e7c0a7 --- /dev/null +++ b/tests/bootstrap.php @@ -0,0 +1,16 @@ +