Skip to content

Commit

Permalink
Merge pull request rthalley#217 from kalou/ecs
Browse files Browse the repository at this point in the history
Implement EDNS Client Subnet option
  • Loading branch information
rthalley authored Dec 2, 2016
2 parents 8a89596 + 2613b44 commit 3676249
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 1 deletion.
98 changes: 97 additions & 1 deletion dns/edns.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,17 @@

"""EDNS Options"""

from __future__ import absolute_import

import math
import struct
import sys

import dns.inet


NSID = 3
ECS = 8


class Option(object):
Expand Down Expand Up @@ -111,6 +121,9 @@ def __init__(self, otype, data):
def to_wire(self, file):
file.write(self.data)

def to_text(self):
return "Generic %d" % self.otype

@classmethod
def from_wire(cls, otype, wire, current, olen):
return cls(otype, wire[current: current + olen])
Expand All @@ -122,10 +135,93 @@ def _cmp(self, other):
return 1
return -1


class ECSOption(Option):
"""EDNS Client Subnet (ECS, RFC7871)"""

def __init__(self, address, srclen=None, scopelen=0):
"""Generate an ECS option
@ivar address: client address information
@type address: string
@ivar srclen: prefix length, leftmost number of bits of the address
to be used for the lookup. Sent by client, mirrored by server in
responses. If not provided at init, will use /24 for v4 and /56 for v6
@type srclen: int
@ivar scopelen: prefix length, leftmost number of bits of the address
that the response covers. 0 in queries, set by server.
@type scopelen: int
"""
super(ECSOption, self).__init__(ECS)
af = dns.inet.af_for_address(address)

if af == dns.inet.AF_INET6:
self.family = 2
if srclen is None:
srclen = 56
elif af == dns.inet.AF_INET:
self.family = 1
if srclen is None:
srclen = 24
else:
raise ValueError('Bad ip family')

self.srclen = srclen
self.scopelen = scopelen
self.address = address

addrdata = dns.inet.inet_pton(af, address)
nbytes = int(math.ceil(srclen/8.0))

# Truncate to srclen and pad to the end of the last octet needed
# See RFC section 6
self.addrdata = addrdata[:nbytes]
last = chr(ord(self.addrdata[-1:]) & (0xff << srclen % 8))
if sys.version_info >= (3,):
last = last.encode('latin1')
self.addrdata = self.addrdata[:-1] + last

def to_text(self):
return "ECS %s/%s scope/%s" % (self.address, self.srclen,
self.scopelen)

def to_wire(self, file):
"""Opt type and len are handled by renderer"""
file.write(struct.pack('!H', self.family))
file.write(struct.pack('!BB', self.srclen, self.scopelen))
file.write(self.addrdata)

@classmethod
def from_wire(cls, otype, wire, cur, olen):
"""Opt type and len are handled by Message.from_wire"""
family, src, scope = struct.unpack('!HBB', wire[cur:cur+4])
cur += 4

addrlen = int(math.ceil(src/8.0))

if family == 1:
af = dns.inet.AF_INET
pad = 4 - addrlen
elif family == 2:
af = dns.inet.AF_INET6
pad = 16 - addrlen
else:
raise ValueError('unsupported family')

addr = dns.inet.inet_ntop(af, wire[cur:cur+addrlen] + b'\x00' * pad)
return cls(addr, src, scope)

def _cmp(self, other):
if self.addrdata == other.addrdata:
return 0
if self.addrdata > other.addrdata:
return 1
return -1

_type_to_class = {
ECS: ECSOption
}


def get_option_class(otype):
cls = _type_to_class.get(otype)
if cls is None:
Expand Down
2 changes: 2 additions & 0 deletions dns/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ def to_text(self, origin=None, relativize=True, **kw):
s.write(u'eflags %s\n' %
dns.flags.edns_to_text(self.ednsflags))
s.write(u'payload %d\n' % self.payload)
for opt in self.options:
s.write(u'option %s\n' % opt.to_text())
is_update = dns.opcode.is_update(self.flags)
if is_update:
s.write(u';ZONE\n')
Expand Down
63 changes: 63 additions & 0 deletions tests/test_edns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# -*- coding: utf-8
# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

from __future__ import print_function

try:
import unittest2 as unittest
except ImportError:
import unittest

from io import BytesIO

import dns.edns

class OptionTestCase(unittest.TestCase):
def testGenericOption(self):
opt = dns.edns.GenericOption(3, b'data')
io = BytesIO()
opt.to_wire(io)
data = io.getvalue()
self.assertEqual(data, b'data')

def testECSOption_prefix_length(self):
opt = dns.edns.ECSOption('1.2.255.33', 20)
io = BytesIO()
opt.to_wire(io)
data = io.getvalue()
self.assertEqual(data, b'\x00\x01\x14\x00\x01\x02\xf0')

def testECSOption_from_wire(self):
opt = dns.edns.option_from_wire(8, b'\x00\x01\x14\x00\x01\x02\xf0',
0, 7)
self.assertEqual(opt.otype, dns.edns.ECS)
self.assertEqual(opt.address, b'1.2.240.0')
self.assertEqual(opt.srclen, 20)
self.assertEqual(opt.scopelen, 0)

def testECSOption(self):
opt = dns.edns.ECSOption('1.2.3.4', 24)
io = BytesIO()
opt.to_wire(io)
data = io.getvalue()
self.assertEqual(data, b'\x00\x01\x18\x00\x01\x02\x03')

def testECSOption_v6(self):
opt = dns.edns.ECSOption('2001:4b98::1')
io = BytesIO()
opt.to_wire(io)
data = io.getvalue()
self.assertEqual(data, b'\x00\x02\x38\x00\x20\x01\x4b\x98\x00\x00\x00')

0 comments on commit 3676249

Please sign in to comment.