Skip to content

Commit

Permalink
Merge pull request #867 from tempesta-tech/ao-860
Browse files Browse the repository at this point in the history
Fix #860: Add functional tests for TCP connection closing.
  • Loading branch information
aleksostapenko authored Dec 4, 2017
2 parents 93740e5 + 8155a09 commit 87d11b3
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 24 deletions.
2 changes: 1 addition & 1 deletion tempesta_fw/t/functional/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__all__ = ['tf_cfg', 'deproxy', 'nginx', 'tempesta', 'error', 'flacky']
__all__ = ['tf_cfg', 'deproxy', 'nginx', 'tempesta', 'error', 'flacky', 'analyzer']

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
130 changes: 130 additions & 0 deletions tempesta_fw/t/functional/helpers/analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
"""
Instruments for network traffic analysis
"""
from __future__ import print_function
import os
from threading import Thread
from scapy.all import *
from . import remote, tf_cfg, error

__author__ = 'Tempesta Technologies, Inc.'
__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.'
__license__ = 'GPL2'


FIN = 0x01
SYN = 0x02
RST = 0x04
PSH = 0x08
ACK = 0x10
URG = 0x20
ECE = 0x40
CWR = 0x80


class Sniffer(object):

def __init__(self, node, host, count=0,
timeout=30, port=80,
node_close=True):
self.node = node
self.port = port
self.thread = None
self.captured = 0
self.packets = []
self.dump_file = '/tmp/tmp_packet_dump'
cmd = 'timeout %s tcpdump -i any %s -w - tcp port %s || true'
count_flag = ('-c %s' % count) if count else ''
self.cmd = cmd % (timeout, count_flag, port)
self.err_msg = ' '.join(["Can't %s sniffer on", host])
self.node_side_close = node_close

def sniff(self):
'''Thread function for starting system sniffer and saving
its output. We need to use temporary file here, because
scapy.sniff(offline=file_obj) interface does not support
neither StringIO objects nor paramiko file objects.
'''
stdout, stderr = self.node.run_cmd(self.cmd, timeout=None,
err_msg=(self.err_msg % 'start'))
match = re.search(r'(\d+) packets captured', stderr)
if match:
self.captured = int(match.group(1))
with open(self.dump_file, 'w') as f:
f.write(stdout)

def start(self):
self.thread = Thread(target=self.sniff)
self.thread.start()

def stop(self):
if self.thread:
self.thread.join()
if os.path.exists(self.dump_file):
self.packets = sniff(count=self.captured,
offline=self.dump_file)
os.remove(self.dump_file)
else:
error.bug('Dump file "%s" does not exist!' % self.dump_file)

def check_results(self):
"""Analyzing captured packets. Should be called after start-stop cycle.
Should be redefined in sublasses.
"""
return True

class AnalyzerCloseRegular(Sniffer):

def portcmp(self, packet, invert=False):
if self.node_side_close and invert:
return packet[TCP].dport == self.port
elif self.node_side_close and not invert:
return packet[TCP].sport == self.port
elif not self.node_side_close and invert:
return packet[TCP].sport == self.port
else:
return packet[TCP].dport == self.port

def check_results(self):
"""Four-way (FIN-ACK-FIN-ACK) and
three-way (FIN-ACK/FIN-ACK) handshake order checking.
"""
if not self.packets:
return False

dbg_dump(5, self.packets, 'AnalyzerCloseRegular: FIN sequence:')

count_seq = 0
l_seq = 0
for p in self.packets:
if p[TCP].flags & RST:
return False
if count_seq >= 4:
return False
if count_seq == 0 and p[TCP].flags & FIN and self.portcmp(p):
l_seq = p[TCP].seq + p[IP].len - p[IP].ihl * 4 - p[TCP].dataofs * 4
count_seq += 1
continue
if count_seq == 1 and p[TCP].flags & ACK and self.portcmp(p, invert=True):
if p[TCP].ack > l_seq:
count_seq += 1
if count_seq == 2 and p[TCP].flags & FIN and self.portcmp(p, invert=True):
l_seq = p[TCP].seq + p[IP].len - p[IP].ihl * 4 - p[TCP].dataofs * 4
count_seq += 1
continue
if count_seq == 3 and p[TCP].flags & ACK and self.portcmp(p):
if p[TCP].ack > l_seq:
count_seq += 1

if count_seq != 4:
return False

return True

def dbg_dump(level, packets, msg):
if tf_cfg.v_level() >= level:
print(msg, file=sys.stderr)
for p in packets:
print(p.show(), file=sys.stderr)

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
44 changes: 22 additions & 22 deletions tempesta_fw/t/functional/helpers/deproxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,20 +212,26 @@ def __init__(self, message_text=None, body_parsing=True, body_void=False):

def parse_text(self, message_text, body_parsing=True):
self.body_parsing = body_parsing
self.msg = message_text
stream = StringIO(self.msg)
stream = StringIO(message_text)
self.__parse(stream)
self.__set_str_msg()

def __parse(self, stream):
self.parse_firstline(stream)
self.parse_headers(stream)
self.body = ''
self.parse_body(stream)

def __set_str_msg(self):
self.msg = str(self)

@abc.abstractmethod
def parse_firstline(self, stream):
pass

def get_firstline(self):
return ''

def parse_headers(self, stream):
self.headers = HeaderCollection.from_stream(stream)

Expand Down Expand Up @@ -270,10 +276,6 @@ def read_sized_body(self, stream, size):
if size == 0:
return
self.body = stream.read(size)
# Remove CRLF
line = stream.readline()
if line.rstrip('\r\n'):
raise ParseError('No CRLF after body.')
if len(self.body) != size:
raise ParseError(("Wrong body size: expect %d but got %d!"
% (size, len(self.body))))
Expand All @@ -292,11 +294,11 @@ def __ne__(self, other):
return not HttpMessage.__eq__(self, other)

