Skip to content
This repository has been archived by the owner on Nov 14, 2022. It is now read-only.

Commit

Permalink
new dnstap output
Browse files Browse the repository at this point in the history
  • Loading branch information
dmachard committed Dec 30, 2020
1 parent 7a15f92 commit 9e95407
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 21 deletions.
23 changes: 22 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ in JSON, YAML or one line text format and more.
* [TCP](#tcp)
* [Syslog](#syslog)
* [Metrics](#metrics)
* [Dnstap](#dnstap)
* [More options](#more-options)
* [External config file](#external-config-file)
* [Verbose mode](#verbose-mode)
Expand Down Expand Up @@ -181,6 +182,7 @@ Outputs handler can be configured to forward messages in several modes.
- [Metrics](#metrics)
- [TCP](#tcp)
- [Syslog](#syslog)
- [Dnstap](#dnstap)
### Stdout
Expand Down Expand Up @@ -360,7 +362,7 @@ Sep 22 12:43:01 bind CLIENT_RESPONSE NOERROR 192.168.1.100 51718 INET UDP 203b w

This output enables to generate metrics in one line and print-it to stdout. Add the following configuration as external config to activate this output:

```
```yaml
output:
metrics:
# enable or disable
Expand All @@ -384,6 +386,25 @@ Example of output
18 UDP, 0 TCP, 17 DOMAINS
```

### Dnstap

This output enables to send dnstap messages to a remote dnstap receiver. Add the following configuration as external config to activate this output:

```yaml
# forward to another remote dnstap receiver
dnstap:
# enable or disable
enable: true
# retry interval in seconds to connect
retry: 1
# remote ipv4 or ipv6 address of the remote dnstap receiver
remote-address: 10.0.0.51
# remote port of the remote dnstap receiver
remote-port: 6000
# dnstap identity
dnstap-identity: dnstap-receiver
```
## More options
### External config file
Expand Down
9 changes: 7 additions & 2 deletions dnstap_receiver/api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,12 @@ async def create_server(loop, cfg, stats, cfg_stats):

# create the server
listen_address = (cfg["local-address"], cfg["local-port"])
srv = await loop.create_server(app.make_handler(access_log=None), *listen_address)
clogger.debug("Api rest: listening on %s:%s" % listen_address )
try:
srv = await loop.create_server(app.make_handler(access_log=None), *listen_address)
except OSError as e:
clogger.error( "http api: %s" % e)
return None

clogger.debug("Webserver: listening on %s:%s" % listen_address )

return srv
22 changes: 11 additions & 11 deletions dnstap_receiver/codecs/dnstap_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ async def cb_ondnstap(dnstap_decoder, payload, cfg, queues_list, stats, geoip_re
# decode binary payload
dnstap_decoder.ParseFromString(payload)
dm = dnstap_decoder.message

tap = { "identity": UnknownValue.name,
"qname": UnknownValue.name,
"rrtype": UnknownValue.name,
"query-ip": UnknownValue.name, "query-port": UnknownValue.name,
"response-ip": UnknownValue.name, "response-port": UnknownValue.name,
"latency": UnknownValue.name}

# type: CLIENT_QUERY
# socket_family: INET
Expand Down Expand Up @@ -60,6 +53,13 @@ async def cb_ondnstap(dnstap_decoder, payload, cfg, queues_list, stats, geoip_re
# response_message: "\2300\201\200\000\001\000\001\000\000\000\001\003www\006google\003com\000\000\034
# \000\001\300\014\000\034\000\001\000\000\000\236\000\020*\000\024P@\007\010\006\000\000\000\000\000\000 \004\000\000)\004\320\000\000\000\000\000\000"

tap = { "identity": UnknownValue.name,
"qname": UnknownValue.name,
"rrtype": UnknownValue.name,
"query-ip": UnknownValue.name, "query-port": UnknownValue.name,
"response-ip": UnknownValue.name, "response-port": UnknownValue.name,
"latency": UnknownValue.name}

# filtering by dnstap identity ?
if len(dnstap_decoder.identity): tap["identity"] = dnstap_decoder.identity.decode()
if cfg["filter"]["dnstap-identities"] is not None:
Expand Down Expand Up @@ -95,8 +95,8 @@ async def cb_ondnstap(dnstap_decoder, payload, cfg, queues_list, stats, geoip_re
if (dm.type % 2 ) == 1 :
tap["length"] = len(dm.query_message)
tap["timestamp"] = dm.query_time_sec + round(dm.query_time_nsec )*1e-9
tap["time_sec"] = dm.query_time_sec
tap["time_nsec"] = dm.query_time_nsec
tap["time-sec"] = dm.query_time_sec
tap["time-nsec"] = dm.query_time_nsec
tap["type"] = "query"

# hash query and put in cache the arrival time
Expand All @@ -109,8 +109,8 @@ async def cb_ondnstap(dnstap_decoder, payload, cfg, queues_list, stats, geoip_re
if (dm.type % 2 ) == 0 :
tap["length"] = len(dm.response_message)
tap["timestamp"] = dm.response_time_sec + round(dm.response_time_nsec )*1e-9
tap["time_sec"] = dm.response_time_sec
tap["time_nsec"] = dm.response_time_nsec
tap["time-sec"] = dm.response_time_sec
tap["time-nsec"] = dm.response_time_nsec
tap["type"] = "response"

# compute hash of the query and latency
Expand Down
6 changes: 3 additions & 3 deletions dnstap_receiver/dns/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ def decode_question(data):
qname.append(buf[1:length+1])
buf = buf[length+1:]

q = struct.unpack('!HH', buf[1:5])
qtype = q[0]
qclass = q[1]
qtype, qclass = struct.unpack('!HH', buf[1:5])
# qtype = q[0]
# qclass = q[1]
return (b".".join(qname)+ b".", qtype)

def decode_dns(data):
Expand Down
15 changes: 14 additions & 1 deletion dnstap_receiver/dnstap.conf
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,17 @@ output:
# remote ipv4 or ipv6 address of the syslog server
remote-address: null
# remote port of the syslog server
remote-port: null
remote-port: null

# forward to another remote dnstap receiver
dnstap:
# enable or disable
enable: false
# retry interval in seconds to connect
retry: 1
# remote ipv4 or ipv6 address of the remote dnstap receiver
remote-address: null
# remote port of the remote dnstap receiver
remote-port: null
# dnstap identity
dnstap-identity: dnstap-receiver
5 changes: 5 additions & 0 deletions dnstap_receiver/inputs/input_sniffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ async def watch_buffer(cfg, q, queues_list, stats, cache):
# ignore the packet if the dns payload if too small
if len(dns_payload) < DNS_LEN: continue

tap["payload"] = dns_payload
# begin to decode the dns payload
tap["id"] = int.from_bytes(dns_payload[0:2], "big")
tap["type"] = "response" if int.from_bytes(dns_payload[2:4], "big") >> 15 else "query"
Expand All @@ -153,10 +154,14 @@ async def watch_buffer(cfg, q, queues_list, stats, cache):
tap["message"] = "CLIENT_QUERY"
tap["query-ip"] = tap["src-ip"]
tap["query-port"] = tap["src-port"]
tap["response-ip"] = tap["dst-ip"]
tap["response-port"] = tap["dst-port"]
elif cfg["record-client-response"] and tap["type"] == "response" and (tap["src-ip"] in cfg["eth-ip"]):
tap["message"] = "CLIENT_RESPONSE"
tap["query-ip"] = tap["dst-ip"]
tap["query-port"] = tap["dst-port"]
tap["response-ip"] = tap["src-ip"]
tap["response-port"] = tap["src-port"]
else:
continue

Expand Down
123 changes: 123 additions & 0 deletions dnstap_receiver/outputs/output_dnstap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
import asyncio
import logging
import socket

clogger = logging.getLogger("dnstap_receiver.console")

from dnstap_receiver.codecs import fstrm
from dnstap_receiver.codecs import dnstap_pb2
from dnstap_receiver.outputs import transform

def checking_conf(cfg):
"""validate the config"""
clogger.debug("Output handler: dnstap")

valid_conf = True

if cfg["remote-address"] is None:
valid_conf = False
clogger.error("Output handler: no remote address provided")

if cfg["remote-port"] is None:
valid_conf = False
clogger.error("Output handler: no port provided")

return valid_conf

async def dnstap_client(output_cfg, queue):
host, port = output_cfg["remote-address"], output_cfg["remote-port"]
clogger.debug("Output handler: connection to %s:%s" % (host,port) )
reader, tcp_writer = await asyncio.open_connection(host, port)
clogger.debug("Output handler: connected")

content_type = b"protobuf:dnstap.Dnstap"
fstrm_handler = fstrm.FstrmHandler()
dnstap = dnstap_pb2.Dnstap()

# framestream - do handshake
ctrl_ready = fstrm_handler.encode(ctrl=fstrm.FSTRM_CONTROL_READY, ct=[content_type])
tcp_writer.write(ctrl_ready)

while True:
data = await reader.read(fstrm_handler.pending_nb_bytes())
if not len(data):
break
fstrm_handler.append(data=data)

# process the buffer, check if we have received a complete frame ?
if fstrm_handler.process():
# Ok, the frame is complete so let's decode it
ctrl, ct, payload = fstrm_handler.decode()

if ctrl == fstrm.FSTRM_CONTROL_ACCEPT:
ctrl_start = fstrm_handler.encode(ctrl=fstrm.FSTRM_CONTROL_START)
tcp_writer.write(ctrl_start)
break

# consume queue and send data frames
while True:
# read item from queue
tap = await queue.get()

dnstap.Clear()

dnstap.type = 1
dnstap.version = b"-"
dnstap.identity = output_cfg["dnstap-identity"].encode()

dnstap.message.type = dnstap_pb2._MESSAGE_TYPE.values_by_name[tap["message"]].number
dnstap.message.socket_protocol = dnstap_pb2._SOCKETPROTOCOL.values_by_name[tap["protocol"]].number
dnstap.message.socket_family = dnstap_pb2._SOCKETFAMILY.values_by_name[tap["family"]].number

if tap["type"] == "query":
dnstap.message.query_message = tap["payload"]
dnstap.message.query_time_nsec = tap["time-nsec"]
dnstap.message.query_time_sec = tap["time-sec"]
else:
dnstap.message.response_message = tap["payload"]
dnstap.message.response_time_nsec = tap["time-nsec"]
dnstap.message.response_time_sec = tap["time-sec"]

if tap["family"] == "INET":
dnstap.message.query_address = socket.inet_pton(socket.AF_INET, tap["query-ip"])
dnstap.message.response_address = socket.inet_pton(socket.AF_INET, tap["response-ip"])

if tap["family"] == "INET6":
dnstap.message.query_address = socket.inet_pton(socket.AF_INET6, tap["query-ip"])
dnstap.message.response_address = socket.inet_pton(socket.AF_INET6, tap["response-ip"])

dnstap.message.query_port = tap["query-port"]
dnstap.message.query_port = tap["response-port"]

# convert to dnstap message
data = fstrm_handler.encode(ctrl=fstrm.FSTRM_DATA_FRAME, payload=dnstap.SerializeToString())
tcp_writer.write(data)

# connection lost ? exit and try to reconnect
if tcp_writer.transport._conn_lost:
break

# done continue to next item
queue.task_done()

# something
clogger.error("Output handler: connection lost")

async def handle(output_cfg, queue, metrics):
"""handle output"""
server_address = (output_cfg["remote-address"], output_cfg["remote-port"])
loop = asyncio.get_event_loop()

clogger.debug("Output handler: DNS tap enabled")
while True:
try:
await dnstap_client(output_cfg, queue)
except ConnectionRefusedError:
clogger.error('Output handler: connection to remote dnstap receiver failed!')
except TimeoutError:
clogger.error('Output handler: connection to remote dnstap receiver timed out!')
else:
clogger.error('Output handler: connection to remote dnstap receiver is closed.')

clogger.debug("Output handler: retry to connect every %ss" % output_cfg["retry"])
await asyncio.sleep(output_cfg["retry"])
16 changes: 13 additions & 3 deletions dnstap_receiver/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from dnstap_receiver.outputs import output_syslog
from dnstap_receiver.outputs import output_tcp
from dnstap_receiver.outputs import output_metrics
from dnstap_receiver.outputs import output_dnstap

from dnstap_receiver import api_server
from dnstap_receiver import statistics
Expand Down Expand Up @@ -152,6 +153,12 @@ def setup_outputs(cfg, stats):
queues_list.append(queue_metrics)
loop.create_task(output_metrics.handle(conf["metrics"], queue_metrics, stats))

if conf["dnstap"]["enable"]:
if not output_dnstap.checking_conf(cfg=conf["dnstap"]): return
queue_dnstap = asyncio.Queue()
queues_list.append(queue_dnstap)
loop.create_task(output_dnstap.handle(conf["dnstap"], queue_dnstap, stats))

return queues_list

def setup_inputs(cfg, queues_outputs, stats, geoip_reader, running):
Expand All @@ -177,8 +184,11 @@ def setup_webserver(cfg, stats):
"""setup web api"""
if not cfg["web-api"]["enable"]: return

loop.create_task(api_server.create_server(loop, cfg=cfg["web-api"], stats=stats, cfg_stats=cfg["statistics"]) )

svr = api_server.create_server(loop, cfg=cfg["web-api"], stats=stats, cfg_stats=cfg["statistics"])
if svr is None: return

loop.create_task( svr)

def setup_geoip(cfg):
if not cfg["enable"]: return None
if cfg["city-database"] is None: return None
Expand Down Expand Up @@ -222,7 +232,7 @@ def start_receiver():

# start the http api
setup_webserver(cfg, stats)

# run event loop
try:
loop.run_forever()
Expand Down

0 comments on commit 9e95407

Please sign in to comment.