From bebcae90e5d23e3b224e1bcb6ea036ac03997db4 Mon Sep 17 00:00:00 2001 From: Alexander Ostapenko Date: Mon, 27 Nov 2017 19:51:39 +0700 Subject: [PATCH] Fix #860: Add functional tests for TCP connection closing. --- tempesta_fw/t/functional/helpers/__init__.py | 2 +- tempesta_fw/t/functional/helpers/analyzer.py | 123 ++++++++++++++++++ tempesta_fw/t/functional/helpers/deproxy.py | 12 +- .../selftests/test_deproxy_message.py | 2 +- .../t/functional/tcp_connection/__init__.py | 3 + .../tcp_connection/test_connection_close.py | 78 +++++++++++ 6 files changed, 212 insertions(+), 8 deletions(-) create mode 100755 tempesta_fw/t/functional/helpers/analyzer.py create mode 100644 tempesta_fw/t/functional/tcp_connection/__init__.py create mode 100644 tempesta_fw/t/functional/tcp_connection/test_connection_close.py diff --git a/tempesta_fw/t/functional/helpers/__init__.py b/tempesta_fw/t/functional/helpers/__init__.py index 76f8b9e7c8..3007f1110c 100644 --- a/tempesta_fw/t/functional/helpers/__init__.py +++ b/tempesta_fw/t/functional/helpers/__init__.py @@ -1,3 +1,3 @@ -__all__ = ['tf_cfg', 'deproxy', 'nginx', 'tempesta', 'error', 'flacky'] +__all__ = ['tf_cfg', 'deproxy', 'nginx', 'tempesta', 'error', 'flacky', 'analyzer'] # vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tempesta_fw/t/functional/helpers/analyzer.py b/tempesta_fw/t/functional/helpers/analyzer.py new file mode 100755 index 0000000000..8bc752abab --- /dev/null +++ b/tempesta_fw/t/functional/helpers/analyzer.py @@ -0,0 +1,123 @@ +""" +Instruments for network traffic analysis +""" +from __future__ import print_function +import os +from threading import Thread +from scapy.all import * +from . import remote, tf_cfg + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + + +FIN = 0x01 +SYN = 0x02 +RST = 0x04 +PSH = 0x08 +ACK = 0x10 +URG = 0x20 +ECE = 0x40 +CWR = 0x80 + + +class Sniffer(object): + + def __init__(self, node, host, count=0, + timeout=30, port=80, + node_close=True): + self.node = node + self.port = port + self.thread = None + self.captured = 0 + self.packets = [] + self.dump_file = '/tmp/tmp_packet_dump' + cmd = 'timeout %s tcpdump -i any %s-w - tcp port %s || true' + count_flag = ('-c %s ' % count) if count else '' + self.cmd = cmd % (timeout, count_flag, port) + self.err_msg = ' '.join(["Can't %s sniffer on", host]) + self.node_side_close = node_close + + def sniff(self): + stdout, stderr = self.node.run_cmd(self.cmd, timeout=None, + err_msg=(self.err_msg % 'start')) + match = re.search(r'(\d+) packets captured', stderr) + if match: + self.captured = int(match.group(1)) + with open(self.dump_file, 'w') as f: + f.write(stdout) + + def start(self): + self.thread = Thread(target=self.sniff) + self.thread.start() + + def stop(self): + if self.thread: + self.thread.join() + if os.path.exists(self.dump_file): + self.packets = sniff(count=self.captured, + offline=self.dump_file) + os.remove(self.dump_file) + + def check_results(self): + """Analyzing captured packets. Should be called after start-stop cycle. + Should be redefined in sublasses. + """ + return True + +class AnalyzerCloseRegular(Sniffer): + + def portcmp(self, packet, invert=False): + if self.node_side_close and invert: + return packet[TCP].dport == self.port + elif self.node_side_close and not invert: + return packet[TCP].sport == self.port + elif not self.node_side_close and invert: + return packet[TCP].sport == self.port + else: + return packet[TCP].dport == self.port + + def check_results(self): + """Four-way (FIN-ACK-FIN-ACK) and + three-way (FIN-ACK/FIN-ACK) handshake order checking. + """ + if not self.packets: + return False + + dbg_dump(5, self.packets, 'AnalyzerCloseRegular: FIN sequence:') + + count_seq = 0 + l_seq = 0 + for p in self.packets: + if p[TCP].flags & RST: + return False + if count_seq >= 4: + return False + if count_seq == 0 and p[TCP].flags & FIN and self.portcmp(p): + l_seq = p[TCP].seq + p[IP].len - p[IP].ihl * 4 - p[TCP].dataofs * 4 + count_seq += 1 + continue + if count_seq == 1 and p[TCP].flags & ACK and self.portcmp(p, invert=True): + if p[TCP].ack > l_seq: + count_seq += 1 + if count_seq == 2 and p[TCP].flags & FIN and self.portcmp(p, invert=True): + l_seq = p[TCP].seq + p[IP].len - p[IP].ihl * 4 - p[TCP].dataofs * 4 + count_seq += 1 + continue + if count_seq == 3 and p[TCP].flags & ACK and self.portcmp(p): + if p[TCP].ack > l_seq: + count_seq += 1 + + if count_seq != 4: + return False + + return True + +def dbg_dump(level, packets, msg): + if tf_cfg.v_level() >= level: + print(msg, file=sys.stderr) + for p in packets: + print(p.show(), file=sys.stderr) + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tempesta_fw/t/functional/helpers/deproxy.py b/tempesta_fw/t/functional/helpers/deproxy.py index ee46aa56ac..f5e1a475f4 100644 --- a/tempesta_fw/t/functional/helpers/deproxy.py +++ b/tempesta_fw/t/functional/helpers/deproxy.py @@ -270,10 +270,6 @@ def read_sized_body(self, stream, size): if size == 0: return self.body = stream.read(size) - # Remove CRLF - line = stream.readline() - if line.rstrip('\r\n'): - raise ParseError('No CRLF after body.') if len(self.body) != size: raise ParseError(("Wrong body size: expect %d but got %d!" % (size, len(self.body)))) @@ -413,6 +409,10 @@ def parse_firstline(self, stream): except: raise ParseError('Invalid Status code!') + def get_length(self): + firstline = ' '.join([self.version, self.status, self.reason]) + return len('\r\n'.join([firstline, str(self)])) + def __eq__(self, other): return ((self.status == other.status) and (self.version == other.version) @@ -459,7 +459,6 @@ def __init__(self, host=None, port=80): def clear(self): self.request_buffer = '' - self.response_buffer = '' def set_request(self, request): if request: @@ -484,6 +483,7 @@ def handle_read(self): try: response = Response(self.response_buffer, body_void=(self.request.method == 'HEAD')) + self.response_buffer = self.response_buffer[response.get_length():] except IncompliteMessage: return except ParseError: @@ -533,7 +533,7 @@ def handle_read(self): tf_cfg.dbg(4, ('Deproxy: SrvConnection: Can\'t parse message\n' '<<<<<\n%s>>>>>' % self.request_buffer)) - # Hande will be called even if buffer is empty. + # Handler will be called even if buffer is empty. if not self.request_buffer: return tf_cfg.dbg(4, '\tDeproxy: SrvConnection: Recieve request from Tempesta.') diff --git a/tempesta_fw/t/functional/selftests/test_deproxy_message.py b/tempesta_fw/t/functional/selftests/test_deproxy_message.py index 490142a5c4..03a3c057cb 100644 --- a/tempesta_fw/t/functional/selftests/test_deproxy_message.py +++ b/tempesta_fw/t/functional/selftests/test_deproxy_message.py @@ -1,6 +1,6 @@ from __future__ import print_function import unittest -from helpers import deproxy +from helpers import deproxy, error class TestDeproxyMessage(unittest.TestCase): diff --git a/tempesta_fw/t/functional/tcp_connection/__init__.py b/tempesta_fw/t/functional/tcp_connection/__init__.py new file mode 100644 index 0000000000..ed40a0164f --- /dev/null +++ b/tempesta_fw/t/functional/tcp_connection/__init__.py @@ -0,0 +1,3 @@ +__all__ = ['test_connection_close'] + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4 diff --git a/tempesta_fw/t/functional/tcp_connection/test_connection_close.py b/tempesta_fw/t/functional/tcp_connection/test_connection_close.py new file mode 100644 index 0000000000..84cefa06b4 --- /dev/null +++ b/tempesta_fw/t/functional/tcp_connection/test_connection_close.py @@ -0,0 +1,78 @@ +""" +Tests for TCP connection closing. +""" + +from __future__ import print_function +from testers import functional +from helpers import analyzer, deproxy, chains +import asyncore + +__author__ = 'Tempesta Technologies, Inc.' +__copyright__ = 'Copyright (C) 2017 Tempesta Technologies, Inc.' +__license__ = 'GPL2' + + +class CloseConnection(functional.FunctionalTest): + """Regular connection closing.""" + + def stop_and_close(self): + asyncore.close_all() + self.client.close() + self.client = None + self.tempesta.stop() + self.tempesta = None + self.tester.close_all() + self.tester = None + + def create_sniffer(self): + self.sniffer = analyzer.AnalyzerCloseRegular(self.tempesta.node, + self.tempesta.host, + node_close=False, + timeout=10) + + def assert_results(self): + self.assertTrue(self.sniffer.check_results(), + msg='Incorrect FIN-ACK sequence detected.') + + def create_chains(self): + return [chains.base(forward=True)] + + def run_sniffer(self): + self.sniffer.start() + self.generic_test_routine('cache 0;\n', self.create_chains()) + self.stop_and_close() + self.sniffer.stop() + + def test(self): + self.create_sniffer() + self.run_sniffer() + self.assert_results() + + +class CloseConnectionError403(CloseConnection): + """Connection closing due to 403 error, generated by Tempesta.""" + + def assert_tempesta(self): + pass + + def create_chains(self): + chain_200 = chains.base(forward=True) + chain_200.request.body = ''.join(['Arbitrary data ' for _ in range(300)]) + chain_200.request.update() + response_403 = deproxy.Response.create( + status=403, + headers=['Content-Length: 0'], + date=deproxy.HttpMessage.date_time_string() + ) + chain_403 = deproxy.MessageChain(request = deproxy.Request(), + expected_response = response_403) + return [chain_200, chain_403] + + def create_sniffer(self): + self.sniffer = analyzer.AnalyzerCloseRegular(self.tempesta.node, + self.tempesta.host, + timeout=10) + + + +# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4