-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
First version #1
Merged
+3,387
−1
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
/vendor | ||
/.phpcs-cache | ||
/.phpunit.result.cache | ||
/composer.lock | ||
/phpcs.xml | ||
/phpstan.neon | ||
/phpunit.xml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
language: php | ||
php: | ||
- 7.2 | ||
- 7.3 | ||
- 7.4snapshot | ||
- nightly | ||
|
||
env: | ||
- DEPENDENCIES= | ||
- DEPENDENCIES=--prefer-lowest | ||
|
||
install: | ||
composer update --no-interaction --prefer-stable --prefer-dist $DEPENDENCIES | ||
|
||
script: make test | ||
|
||
stages: | ||
- Validate composer.json | ||
- Code Quality | ||
- Test | ||
|
||
jobs: | ||
allow_failures: | ||
- php: 7.4snapshot | ||
- php: nightly | ||
|
||
include: | ||
- stage: Validate composer.json | ||
name: Validate composer.json | ||
php: 7.2 | ||
script: | ||
- make validate-composer | ||
- curl -O https://github.com/maglnet/ComposerRequireChecker/releases/latest/download/composer-require-checker.phar | ||
- php composer-require-checker.phar check composer.json | ||
- stage: Code Quality | ||
name: Lint | ||
php: 7.2 | ||
script: make lint | ||
- name: Coding Standard | ||
php: 7.2 | ||
script: make cs | ||
- name: PHPStan | ||
php: 7.2 | ||
script: make phpstan |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,72 @@ | ||
### BEGIN main targets | ||
|
||
.PHONY: build | ||
build: vendor | ||
|
||
.PHONY: list | ||
list: | ||
@$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$' | ||
|
||
### END | ||
|
||
### BEGIN secondary targets | ||
|
||
.PHONY: vendor | ||
vendor: vendor/lock | ||
|
||
vendor/lock: composer.json | ||
$(MAKE) validate-composer | ||
$(MAKE) update | ||
touch vendor/lock | ||
|
||
.PHONY: update | ||
update: | ||
composer update | ||
|
||
### END | ||
|
||
### BEGIN tests | ||
|
||
.PHONY: test | ||
test: | ||
vendor/bin/phpunit $(PHPUNIT_ARGS) | ||
|
||
.PHONY: lint | ||
lint: | ||
vendor/bin/parallel-lint src/ tests/ | ||
|
||
.PHONY: cs | ||
cs: vendor | ||
vendor/bin/phpcs $(PHPCS_ARGS) | ||
|
||
.PHONY: cs-fix | ||
cs-fix: vendor | ||
vendor/bin/phpcbf | ||
|
||
.PHONY: phpstan | ||
phpstan: vendor | ||
vendor/bin/phpstan analyse | ||
|
||
.PHONY: validate-composer | ||
validate-composer: | ||
composer validate --strict | ||
|
||
.PHONY: check | ||
check: build validate-composer lint cs phpstan test | ||
|
||
### END | ||
|
||
### BEGIN cleaning | ||
|
||
.PHONY: clean | ||
clean: clean-cache clean-vendor | ||
|
||
.PHONY: clean-cache | ||
clean-cache: | ||
rm -rf var/cache/* | ||
|
||
.PHONY: clean-vendor | ||
clean-vendor: | ||
rm -rf vendor | ||
|
||
### END |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,116 @@ | ||
# php-kafka-rest-client | ||
kafka-rest-client [![Build Status](https://travis-ci.com/grongor/php-kafka-rest-client.svg?branch=master)](https://travis-ci.com/grongor/php-kafka-rest-client) | ||
================= | ||
|
||
This is a modern PHP client for [Confluent REST Proxy](https://docs.confluent.io/current/kafka-rest/) (version 2). | ||
|
||
HTTP client used for communication with API adheres to the modern [HTTP standards](http://php-http.org) like | ||
[PSR-7](https://www.php-fig.org/psr/psr-7/), [PSR-17](https://www.php-fig.org/psr/psr-17/) | ||
and [PSR-18](https://www.php-fig.org/psr/psr-18/). | ||
|
||
Aside from implementing the REST Proxy API this library adds some convenient classes useful for the most interactions | ||
with Apache Kafka - see [examples](#Examples) for the whole list and a simple tutorial on how to use them. | ||
|
||
Missing features | ||
---------------- | ||
|
||
Some features were deliberately skipped to simplify the implementation/shorten the development time. | ||
If you need anything and you're willing create a pull request, I'd be happy to check it out | ||
and (if it makes sense) merge it. You can start the discussion by opening an issue. | ||
|
||
- version 1 of the REST Proxy API | ||
- It's easy to upgrade the REST Proxy, so I don't see any point in implementing the first version. | ||
- AVRO embedded format | ||
- JSON embedded format | ||
- I think that the binary format is sufficient. You can always serialize/deserialize your objects before/after | ||
they interact with this library, therefore direct integration here is just a complication. | ||
- async operations | ||
- To simplify the library all the methods are synchronous. That might definitely change if the need arises. | ||
- all not-yet-implemented features mentioned [here](https://docs.confluent.io/current/kafka-rest/#features) | ||
- When these features are implemented, they will be added here ASAP. | ||
|
||
Examples | ||
-------- | ||
|
||
### Producer | ||
|
||
Producer allows you to produce messages, either one by one or in batches. Both use the same underlying API method | ||
so it's more efficient to use the batch one. | ||
|
||
Both `produce` and `produceBatch` return nothing on success and throw an exception on failure. | ||
There might be partial success/failure so the thrown exception `FailedToProduceMessages` contains two public properties, | ||
`$retryable` and `$nonRetryable`, where each contains an array of | ||
`['error' => 'error provided by Kafka', message => (Message object given to produce())]`. | ||
Whether the error is [non-]retryable is based on the `error_code` as | ||
[documented](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name). | ||
It's up to you what you do with those. | ||
|
||
```php | ||
$producer = new Producer($restClient); | ||
$producer->produce('your-topic', new Message('some-message')); | ||
|
||
$messages = [new Message('some-message'), new Message('and-some-other-message')]; | ||
$producer->produceBatch('your-topic', $messages); | ||
``` | ||
|
||
### Consumer | ||
|
||
Consumer allows you to consume messages one by one until you return from the loop, application throws exception | ||
or is otherwise forced to exit. | ||
The `consume` method returns a [Generator](https://www.php.net/manual/en/class.generator.php) and loops indefinitely, | ||
yielding messages as they are available. `consume` method accepts two parameters: `timeout` and `maxBytes`. | ||
`timeout` is the maximum time the consumer will wait for the messages. `maxBytes` is the maximum size of the messages | ||
to fetch in a single request. Both of these settings are complementary to the settings in `ConsumerOptions` and to the | ||
server settings (check the Kafka documentation for more information). | ||
|
||
If you set the consumer option `autoCommitEnable` to `false` then you may use consumer's `commit` method to commit | ||
messages. Simply pass it a message you wish to commit. For most cases it's recommended to turn off the auto-commit | ||
and manually commit each message, so that you don't ever "lose" a message if your application dies | ||
in the middle of the processing. | ||
|
||
```php | ||
$consumerFactory = new ConsumerFactory($restClient); | ||
$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic')); | ||
foreach ($consumer->consume() as $message) { | ||
// Do your magic | ||
$logger->info('Got new message', $message->content); | ||
|
||
// ... and when you are done, commit the message (if you turned off auto-committing). | ||
$consumer->commit($message); | ||
} | ||
``` | ||
|
||
### BatchConsumer | ||
|
||
BatchConsumer works the same way as Consumer does; the difference is that BatchConsumer doesn't yield each message | ||
separately but first puts them in batches (`MessagesBatch`). These batches can be configured to be "limited" by either | ||
count of messages in the batch (`maxCount`), by time (`maxDuration`) or by both (setting both `maxCount` and | ||
`maxDuration`). If you set `maxDuration` then the batch won't ever take longer than that (+ few ms for processing) as | ||
it changes the `timeout` parameter of the consumer (consumer won't get stuck on network waiting for more messages). | ||
|
||
BatchConsumer can be quite useful for processing of "large" data sets, where you would have to otherwise batch | ||
the messages yourself (eg. for inserting into database, where batch operations are always better). | ||
|
||
As mentioned in the Consumer example, you may need to commit the messages. For that there is a `commit` method, | ||
which accepts the yielded `MessagesBatch` and commits all the messages inside it in one request. | ||
simPod marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
```php | ||
$batchConsumerFactory = new BatchConsumerFactory($restClient); | ||
$batchConsumer = $batchConsumerFactory->create( | ||
'your-consumer-group', | ||
Subscription::topic('your-topic'), | ||
$maxCount = 10000, // Yield the batch when there is 10 000 messages in it | ||
$maxDuration = 60 // or when 60 seconds passes, whichever comes first. | ||
); | ||
foreach ($batchConsumer->consume() as $messagesBatch) { | ||
// The batch might be empty if you specified the maxDuration. | ||
if ($messagesBatch->isEmpty()) { | ||
continue; | ||
} | ||
|
||
// Do your magic | ||
$database->insert($messagesBatch->getMessages()); | ||
|
||
// ... and when you are done, commit the batch. | ||
$batchConsumer->commit($messagesBatch); | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
{ | ||
"name": "grongor/kafka-rest-client", | ||
"description": "Client for Confluent REST Proxy of Apache Kafka", | ||
"type": "library", | ||
"license": "MIT", | ||
"keywords": ["kafka", "confluent", "proxy", "rest", "client", "librdkafka", "rdkafka"], | ||
"config": { | ||
"sort-packages": true | ||
}, | ||
"require": { | ||
"php": "^7.2|^8.0", | ||
"beberlei/assert": "^3.2", | ||
"jms/serializer": "^3.2", | ||
"lcobucci/clock": "^1.2", | ||
"psr/http-client": "^1.0", | ||
"psr/http-factory": "^1.0", | ||
"psr/http-message": "^1.0.1", | ||
"psr/log": "^1.1", | ||
"teapot/status-code": "^1.1", | ||
"thecodingmachine/safe": "^0.1.16" | ||
}, | ||
"require-dev": { | ||
"cdn77/coding-standard": "^2.0", | ||
"guzzlehttp/psr7": "^1.6", | ||
"http-interop/http-factory-guzzle": "^1.0", | ||
"jakub-onderka/php-parallel-lint": "^1.0.0", | ||
"mockery/mockery": "^1.2.3", | ||
"php-http/message": "^1.0", | ||
"phpstan/extension-installer": "^1.0", | ||
"phpstan/phpstan": "^0.11.19", | ||
"phpstan/phpstan-beberlei-assert": "^0.11.2", | ||
"phpstan/phpstan-mockery": "^0.11.3", | ||
"phpstan/phpstan-phpunit": "^0.11.2", | ||
"phpstan/phpstan-strict-rules": "^0.11.1", | ||
"phpunit/phpunit": "^8.0", | ||
"roave/security-advisories": "dev-master", | ||
"slevomat/coding-standard": "dev-master as 5.0.4", | ||
"thecodingmachine/phpstan-safe-rule": "^0.1.4" | ||
}, | ||
"conflict": { | ||
"doctrine/annotations": "<1.7.0" | ||
}, | ||
"autoload": { | ||
"psr-4": { | ||
"Grongor\\KafkaRest\\": "src/" | ||
} | ||
}, | ||
"autoload-dev": { | ||
"psr-4": { | ||
"Grongor\\KafkaRest\\Tests\\": "tests/" | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
<?xml version="1.0"?> | ||
<ruleset | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd" | ||
> | ||
<arg name="cache" value=".phpcs-cache"/> | ||
|
||
<rule ref="Cdn77CodingStandard"/> | ||
|
||
<file>src/</file> | ||
<file>tests/</file> | ||
</ruleset> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
parameters: | ||
memory-limit: -1 | ||
level: max | ||
paths: | ||
- %currentWorkingDirectory%/src | ||
- %currentWorkingDirectory%/tests |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- https://phpunit.de/manual/current/en/appendixes.configuration.html --> | ||
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:noNamespaceSchemaLocation="vendor/phpunit/phpunit/phpunit.xsd" | ||
beStrictAboutChangesToGlobalState="true" | ||
beStrictAboutOutputDuringTests="true" | ||
beStrictAboutTodoAnnotatedTests="true" | ||
colors="true" | ||
bootstrap="tests/bootstrap.php" | ||
executionOrder="random" | ||
> | ||
<testsuites> | ||
<testsuite name="Main Test Suite"> | ||
<directory>tests</directory> | ||
</testsuite> | ||
</testsuites> | ||
<filter> | ||
<whitelist> | ||
<directory suffix=".php">src/</directory> | ||
</whitelist> | ||
</filter> | ||
</phpunit> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
namespace Grongor\KafkaRest\Api\Exception; | ||
|
||
use RuntimeException; | ||
use function Safe\sprintf; | ||
|
||
final class UnexpectedApiResponse extends RuntimeException | ||
{ | ||
public static function fromStatus(int $httpCode, string $message) : self | ||
{ | ||
return new self(sprintf('Unexpected HTTP status %d: %s', $httpCode, $message)); | ||
} | ||
|
||
/** | ||
* @param array<string, int|string> $response | ||
*/ | ||
public static function fromResponse(int $httpCode, array $response) : self | ||
{ | ||
return new self( | ||
sprintf('Unexpected response HTTP %d: [%d] %s', $httpCode, $response['error_code'], $response['message']) | ||
); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is produce batch transactional?