This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

Rabbitmq support #51

Merged
merged 9 commits on Nov 6, 2021
43 changes: 43 additions & 0 deletions README.md
@@ -457,6 +457,49 @@ Configuration
topic: null
```

### RabbitMQ

This output enables sending dnstap messages to a RabbitMQ queue.

Install the extra Python dependency for RabbitMQ:

```bash
pip install dnstap_receiver[rabbitmq]
```
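
On some shells (zsh, for example) the brackets may need quoting: `pip install 'dnstap_receiver[rabbitmq]'`.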

Configuration

```yaml
# forward to a RabbitMQ queue
rabbitmq:
# enable or disable
enable: false
# format available text|json|yaml
format: json
# connection configuration
connection:
username: null
password: null
host: 127.0.0.1
port: 5672
# Queue to forward messages to
queue:
queue: null
passive: false
durable: true
exclusive: false
auto_delete: false
# Exchange, default ''
exchange: ""
# Routing key, default = queue
routing-key: null
# Retries to connect/publish
retry-count: 2
# Retry delay seconds
retry-delay: 0.5
```
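
To verify that messages arrive, a minimal consumer sketch with pika is shown below; the queue name `dnstap`, the credentials and the host are placeholders, so adjust them to match the settings above.

```python
import json

import pika

# placeholder connection settings: use the same values as in the receiver configuration
credentials = pika.PlainCredentials("guest", "guest")
params = pika.ConnectionParameters(host="127.0.0.1", port=5672, credentials=credentials)

connection = pika.BlockingConnection(params)
channel = connection.channel()

# declare the queue with the same flags as the receiver (durable: true by default)
channel.queue_declare(queue="dnstap", durable=True)

def on_message(ch, method, properties, body):
    # with format: json, each body is one JSON-encoded dnstap message
    print(json.loads(body))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="dnstap", on_message_callback=on_message)
channel.start_consuming()
```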


### PostgreSQL

This output enables sending dnstap messages to a PostgreSQL database.
28 changes: 28 additions & 0 deletions dnstap_receiver/dnstap.conf
@@ -217,6 +217,34 @@ output:
# Kafka topic to forward messages to
topic: null

# forward to a RabbitMQ queue
rabbitmq:
# enable or disable
enable: false
# format available text|json|yaml
format: json
# connection configuration
connection:
username: null
password: null
host: 127.0.0.1
port: 5672
# Queue to forward messages to
queue:
queue: null
passive: false
durable: true
exclusive: false
auto_delete: false
# Exchange, default ''
exchange: ""
# Routing key, default = queue
routing-key: null
# Retries to connect/publish
retry-count: 2
# Retry delay seconds
retry-delay: 0.5

# forward to postgresql server
pgsql:
# enable or disable
127 changes: 127 additions & 0 deletions dnstap_receiver/outputs/output_rabbitmq.py
@@ -0,0 +1,127 @@
import asyncio
import logging
import time

from dnstap_receiver.outputs import transform
from dnstap_receiver import statistics

try:
import pika
has_pika = True
except ImportError:
has_pika = False

clogger = logging.getLogger("dnstap_receiver.console")


def checking_conf(cfg: dict) -> bool:
"""validate the config"""
clogger.debug("Output handler: rabbitmq")

valid_conf = True

if not has_pika:
valid_conf = False
clogger.error("Output handler: rabbitmq: confluent_kafka dependency is missing")

if cfg["connection"]["username"] is None \
or cfg["connection"]["password"] is None \
or cfg["connection"]["host"] is None \
or cfg["connection"]["port"] is None:
valid_conf = False
clogger.error("Output handler: rabbitmq: missing connection details")

if cfg["queue"]["queue"] is None:
valid_conf = False
clogger.error("Output handler: rabbitmq: no queue provided")

return valid_conf


class RabbitMQ:
"""Class to handle RabbitMQ connections and channel push"""

def __init__(self, output_cfg: dict) -> None:
self.cfg = output_cfg
self.cfg["routing_key"] = output_cfg.get("routing_key", output_cfg["queue"]["queue"])

self.connection: pika.BlockingConnection = None
self.channel: pika.adapters.blocking_connection.BlockingChannel = None

self.credentials = pika.PlainCredentials(
self.cfg["connection"]["username"],
self.cfg["connection"]["password"]
)
self.connection_params = pika.ConnectionParameters(
host=self.cfg["connection"]["host"],
port=self.cfg["connection"]["port"],
credentials=self.credentials
)
self.init_connection()

def init_connection(self) -> None:
"""init connection and channel"""
if self.connection and self.connection.is_open:
return

self.connection = pika.BlockingConnection(self.connection_params)

self.channel = self.connection.channel()
        self.channel.queue_declare(
            queue=self.cfg["queue"]["queue"],
            passive=self.cfg["queue"]["passive"],
            durable=self.cfg["queue"]["durable"],
            exclusive=self.cfg["queue"]["exclusive"],
            auto_delete=self.cfg["queue"]["auto_delete"]
        )


def publish(self, msg) -> None:
"""publish msg to the channel"""
for attempt in range(self.cfg['retry-count']):
try:
self.init_connection()
clogger.debug("RabbitMQ publish")
self.channel.basic_publish(
exchange=self.cfg["exchange"],
routing_key=self.cfg["routing_key"],
body=msg
)
except (pika.exceptions.ConnectionClosed,
pika.exceptions.StreamLostError,
pika.exceptions.ChannelWrongStateError,
ConnectionResetError
) as connection_error:
clogger.debug(connection_error)
clogger.debug(f"Publish failed, trying to reconnect, attepmt {attempt +1}")
time.sleep(self.cfg['retry-delay'])
else:
break
else:
clogger.error(f"Publish failed. Connection error after {self.cfg['retry-count']}")


def close_connection(self):
"""properly close the connection"""
if self.connection and self.connection.is_open:
self.connection.close()



async def handle(output_cfg: dict, queue: asyncio.Queue, _metrics: statistics.Statistics, start_shutdown: asyncio.Event):
"""Connect to rabbit and push the messages from the queue"""

rabbitmq = RabbitMQ(output_cfg=output_cfg)
clogger.info("Output handler: rabbitmq: Enabled")
while not start_shutdown.is_set():
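        # short timeout so the shutdown event is re-checked even when the queue is idle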
try:
tapmsg = await asyncio.wait_for(queue.get(), timeout=0.5)
except asyncio.TimeoutError:
continue
msg = transform.convert_dnstap(fmt=output_cfg["format"], tapmsg=tapmsg)
rabbitmq.publish(msg)
queue.task_done()

# tell producer to shut down
clogger.info("Output handler: rabbitmq: Triggering producer shutdown")
rabbitmq.close_connection()
7 changes: 7 additions & 0 deletions dnstap_receiver/receiver.py
@@ -29,6 +29,7 @@
from dnstap_receiver.outputs import output_dnstap
from dnstap_receiver.outputs import output_kafka
from dnstap_receiver.outputs import output_pgsql
from dnstap_receiver.outputs import output_rabbitmq

from dnstap_receiver import api_server
from dnstap_receiver import statistics
@@ -176,6 +177,12 @@ def setup_outputs(cfg, stats, start_shutdown):
queues_list.append(queue_pgsql)
loop.create_task(output_pgsql.handle(conf["pgsql"], queue_pgsql, stats, start_shutdown))

if conf["rabbitmq"]["enable"]:
if not output_rabbitmq.checking_conf(cfg=conf["rabbitmq"]): return
queue_rabbitmq = asyncio.Queue()
queues_list.append(queue_rabbitmq)
loop.create_task(output_rabbitmq.handle(conf["rabbitmq"], queue_rabbitmq, stats, start_shutdown))

return queues_list

def setup_inputs(cfg, queues_outputs, stats, geoip_reader, start_shutdown):
1 change: 1 addition & 0 deletions setup.j2
@@ -39,5 +39,6 @@ setuptools.setup(
extras_require={
"kafka": ["confluent-kafka"],
"pgsql": ["asyncpg"],
"rabbitmq": ["pika"],
}
)
7 changes: 7 additions & 0 deletions tests/dnstap_rabbitmq.conf
@@ -0,0 +1,7 @@
trace:
# enable verbose mode
verbose: true

output:
rabbitmq:
enable: true
7 changes: 7 additions & 0 deletions tests/test_external_config.py
@@ -55,3 +55,10 @@ def test5_output_pgsql_enable(self):
o = execute_dnstap(cmd)

self.assertRegex(o, b"Output handler: pgsql")

    def test6_output_rabbitmq_enable(self):
"""test to enable rabbitmq output"""
cmd = 'python3 -c "from dnstap_receiver.receiver import start_receiver; start_receiver()" -c ./tests/dnstap_rabbitmq.conf'
o = execute_dnstap(cmd)

self.assertRegex(o, b"Output handler: rabbitmq")