def __str__(self):
return ''.join([str(self.headers), '\r\n', self.body, str(self.trailer)])
return ''.join([self.get_firstline(), '\r\n', str(self.headers), '\r\n',
self.body, str(self.trailer)])

def update(self, firstline):
message = '\r\n'.join([firstline, str(self)])
self.parse_text(message)
def update(self):
self.parse_text(str(self))

def set_expected(self, *args, **kwargs):
for obj in [self.headers, self.trailer]:
Expand Down Expand Up @@ -366,6 +368,9 @@ def parse_firstline(self, stream):
if not self.method in self.methods:
raise ParseError('Invalid request method!')

def get_firstline(self):
return ' '.join([self.method, self.uri, self.version])

def __eq__(self, other):
return ((self.method == other.method)
and (self.version == other.version)
Expand All @@ -375,10 +380,6 @@ def __eq__(self, other):
def __ne__(self, other):
return not Request.__eq__(self, other)

def update(self):
HttpMessage.update(self,
' '.join([self.method, self.uri, self.version]))

@staticmethod
def create(method, headers, uri='/', version='HTTP/1.1', date=False,
body=None):
Expand Down Expand Up @@ -413,6 +414,11 @@ def parse_firstline(self, stream):
except:
raise ParseError('Invalid Status code!')

def get_firstline(self):
status = int(self.status)
reason = BaseHTTPRequestHandler.responses[status][0]
return ' '.join([self.version, self.status, reason])

def __eq__(self, other):
return ((self.status == other.status)
and (self.version == other.version)
Expand All @@ -422,12 +428,6 @@ def __eq__(self, other):
def __ne__(self, other):
return not Response.__eq__(self, other)

def update(self):
status = int(self.status)
reason = reason = BaseHTTPRequestHandler.responses[status][0]
HttpMessage.update(self,
' '.join([self.version, self.status, reason]))

@staticmethod
def create(status, headers, version='HTTP/1.1', date=False,
srv_version=None, body=None, body_void=False):
Expand Down Expand Up @@ -459,7 +459,6 @@ def __init__(self, host=None, port=80):

def clear(self):
self.request_buffer = ''
self.response_buffer = ''

def set_request(self, request):
if request:
Expand All @@ -484,6 +483,7 @@ def handle_read(self):
try:
response = Response(self.response_buffer,
body_void=(self.request.method == 'HEAD'))
self.response_buffer = self.response_buffer[len(response.msg):]
except IncompliteMessage:
return
except ParseError:
Expand Down Expand Up @@ -533,7 +533,7 @@ def handle_read(self):
tf_cfg.dbg(4, ('Deproxy: SrvConnection: Can\'t parse message\n'
'<<<<<\n%s>>>>>'
% self.request_buffer))
# Hande will be called even if buffer is empty.
# Handler will be called even if buffer is empty.
if not self.request_buffer:
return
tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Recieve request from Tempesta.')
Expand Down
2 changes: 1 addition & 1 deletion tempesta_fw/t/functional/selftests/test_deproxy_message.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import print_function
import unittest
from helpers import deproxy
from helpers import deproxy, error

class TestDeproxyMessage(unittest.TestCase):

Expand Down
3 changes: 3 additions & 0 deletions tempesta_fw/t/functional/tcp_connection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
__all__ = ['test_connection_close']

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
81 changes: 81 additions & 0 deletions tempesta_fw/t/functional/tcp_connection/test_connection_close.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
Tests for TCP connection closing.
"""

from __future__ import print_function
from testers import functional
from helpers import analyzer, deproxy, chains
import asyncore

__author__ = 'Tempesta Technologies, Inc.'
__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.'
__license__ = 'GPL2'


class CloseConnection(functional.FunctionalTest):
"""Regular connection closing."""

def stop_and_close(self):
'''To check the correctness of connection closing - we need to close
it before stopping sniffer and analyzing sniffer's output (and throwing
an exception in case of failure); so, we need to close Deproxy client
and server connections in test_* function (not in tearDown).
'''
asyncore.close_all()
self.client.close()
self.client = None
self.tempesta.stop()
self.tempesta = None
self.tester.close_all()
self.tester = None

def create_sniffer(self):
self.sniffer = analyzer.AnalyzerCloseRegular(self.tempesta.node,
self.tempesta.host,
node_close=False,
timeout=10)

def assert_results(self):
self.assertTrue(self.sniffer.check_results(),
msg='Incorrect FIN-ACK sequence detected.')

def create_chains(self):
return [chains.base(forward=True)]

def run_sniffer(self):
self.sniffer.start()
self.generic_test_routine('cache 0;\n', self.create_chains())
self.stop_and_close()
self.sniffer.stop()

def test(self):
self.create_sniffer()
self.run_sniffer()
self.assert_results()


class CloseConnectionError403(CloseConnection):
"""Connection closing due to 403 error, generated by Tempesta."""

def assert_tempesta(self):
pass

def create_chains(self):
chain_200 = chains.base(forward=True)
chain_200.request.body = ''.join(['Arbitrary data ' for _ in range(300)])
chain_200.request.update()
response_403 = deproxy.Response.create(
status=403,
headers=['Content-Length: 0'],
date=deproxy.HttpMessage.date_time_string()
)
chain_403 = deproxy.MessageChain(request = deproxy.Request(),
expected_response = response_403)
return [chain_200, chain_403]

def create_sniffer(self):
self.sniffer = analyzer.AnalyzerCloseRegular(self.tempesta.node,
self.tempesta.host,
timeout=10)

# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4

0 comments on commit 87d11b3

Please sign in to comment.