Skip to content

Commit

Permalink
First version
Browse files Browse the repository at this point in the history
  • Loading branch information
grongor committed Nov 11, 2019
1 parent 0268094 commit 3c29c1a
Show file tree
Hide file tree
Showing 48 changed files with 3,340 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/vendor
/.phpcs-cache
/.phpunit.result.cache
/composer.lock
/phpcs.xml
/phpstan.neon
/phpunit.xml
44 changes: 44 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
language: php
php:
- 7.2
- 7.3
- 7.4snapshot
- nightly

env:
- DEPENDENCIES=
- DEPENDENCIES=--prefer-lowest

install:
composer update --no-interaction --prefer-stable --prefer-dist $DEPENDENCIES

script: make test

stages:
- Validate composer.json
- Code Quality
- Test

jobs:
allow_failures:
- php: 7.4snapshot
- php: nightly

include:
- stage: Validate composer.json
name: Validate composer.json
php: 7.2
script:
- make validate-composer
- composer global require maglnet/composer-require-checker
- $(composer config -g home)/vendor/bin/composer-require-checker check composer.json
- stage: Code Quality
name: Lint
php: 7.2
script: make lint
- name: Coding Standard
php: 7.2
script: make cs
- name: Static Analysis
php: 7.2
script: make static-analysis
72 changes: 72 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
### BEGIN main targets

.PHONY: build
build: vendor

.PHONY: list
list:
@$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F: '/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}' | sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$'

### END

### BEGIN secondary targets

.PHONY: vendor
vendor: vendor/lock

vendor/lock: composer.json
$(MAKE) validate-composer
$(MAKE) update
touch vendor/lock

.PHONY: update
update:
composer update

### END

### BEGIN tests

.PHONY: test
test:
vendor/bin/phpunit $(PHPUNIT_ARGS)

.PHONY: lint
lint:
vendor/bin/parallel-lint src/ tests/

.PHONY: cs
cs: vendor
vendor/bin/phpcs $(PHPCS_ARGS)

.PHONY: cs-fix
cs-fix: vendor
vendor/bin/phpcbf

.PHONY: static-analysis
static-analysis: vendor
vendor/bin/phpstan analyse

.PHONY: validate-composer
validate-composer:
composer validate --strict

.PHONY: check
check: build validate-composer lint cs static-analysis test

### END

### BEGIN cleaning

.PHONY: clean
clean: clean-cache clean-vendor

