Skip to content

Commit

Permalink
New tests for pipelined responses
Browse files Browse the repository at this point in the history
- Implement method `flush` to send all pipelined
  responses immediatly.
- Check that after error during pipelined response
  processing occurs Tempesta reestablish connection
  • Loading branch information
EvgeniiMekhanik committed Sep 23, 2024
1 parent 3f5f615 commit 36c12d5
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 15 deletions.
15 changes: 11 additions & 4 deletions framework/deproxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ def __init__(
self._cur_responses_list = []
dbg(self, 6, "New server connection", prefix="\t")

def flush(self):
self._response_buffer.append(b"".join(self._cur_responses_list))
self._cur_pipelined = 0
self._cur_responses_list = []

def writable(self):
if (
self._server.segment_gap != 0
Expand Down Expand Up @@ -97,10 +102,8 @@ def handle_read(self):
tf_cfg.dbg(5, response)
self._cur_responses_list.append(response)
self._cur_pipelined = self._cur_pipelined + 1
if self._pipelined == self._cur_pipelined:
self._response_buffer.append(b"".join(self._cur_responses_list))
self._cur_pipelined = 0
self._cur_responses_list = []
if self._cur_pipelined >= self._pipelined:
self.flush()

if need_close:
self.close()
Expand Down Expand Up @@ -252,6 +255,10 @@ def wait_for_connections(self, timeout=1):
lambda: len(self._connections) < self.conns_n, timeout, poll_freq=0.001
)

def flush(self):
for conn in self._connections:
conn.flush()

@property
def response(self) -> bytes:
return self.__response
Expand Down
25 changes: 18 additions & 7 deletions http2_general/test_h2_responses.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ class H2ResponsesPipelined(H2ResponsesPipelinedBase):
tempesta = {
"config": """
listen 443 proto=h2;
frang_limits {
http_strict_host_checking false;
}
srv_group default {
server ${server_ip}:8000 conns_n=1;
}
Expand Down Expand Up @@ -208,6 +211,7 @@ def test_success_pipelined(self):
)
def test_bad_pipelined(self, name, bad_num):
srv = self.setup_and_start(3)
# The next connection will be not pipelined
self.disable_deproxy_auto_parser()

i = 0
Expand All @@ -231,8 +235,6 @@ def test_bad_pipelined(self, name, bad_num):
client.make_request(self.get_request)
self.assertTrue(srv.wait_for_requests(i))

self.assertEqual(len(srv.requests), i)

i = 0
for id in self.clients_ids:
client = self.get_client(id)
Expand All @@ -245,20 +247,29 @@ def test_bad_pipelined(self, name, bad_num):
self.assertTrue(client.wait_for_response())
self.assertEqual(client._last_response.status, "200")

"""
It seems that we should resend request to server connection
after it reestablished but currently it doesn't work.
srv.wait_for_connections()
req_count = i

i = 0
j = 0
for id in self.clients_ids:
i = i + 1
client = self.get_client(id)
if i > bad_num:
self.assertTrue(client.wait_for_response(timeout=50))
"""
j = j + 1
self.assertTrue(srv.wait_for_requests(req_count + j))
srv.flush()
self.assertTrue(client.wait_for_response())
self.assertEqual(client._last_response.status, "200")


class H2HmResponsesPipelined(H2ResponsesPipelinedBase):
tempesta = {
"config": """
listen 443 proto=h2;
frang_limits {
http_strict_host_checking false;
}
health_check hm0 {
request "GET / HTTP/1.0\r\n\r\n";
Expand Down
22 changes: 18 additions & 4 deletions t_fault_injection/test_fault_injection_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ class TestFailFunctionPipelinedResponses(TestFailFunctionBase):
},
]

ids = ["deproxy_1", "deproxy_2", "deproxy_3"]
clients_ids = ["deproxy_1", "deproxy_2", "deproxy_3"]

@parameterize.expand(
[
Expand All @@ -206,11 +206,10 @@ def test(self, name, func_name, id, msg, times, retval):
srv.pipelined = 3
srv.conns_n = 1
self.start_all_services(client=False)

self.setup_fail_function_test(func_name, times, retval)

i = 0
for id in self.ids:
for id in self.clients_ids:
i = i + 1
client = self.get_client(id)
request = client.create_request(method="GET", headers=[])
Expand All @@ -219,7 +218,7 @@ def test(self, name, func_name, id, msg, times, retval):
srv.wait_for_requests(i)

i = 0
for id in self.ids:
for id in self.clients_ids:
i = i + 1
client = self.get_client(id)
if i >= 2:
Expand All @@ -232,5 +231,20 @@ def test(self, name, func_name, id, msg, times, retval):
"Tempesta doesn't report error",
)

srv.wait_for_connections()
req_count = i

i = 0
j = 0
for id in self.clients_ids:
i = i + 1
client = self.get_client(id)
if i >= 2:
j = j + 1
self.assertTrue(srv.wait_for_requests(req_count + j))
srv.flush()
self.assertTrue(client.wait_for_response())
self.assertEqual(client._last_response.status, "200")

# This should be called in case if test fails also
self.teardown_fail_function_test()
8 changes: 8 additions & 0 deletions tests_disabled_tcpseg.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@
"name": "http2_general.test_h2_hpack.TestHpack.test_big_header_and_body_in_response",
"reason": "These tests should not be run with TCP segmentation."
},
{
"name": "http2_general.test_h2_responses.H2ResponsesPipelined",
"reason": "These tests should not be run with TCP segmentation."
},
{
"name": "http2_general.test_h2_responses.H2HmResponsesPipelined",
"reason": "These tests should not be run with TCP segmentation."
},
{
"name": "http2_general.test_h2_frame.TestH2Frame.test_triple_header_frame_in_single_stream",
"reason": "These tests should not be run with TCP segmentation."
Expand Down

0 comments on commit 36c12d5

Please sign in to comment.