Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RdKafka Transport #134

Merged
merged 11 commits into from
Jul 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
sudo: required

git:
depth: 10

Expand All @@ -6,37 +8,27 @@ language: php
matrix:
include:
- php: 5.6
sudo: false
env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
- php: 7.1
sudo: false
env: SYMFONY_VERSION=3.0.* PHPSTAN=true
- php: 7.1
sudo: false
env: SYMFONY_VERSION=3.0.* PHP_CS_FIXER=true
- php: 7.0
sudo: false
env: SYMFONY_VERSION=2.8.* UNIT_TESTS=true
- php: 5.6
sudo: false
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak
- php: 7.0
sudo: false
env: SYMFONY_VERSION=3.0.* UNIT_TESTS=true SYMFONY_DEPRECATIONS_HELPER=weak
- php: 7.1
sudo: required
services: docker
env: SYMFONY_VERSION=2.8.* FUNCTIONAL_TESTS=true
- php: 7.1
sudo: required
services: docker
env: SYMFONY_VERSION=3.0.* FUNCTIONAL_TESTS=true
- php: 7.1
sudo: required
services: docker
env: SYMFONY_VERSION=3.2.* FUNCTIONAL_TESTS=true
- php: 7.1
sudo: required
services: docker
env: SYMFONY_VERSION=3.3.* FUNCTIONAL_TESTS=true

Expand All @@ -54,8 +46,7 @@ install:
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/dev -b; fi

script:
# misssing pkg/amqp-ext pkg/job-queue pkg/redis
- if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test; fi
- if [ "$PHPSTAN" = true ]; then php -d memory_limit=512M bin/phpstan analyse -l 1 -c phpstan.neon pkg/amqp-ext pkg/async-event-dispatcher pkg/dbal pkg/enqueue pkg/enqueue-bundle pkg/fs pkg/gearman pkg/job-queue pkg/null pkg/pheanstalk pkg/redis pkg/simple-client pkg/sqs pkg/stomp pkg/test pkg/rdkafka; fi
- if [ "$PHP_CS_FIXER" = true ]; then IFS=$'\n'; COMMIT_SCA_FILES=($(git diff --name-only --diff-filter=ACMRTUXB "${TRAVIS_COMMIT_RANGE}")); unset IFS; fi
- if [ "$PHP_CS_FIXER" = true ]; then ./bin/php-cs-fixer fix --config=.php_cs.dist -v --dry-run --stop-on-violation --using-cache=no --path-mode=intersection -- "${COMMIT_SCA_FILES[@]}"; fi
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
Expand Down
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,17 @@ Features:

