diff --git a/docs/transport/kafka.md b/docs/transport/kafka.md index 78ba998dc..4fefb6914 100644 --- a/docs/transport/kafka.md +++ b/docs/transport/kafka.md @@ -41,7 +41,7 @@ $connectionFactory = new RdKafkaConnectionFactory('kafka:'); $connectionFactory = new RdKafkaConnectionFactory([]); // connect to Kafka broker at example.com:1000 plus custom options -$connectionFactory = new RdKafkaConnectionFactory([ +$connectionFactory = new RdKafkaConnectionFactory([ 'global' => [ 'group.id' => uniqid('', true), 'metadata.broker.list' => 'example.com:1000', @@ -54,11 +54,11 @@ $connectionFactory = new RdKafkaConnectionFactory([ $context = $connectionFactory->createContext(); -// if you have enqueue/enqueue library installed you can use a factory to build context from DSN +// if you have enqueue/enqueue library installed you can use a factory to build context from DSN $context = (new \Enqueue\ConnectionFactoryFactory())->create('kafka:')->createContext(); ``` -## Send message to topic +## Send message to topic ```php createTopic('foo'); $context->createProducer()->send($fooTopic, $message); ``` -## Send message to queue +## Send message to queue ```php createQueue('foo'); $consumer = $context->createConsumer($fooQueue); -// Enable async commit to gain better performance. +// Enable async commit to gain better performance (true by default since version 0.9.9). //$consumer->setCommitAsync(true); $message = $consumer->receive(); @@ -108,7 +108,7 @@ $consumer->acknowledge($message); ## Serialize message By default the transport serializes messages to json format but you might want to use another format such as [Apache Avro](https://avro.apache.org/docs/1.2.0/). -For that you have to implement Serializer interface and set it to the context, producer or consumer. +For that you have to implement Serializer interface and set it to the context, producer or consumer. If a serializer set to context it will be injected to all consumers and producers created by the context. ```php @@ -119,7 +119,7 @@ use Enqueue\RdKafka\RdKafkaMessage; class FooSerializer implements Serializer { public function toMessage($string) {} - + public function toString(RdKafkaMessage $message) {} } @@ -145,4 +145,44 @@ $consumer->setOffset(123); $message = $consumer->receive(2000); ``` -[back to index](index.md) \ No newline at end of file +## Usage with Symfony bundle + +Set your enqueue to use rdkafka as your transport + +```yaml +# app/config/config.yml + +enqueue: + default: + transport: "rdkafka:" + client: ~ +``` + +You can also you extended configuration to pass additional options, if you don't want to pass them via DSN string or +need to pass specific options. Since rdkafka uses librdkafka (being basically a wrapper around it) most configuration +options are identical to those found at https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md. + +```yaml +# app/config/config.yml + +enqueue: + default: + transport: + dsn: "rdkafka://" + global: + ### Make sure this is unique for each application / consumer group and does not change + ### Otherwise, Kafka won't be able to track your last offset and will always start according to + ### `auto.offset.reset` setting. + ### See Kafka documentation regarding `group.id` property if you want to know more + group.id: 'foo-app' + metadata.broker.list: 'example.com:1000' + topic: + auto.offset.reset: beginning + ### Commit async is true by default since version 0.9.9. + ### It is suggested to set it to true in earlier versions since otherwise consumers become extremely slow, + ### waiting for offset to be stored on Kafka before continuing. + commit_async: true + client: ~ +``` + +[back to index](index.md) diff --git a/pkg/rdkafka/RdKafkaConsumer.php b/pkg/rdkafka/RdKafkaConsumer.php index 5e89f3d34..9522b32ee 100644 --- a/pkg/rdkafka/RdKafkaConsumer.php +++ b/pkg/rdkafka/RdKafkaConsumer.php @@ -51,7 +51,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd $this->context = $context; $this->topic = $topic; $this->subscribed = false; - $this->commitAsync = false; + $this->commitAsync = true; $this->setSerializer($serializer); }