Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kafka] Set commit_async as true by default for Kafka, update docs #810

Merged
merged 1 commit into from
Mar 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions docs/transport/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ $connectionFactory = new RdKafkaConnectionFactory('kafka:');
$connectionFactory = new RdKafkaConnectionFactory([]);

// connect to Kafka broker at example.com:1000 plus custom options
$connectionFactory = new RdKafkaConnectionFactory([
$connectionFactory = new RdKafkaConnectionFactory([
'global' => [
'group.id' => uniqid('', true),
'metadata.broker.list' => 'example.com:1000',
Expand All @@ -54,11 +54,11 @@ $connectionFactory = new RdKafkaConnectionFactory([

$context = $connectionFactory->createContext();

// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
// if you have enqueue/enqueue library installed you can use a factory to build context from DSN
$context = (new \Enqueue\ConnectionFactoryFactory())->create('kafka:')->createContext();
```

## Send message to topic
## Send message to topic

```php
<?php
Expand All @@ -71,7 +71,7 @@ $fooTopic = $context->createTopic('foo');
$context->createProducer()->send($fooTopic, $message);
```

## Send message to queue
## Send message to queue

```php
<?php
Expand All @@ -94,7 +94,7 @@ $fooQueue = $context->createQueue('foo');

$consumer = $context->createConsumer($fooQueue);

// Enable async commit to gain better performance.
// Enable async commit to gain better performance (true by default since version 0.9.9).
//$consumer->setCommitAsync(true);

$message = $consumer->receive();
Expand All @@ -108,7 +108,7 @@ $consumer->acknowledge($message);
## Serialize message

By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/).
For that you have to implement Serializer interface and set it to the context, producer or consumer.
For that you have to implement Serializer interface and set it to the context, producer or consumer.
If a serializer set to context it will be injected to all consumers and producers created by the context.

```php
Expand All @@ -119,7 +119,7 @@ use Enqueue\RdKafka\RdKafkaMessage;
class FooSerializer implements Serializer
{
public function toMessage($string) {}

public function toString(RdKafkaMessage $message) {}
}

Expand All @@ -145,4 +145,44 @@ $consumer->setOffset(123);
$message = $consumer->receive(2000);
```

[back to index](index.md)
## Usage with Symfony bundle

Set your enqueue to use rdkafka as your transport

```yaml
# app/config/config.yml

enqueue:
default:
transport: "rdkafka:"
client: ~
```

You can also you extended configuration to pass additional options, if you don't want to pass them via DSN string or
need to pass specific options. Since rdkafka uses librdkafka (being basically a wrapper around it) most configuration
options are identical to those found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.

```yaml
# app/config/config.yml

enqueue:
default:
transport:
dsn: "rdkafka://"
global:
### Make sure this is unique for each application / consumer group and does not change
### Otherwise, Kafka won't be able to track your last offset and will always start according to
### `auto.offset.reset` setting.
### See Kafka documentation regarding `group.id` property if you want to know more
group.id: 'foo-app'
metadata.broker.list: 'example.com:1000'
topic:
auto.offset.reset: beginning
### Commit async is true by default since version 0.9.9.
### It is suggested to set it to true in earlier versions since otherwise consumers become extremely slow,
### waiting for offset to be stored on Kafka before continuing.
commit_async: true
client: ~
```

[back to index](index.md)
2 changes: 1 addition & 1 deletion pkg/rdkafka/RdKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
$this->context = $context;
$this->topic = $topic;
$this->subscribed = false;
$this->commitAsync = false;
$this->commitAsync = true;

$this->setSerializer($serializer);
}
Expand Down