* [Feature rich](docs/quick_tour.md).
* Implements [JMS](https://docs.oracle.com/javaee/7/api/javax/jms/package-summary.html) like transports based on a[queue-interop](https://github.com/queue-interop/queue-interop) interfaces.
* Supported transports [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ), [Beanstalk](docs/transport/pheanstalk.md), [STOMP](docs/transport/stomp.md), [Amazon SQS](docs/transport/sqs.md), [Redis](docs/transport/redis.md), [Gearman](docs/transport/gearman.md), [Doctrine DBAL](docs/transport/dbal.md), [Filesystem](docs/transport/filesystem.md), [Null](docs/transport/null.md).
* Supported transports
* [AMQP](docs/transport/amqp.md) (RabbitMQ, ActiveMQ)
* [Beanstalk](docs/transport/pheanstalk.md)
* [STOMP](docs/transport/stomp.md)
* [Amazon SQS](docs/transport/sqs.md)
* [Kafka](docs/transport/kafka.md)
* [Redis](docs/transport/redis.md)
* [Gearman](docs/transport/gearman.md)
* [Doctrine DBAL](docs/transport/dbal.md)
* [Filesystem](docs/transport/filesystem.md)
* [Null](docs/transport/null.md).
* [Symfony bundle](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/bundle/quick_tour.md)
* [Magento1 extension](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/magento/quick_tour.md)
* [Message bus](http://www.enterpriseintegrationpatterns.com/patterns/messaging/MessageBus.html) support.
Expand Down
1 change: 1 addition & 0 deletions bin/test
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ waitForService mysql 3306 50
waitForService redis 6379 50
waitForService beanstalkd 11300
waitForService gearmand 4730
waitForService kafka 9092

php pkg/job-queue/Tests/Functional/app/console doctrine:database:create
php pkg/job-queue/Tests/Functional/app/console doctrine:schema:update --force
Expand Down
17 changes: 16 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"enqueue/sqs": "*@dev",
"enqueue/pheanstalk": "*@dev",
"enqueue/gearman": "*@dev",
"enqueue/rdkafka": "*@dev",
"kwn/php-rdkafka-stubs": "^1.0.2",
"enqueue/enqueue-bundle": "*@dev",
"enqueue/job-queue": "*@dev",
"enqueue/simple-client": "*@dev",
Expand All @@ -35,7 +37,16 @@
"phpstan/phpstan": "^0.7.0"
},
"autoload": {
"files": ["pkg/enqueue/functions_include.php"]
"files": [
"pkg/enqueue/functions_include.php",
"pkg/rdkafka/Tests/bootstrap.php"
],
"psr-0": {
"RdKafka": "vendor/kwn/php-rdkafka-stubs/stubs"
},
"psr-4": {
"RdKafka\\": "vendor/kwn/php-rdkafka-stubs/stubs/RdKafka"
}
},
"config": {
"bin-dir": "bin"
Expand Down Expand Up @@ -93,6 +104,10 @@
"type": "path",
"url": "pkg/gearman"
},
{
"type": "path",
"url": "pkg/rdkafka"
},
{
"type": "path",
"url": "pkg/simple-client"
Expand Down
18 changes: 18 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ services:
- redis
- beanstalkd
- gearmand
- kafka
- zookeeper
volumes:
- './:/mqdev'
environment:
Expand All @@ -36,6 +38,8 @@ services:
- BEANSTALKD_PORT=11300
- BEANSTALKD_DSN=beanstalk://beanstalkd:11300
- GEARMAN_DSN=gearman://gearmand:4730
- RDKAFKA_HOST=kafka
- RDKAFKA_PORT=9092

rabbitmq:
image: enqueue/rabbitmq:latest
Expand Down Expand Up @@ -71,6 +75,20 @@ services:
volumes:
- ./:/mqdev

zookeeper:
image: 'wurstmeister/zookeeper'
ports:
- '2181:2181'

kafka:
image: 'wurstmeister/kafka:0.10.2.1'
ports:
- '9092:9092'
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
volumes:
- '/var/run/docker.sock:/var/run/docker.sock'

volumes:
mysql-data:
driver: local
9 changes: 8 additions & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@ FROM formapro/nginx-php-fpm:latest-all-exts
## libs
RUN set -x && \
apt-get update && \
apt-get install -y --no-install-recommends wget curl openssl ca-certificates nano netcat php-redis
apt-get install -y wget curl openssl ca-certificates nano netcat php-dev php-redis git python

## confis

# RUN rm -f /etc/php/7.0/cli/conf.d/*xdebug.ini

## librdkafka
RUN git clone https://github.com/edenhill/librdkafka.git /root/librdkafka
RUN cd /root/librdkafka && git checkout v0.11.0-RC2 && ./configure && make && make install
RUN pecl install rdkafka
RUN echo "extension=rdkafka.so" > /etc/php/7.1/cli/conf.d/10-rdkafka.ini
RUN echo "extension=rdkafka.so" > /etc/php/7.1/fpm/conf.d/10-rdkafka.ini

COPY ./php/cli.ini /etc/php/7.1/cli/conf.d/1-dev_cli.ini
COPY ./bin/dev_entrypoiny.sh /usr/local/bin/entrypoint.sh
RUN chmod u+x /usr/local/bin/entrypoint.sh
Expand Down
2 changes: 2 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
- [Amqp (RabbitMQ, ActiveMQ)](transport/amqp.md)
- [Amazon SQS](transport/sqs.md)
- [Beanstalk (Pheanstalk)](transport/pheanstalk.md)
- [Gearman](transport/gearman.md)
- [Kafka](transport/kafka.md)
- [Stomp](transport/stomp.md)
- [Redis](transport/redis.md)
- [Doctrine DBAL](transport/dbal.md)
Expand Down
91 changes: 91 additions & 0 deletions docs/transport/kafka.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Kafka transport

The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ broker.

* [Installation](#installation)
* [Create context](#create-context)
* [Send message to topic](#send-message-to-topic)
* [Send message to queue](#send-message-to-queue)
* [Consume message](#consume-message)

## Installation

```bash
$ composer require enqueue/rdkafka
```

## Create context

```php
<?php
use Enqueue\RdKafka\RdKafkaConnectionFactory;

// connects to localhost:9092
$connectionFactory = new RdKafkaConnectionFactory();

// same as above
$connectionFactory = new RdKafkaConnectionFactory('rdkafka://');

// same as above
$connectionFactory = new RdKafkaConnectionFactory([]);

// connect to Kafka broker at example.com:1000 plus custom options
$connectionFactory = new RdKafkaConnectionFactory([
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => 'example.com:1000',
'enable.auto.commit' => 'false',
],
'topic' => [
'auto.offset.reset' => 'beginning',
],
]);

$psrContext = $connectionFactory->createContext();
```

## Send message to topic

```php
<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$message = $psrContext->createMessage('Hello world!');

$fooTopic = $psrContext->createTopic('foo');

$psrContext->createProducer()->send($fooTopic, $message);
```

## Send message to queue

```php
<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$message = $psrContext->createMessage('Hello world!');

$fooQueue = $psrContext->createQueue('foo');

$psrContext->createProducer()->send($fooQueue, $message);
```

## Consume message:

```php
<?php
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */

$fooQueue = $psrContext->createQueue('foo');

$consumer = $psrContext->createConsumer($fooQueue);

$message = $consumer->receive();

// process a message

$consumer->acknowledge($message);
// $consumer->reject($message);
```

[back to index](index.md)
4 changes: 4 additions & 0 deletions phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
<directory>pkg/gearman/Tests</directory>
</testsuite>

<testsuite name="rdkafka transport">
<directory>pkg/rdkafka/Tests</directory>
</testsuite>

<testsuite name="enqueue-bundle">
<directory>pkg/enqueue-bundle/Tests</directory>
</testsuite>
Expand Down
6 changes: 6 additions & 0 deletions pkg/rdkafka/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
*~
/composer.lock
/composer.phar
/phpunit.xml
/vendor/
/.idea/
21 changes: 21 additions & 0 deletions pkg/rdkafka/.travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
sudo: false

git:
depth: 1

language: php

php:
- '5.6'
- '7.0'

cache:
directories:
- $HOME/.composer/cache

install:
- composer self-update
- composer install --prefer-source --ignore-platform-reqs

script:
- vendor/bin/phpunit --exclude-group=functional
20 changes: 20 additions & 0 deletions pkg/rdkafka/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2016 Kotliar Maksym

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
26 changes: 26 additions & 0 deletions pkg/rdkafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# RdKafka Transport

[![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby)
[![Build Status](https://travis-ci.org/php-enqueue/rdkafka.png?branch=master)](https://travis-ci.org/php-enqueue/rdkafka)
[![Total Downloads](https://poser.pugx.org/enqueue/rdkafka/d/total.png)](https://packagist.org/packages/enqueue/rdkafka)
[![Latest Stable Version](https://poser.pugx.org/enqueue/rdkafka/version.png)](https://packagist.org/packages/enqueue/rdkafka)

This is an implementation of PSR specification. It allows you to send and consume message via Kafka protocol.

## Resources

* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md)
* [Questions](https://gitter.im/php-enqueue/Lobby)
* [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues)

## Developed by Forma-Pro

Forma-Pro is a full stack development company which interests also spread to open source development.
Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience.
Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability.

If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com

## License

It is released under the [MIT License](LICENSE).
Loading