.PHONY: clean-cache
clean-cache:
rm -rf var/cache/*

.PHONY: clean-vendor
clean-vendor:
rm -rf vendor

### END
116 changes: 115 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,115 @@
# php-kafka-rest-client
kafka-rest-client [![Build Status](https://travis-ci.org/grongor/php-kafka-rest-client.svg?branch=master)](https://travis-ci.org/grongor/php-kafka-rest-client)
=================

This is a modern PHP client for [Confluent REST Proxy](https://docs.confluent.io/current/kafka-rest/) (version 2).

HTTP client used for communication with API adheres to the modern [HTTP standards](http://php-http.org) like
[PSR-7](https://www.php-fig.org/psr/psr-7/), [PSR-17](https://www.php-fig.org/psr/psr-17/)
and [PSR-18](https://www.php-fig.org/psr/psr-18/).

Aside from implementing the REST Proxy API this library adds some convenient classes useful for the most interactions
with Apache Kafka - see [examples](#Examples) for the whole list and a simple tutorial on how to use them.

Missing features
----------------

Some features were deliberately skipped to simplify the implementation/shorten the development time.
If you need anything and you're willing create a pull request, I'd be happy to check it out
and (if it makes sense) merge it. You can start the discussion by opening an issue.

- version 1 of the REST Proxy API
- It's easy to upgrade the REST Proxy, so I don't see any point in implementing the first version.
- AVRO embedded format
- JSON embedded format
- I think that the binary format is sufficient. You can always serialize/deserialize your objects before/after
they interact with this library, therefore direct integration here is just a complication.
- async operations
- To simplify the library all the methods are synchronous. That might definitely change if the need arises.
- all not-yet-implemented features mentioned [here](https://docs.confluent.io/current/kafka-rest/#features)
- When these features are implemented, they will be added here ASAP.

Examples
--------

### Producer

Producer allows you to produce messages, either one by one or in batches. Both use the same underlying API method
so it's more efficient to use the batch one.

Both `produce` and `produceBatch` return nothing on success and throw an exception on failure.
There might be partial success/failure so the thrown exception `FailedToProduceMessages` contains two public properties,
`$retryable` and `$nonRetryable`, where each contains an array of
`['error' => 'error provided by Kafka', message => (Message object given to produce)]`.
Whether the error is [non-]retryable is based on the `error_code` as documented
[here](https://docs.confluent.io/current/kafka-rest/api.html#post--topics-(string-topic_name).
It's up to you what you do with those.

```php
$producer = new Producer($restClient);
$producer->produce('your-topic', new Message('some-message'));

$messages = [new Message('some-message'), new Message('and-some-other-message')];
$producer->produceBatch('your-topic', $messages);
```

### Consumer

Consumer allows you to consume messages one by one until you return from the loop, application throws exception
or is otherwise forced to exit.
The `consume` method returns a [Generator](https://www.php.net/manual/en/class.generator.php) and loops indefinitely,
yielding messages as they are available. `consume` method accepts two parameters: `timeout` and `maxBytes`.
`timeout` is the maximum time the consumer will wait for the messages. `maxBytes` is the maximum size of the messages
to fetch in a single request. Both of these settings are complementary to the settings in `ConsumerOptions` and to the
server settings (check the Kafka documentation for more information).

If you didn't set the consumer to auto-commit messages then you should use consumers `commit` method to do so. Simply
pass it a message you wish to commit. For most cases it's recommended to leave auto-commit off and manually commit
each message as you process them.

```php
$consumerFactory = new ConsumerFactory($restClient);
$consumer = $consumerFactory->create('your-consumer-group', Subscription::topic('your-topic'));
foreach ($consumer->consume() as $message) {
// Do your magic
$logger->info('got new message', $message->content);

// ... and when you are done, commit the message.
$consumer->commit($message);
}
```

### BatchConsumer

BatchConsumer works the same way as Consumer does; the difference is that BatchConsumer doesn't yield each message
separately but first puts them in batches (`MessagesBatch`). These batches can be configured to be "limited" by either
count of messages in the batch (`maxCount`), by time (`maxDuration`) or by both (setting both `maxCount` and
`maxDuration`). If you set `maxDuration` then the batch won't ever take longer than that (+ few ms for processing) as
it changes the `timeout` parameter of the consumer (consumer won't get stuck on network waiting for more messages).

BatchConsumer can be quite useful for processing of "large" data sets, where you would have to otherwise batch
the messages yourself (eg. for inserting into database, where batch operations are always better).

As mentioned in the Consumer example, you may need to commit the messages. For that there is a `commit` method,
which accepts the yielded `MessagesBatch` and commits all the messages inside it in one request.

```php
$batchConsumerFactory = new BatchConsumerFactory($restClient);
$batchConsumer = $batchConsumerFactory->create(
'your-consumer-group',
Subscription::topic('your-topic'),
$maxCount = 10000, // Yield the batch when there is 10 000 messages in it
$maxDuration = 60 // or when 60 seconds passes, whichever comes first.
);
foreach ($batchConsumer->consume() as $messagesBatch) {
// The batch might be empty if you specified the maxDuration.
if ($messagesBatch->isEmpty()) {
continue;
}

// Do your magic
$database->insert($messagesBatch->getMessages());

// ... and when you are done, commit the batch.
$batchConsumer->commit($messagesBatch);
}
```
54 changes: 54 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
{
"name": "grongor/kafka-rest-client",
"description": "Client for Confluent REST Proxy of Apache Kafka",
"type": "library",
"license": "MIT",
"keywords": ["kafka", "confluent", "proxy", "rest", "client", "librdkafka", "rdkafka"],
"config": {
"sort-packages": true
},
"require": {
"php": "^7.2|^8.0",
"beberlei/assert": "^3.2",
"jms/serializer": "^3.3",
"lcobucci/clock": "^1.2",
"psr/http-client": "^1.0",
"psr/http-factory": "^1.0",
"psr/http-message": "^1.0 >=1.0.1",
"psr/log": "^1.1",
"teapot/status-code": "^1.1",
"thecodingmachine/safe": "^0.1.16"
},
"require-dev": {
"cdn77/coding-standard": "^2.0",
"guzzlehttp/psr7": "^1.6",
"http-interop/http-factory-guzzle": "^1.0",
"jakub-onderka/php-parallel-lint": "^1.0.0",
"mockery/mockery": "^1.2 >=1.2.3",
"php-http/message": "^1.0",
"phpstan/extension-installer": "^1.0",
"phpstan/phpstan": "^0.11.19",
"phpstan/phpstan-beberlei-assert": "^0.11.2",
"phpstan/phpstan-mockery": "^0.11.3",
"phpstan/phpstan-phpunit": "^0.11.2",
"phpstan/phpstan-strict-rules": "^0.11.1",
"phpunit/phpunit": "^8.0",
"roave/security-advisories": "dev-master",
"slevomat/coding-standard": "dev-master as 5.0.4",
"thecodingmachine/phpstan-safe-rule": "^0.1.4"
},
"conflict": {
"php-http/httplug": "^1.0",
"doctrine/annotations": "<1.7.0"
},
"autoload": {
"psr-4": {
"Grongor\\KafkaRest\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"Grongor\\KafkaRest\\Tests\\": "tests/"
}
}
}
12 changes: 12 additions & 0 deletions phpcs.xml.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0"?>
<ruleset
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd"
>
<arg name="cache" value=".phpcs-cache"/>

<rule ref="Cdn77CodingStandard"/>

<file>src/</file>
<file>tests/</file>
</ruleset>
6 changes: 6 additions & 0 deletions phpstan.neon.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
parameters:
memory-limit: -1
level: max
paths:
- %currentWorkingDirectory%/src
- %currentWorkingDirectory%/tests
22 changes: 22 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- https://phpunit.de/manual/current/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="vendor/phpunit/phpunit/phpunit.xsd"
beStrictAboutChangesToGlobalState="true"
beStrictAboutOutputDuringTests="true"
beStrictAboutTodoAnnotatedTests="true"
colors="true"
bootstrap="tests/bootstrap.php"
executionOrder="random"
>
<testsuites>
<testsuite name="Main Test Suite">
<directory>tests</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory suffix=".php">src/</directory>
</whitelist>
</filter>
</phpunit>
26 changes: 26 additions & 0 deletions src/Api/Exception/UnexpectedApiResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace Grongor\KafkaRest\Api\Exception;

use RuntimeException;
use function Safe\sprintf;

final class UnexpectedApiResponse extends RuntimeException
{
public static function fromStatus(int $httpCode, string $message) : self
{
return new self(sprintf('Unexpected HTTP status %d: %s', $httpCode, $message));
}

/**
* @param array<string, int|string> $response
*/
public static function fromResponse(int $httpCode, array $response) : self
{
return new self(
sprintf('Unexpected response HTTP %d: [%d] %s', $httpCode, $response['error_code'], $response['message'])
);
}
}
Loading

0 comments on commit 3c29c1a

Please sign in to comment.