diff --git a/salt/returners/kafka_return.py b/salt/returners/kafka_return.py index d0bcef44641b..474b4958c8f6 100644 --- a/salt/returners/kafka_return.py +++ b/salt/returners/kafka_return.py @@ -3,66 +3,68 @@ ''' Return data to a Kafka topic -:maintainer: Christer Edwards (christer.edwards@gmail.com) -:maturity: 0.1 -:depends: kafka-python +:maintainer: Justin Desilets (justin.desilets@gmail.com) +:maturity: 20181119 +:depends: confluent-kafka :platform: all -To enable this returner install kafka-python and enable the following settings -in the minion config: +To enable this returner install confluent-kafka and enable the following +settings in the minion config: - returner.kafka.hostnames: - - "server1" - - "server2" - - "server3" + returner.kafka.bootstrap: + - "server1:9092" + - "server2:9092" + - "server3:9092" returner.kafka.topic: 'topic' -To use the kafka returner, append '--return kafka' to the Salt command, eg; +To use the kafka returner, append `--return kafka` to the Salt command, eg; salt '*' test.ping --return kafka ''' - -# Import Python libs from __future__ import absolute_import, print_function, unicode_literals import logging import salt.utils.json # Import third-party libs try: - from kafka import KafkaClient, SimpleProducer + from confluent_kafka import Producer HAS_KAFKA = True except ImportError: HAS_KAFKA = False log = logging.getLogger(__name__) + __virtualname__ = 'kafka' def __virtual__(): if not HAS_KAFKA: - return False, 'Could not import kafka returner; kafka-python is not installed.' + return False, 'Could not import kafka returner; confluent-kafka is not installed.' return __virtualname__ -def _get_conn(ret=None): +def _get_conn(): ''' Return a kafka connection ''' - if __salt__['config.option']('returner.kafka.hostnames'): - hostnames = __salt__['config.option']('returner.kafka.hostnames') - return KafkaClient(hostnames) + if __salt__['config.option']('returner.kafka.bootstrap'): + bootstrap = ','.join(__salt__['config.option']('returner.kafka.bootstrap')) else: - log.error('Unable to find kafka returner config option: hostnames') + log.error('Unable to find kafka returner config option: bootstrap') + return None + return bootstrap -def _close_conn(conn): - ''' - Close the kafka connection - ''' - conn.close() +def _delivery_report(err, msg): + ''' Called once for each message produced to indicate delivery result. + Triggered by poll() or flush(). ''' + if err is not None: + log.error('Message delivery failed: %s', err) + else: + log.debug('Message delivered to %s [%s]', msg.topic(), msg.partition()) def returner(ret): @@ -72,10 +74,11 @@ def returner(ret): if __salt__['config.option']('returner.kafka.topic'): topic = __salt__['config.option']('returner.kafka.topic') - conn = _get_conn(ret) - producer = SimpleProducer(conn) - producer.send_messages(topic, salt.utils.json.dumps(ret)) + conn = _get_conn() + producer = Producer({'bootstrap.servers': conn}) + producer.poll(0) + producer.produce(topic, salt.utils.json.dumps(ret), str(ret).encode('utf-8'), callback=_delivery_report) - _close_conn(conn) + producer.flush() else: log.error('Unable to find kafka returner config option: topic')