From fedb4ed22e322ecc9f519d3b0dec2855936f4b6c Mon Sep 17 00:00:00 2001 From: Micheal X Date: Mon, 17 Jul 2023 17:40:45 +1200 Subject: [PATCH] 5.4.2, improve performance. --- .../lib/noarch/front_base/http2_stream.py | 37 ++++++++++++-- .../lib/noarch/front_base/http_common.py | 50 ++++++++++++++----- .../lib/noarch/front_base/http_dispatcher.py | 2 +- .../lib/noarch/hyper/common/bufsocket.py | 4 ++ code/default/version.txt | 2 +- code/default/x_tunnel/local/direct_front.py | 4 ++ code/default/x_tunnel/local/proxy_session.py | 7 +-- .../x_tunnel/local/tls_relay_front/config.py | 3 +- .../local/tls_relay_front/ip_manager.py | 3 +- 9 files changed, 87 insertions(+), 25 deletions(-) diff --git a/code/default/lib/noarch/front_base/http2_stream.py b/code/default/lib/noarch/front_base/http2_stream.py index 5679296153..2f583a7c98 100644 --- a/code/default/lib/noarch/front_base/http2_stream.py +++ b/code/default/lib/noarch/front_base/http2_stream.py @@ -70,6 +70,8 @@ def __init__(self, self.task = task self.state = STATE_IDLE self.get_head_time = None + self.start_connection_point = self.connection._sock.bytes_received + self.get_head_stream_num = 0 # There are two flow control windows: one for data we're sending, # one for data being sent to us. @@ -292,6 +294,7 @@ def receive_frame(self, frame): self.response_header_datas = None self.get_head_time = time.time() + self.get_head_stream_num = len(self.connection.streams) length = self.response_headers.get("Content-Length", None) if isinstance(length, list): @@ -305,14 +308,38 @@ def receive_frame(self, frame): if self.config.http2_show_debug: self.logger.debug("%s Closing remote side of stream:%d", self.connection.ssl_sock.ip_str, self.stream_id) + xcost = self.response_headers.get("X-Cost", -1) + if isinstance(xcost, list): + xcost = float(xcost[0]) + rcost = self.response_headers.get("R-Cost", -1) + if isinstance(rcost, list): + rcost = float(rcost[0]) + time_now = time.time() - time_cost = time_now - self.get_head_time - if time_cost > 0 and \ - isinstance(self.task.content_length, int) and \ - not self.task.finished: - speed = self.task.content_length / time_cost + whole_cost = time_now - self.start_time + receive_cost = time_now - self.get_head_time + bytes_received = self.connection._sock.bytes_received - self.start_connection_point + if receive_cost > 0 and bytes_received > 10000 and not self.task.finished and receive_cost > 0.001: + # speed = bytes_received / receive_cost + speed = (len(self.request_body) + bytes_received) / (whole_cost - xcost) + self.connection.update_speed(speed) self.task.set_state("h2_finish[SP:%d]" % speed) + send_cost = len(self.request_body) / speed + streams_cost = ((self.connection.max_payload /2) * self.get_head_stream_num) / speed + + if xcost >= 0: + rtt = whole_cost - xcost - send_cost - receive_cost # - streams_cost + if self.config.http2_show_debug: + self.logger.debug("%s RTT:%f rtt:%f send_len:%d recv_len:%d " + "whole_Cost:%f xcost:%f rcost:%f send_cost:%f recv_cost:%f " + "streams_cost:%f Speed: %f", + self.connection.ssl_sock.ip_str, + self.connection.rtt * 1000, rtt * 1000, + len(self.request_body), bytes_received, + whole_cost, xcost, rcost, send_cost, receive_cost, streams_cost, speed) + self.connection.update_rtt(rtt) + self._close_remote() self.close("end stream") diff --git a/code/default/lib/noarch/front_base/http_common.py b/code/default/lib/noarch/front_base/http_common.py index be9d948a00..3b19d544c9 100644 --- a/code/default/lib/noarch/front_base/http_common.py +++ b/code/default/lib/noarch/front_base/http_common.py @@ -179,13 +179,15 @@ def finish(self): class HttpWorker(object): + max_payload = 128 * 1024 + def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb, idle_cb, log_debug_data): self.logger = logger self.ip_manager = ip_manager self.config = config self.ssl_sock = ssl_sock - self.rtt = ssl_sock.handshake_time - self.speed = 200 + self.rtt = ssl_sock.handshake_time * 0.001 + self.speed = 5000000 self.ip_str = ssl_sock.ip_str self.close_cb = close_cb self.retry_task_cb = retry_task_cb @@ -195,7 +197,8 @@ def __init__(self, logger, ip_manager, config, ssl_sock, close_cb, retry_task_cb self.keep_running = True self.processed_tasks = 0 self.continue_fail_tasks = 0 - self.speed_history = [] + self.rtt_history = [self.rtt,] + self.speed_history = [self.speed, self.speed, self.speed] self.last_recv_time = self.ssl_sock.create_time self.last_send_time = self.ssl_sock.create_time self.life_end_time = self.ssl_sock.create_time + \ @@ -207,6 +210,7 @@ def __str__(self): o += " running: %s\r\n" % (self.keep_running) o += " processed_tasks: %d\r\n" % (self.processed_tasks) o += " continue_fail_tasks: %s\r\n" % (self.continue_fail_tasks) + o += " rtt_history: %s\r\n" % (self.rtt_history) o += " speed_history: %s\r\n" % (self.speed_history) if self.version != "1.1": o += "streams: %d\r\n" % len(self.streams) @@ -215,13 +219,26 @@ def __str__(self): o += " score: %f\r\n" % (self.get_score()) return o + def update_rtt(self, rtt): + self.rtt_history.append(rtt) + if len(self.rtt_history) > 10: + self.rtt_history.pop(0) + # self.rtt = sum(self.rtt_history) / len(self.rtt_history) + + def update_speed(self, speed): + self.speed_history.append(speed) + if len(self.speed_history) > 10: + self.speed_history.pop(0) + self.speed = sum(self.speed_history) / len(self.speed_history) + def update_debug_data(self, rtt, sent, received, speed): - self.rtt = rtt - if sent + received > 10000: - self.speed_history.append(speed) - if len(self.speed_history) > 10: - self.speed_history.pop(0) - self.speed = sum(self.speed_history) / len(self.speed_history) + # if sent + received > 10000: + # self.speed_history.append(speed) + # if len(self.speed_history) > 10: + # self.speed_history.pop(0) + # self.speed = sum(self.speed_history) / len(self.speed_history) + # else: + # self.rtt = rtt self.log_debug_data(rtt, sent, received) @@ -243,13 +260,20 @@ def close(self, reason): def get_score(self): # The smaller, the better - score = (50 - (self.speed/6.0)) + (self.rtt/20.0) + + score = self.rtt if self.version != "1.1": - score += len(self.streams) * 3 + response_body_len = self.max_payload + for _, stream in self.streams.items(): + if stream.response_body_len == 0: + response_body_len += self.max_payload + else: + response_body_len += stream.response_body_len - stream.task.body_len + score += response_body_len / self.speed if self.config.show_state_debug: - self.logger.debug("get_score %s, speed:%d rtt:%d stream_num:%d score:%d", self.ip_str, - self.speed, self.rtt, len(self.streams), score) + self.logger.debug("get_score %s, speed:%f rtt:%d stream_num:%d score:%f", self.ip_str, + self.speed * 0.000001, self.rtt * 1000, len(self.streams), score) return score diff --git a/code/default/lib/noarch/front_base/http_dispatcher.py b/code/default/lib/noarch/front_base/http_dispatcher.py index c52f9712b9..20ad18ff26 100644 --- a/code/default/lib/noarch/front_base/http_dispatcher.py +++ b/code/default/lib/noarch/front_base/http_dispatcher.py @@ -154,7 +154,7 @@ def create_worker_thread(self): self.connect_all_workers = True try: - ssl_sock = self.connection_manager.get_ssl_connection(timeout=10) + ssl_sock = self.connection_manager.get_ssl_connection(timeout=60) except Exception as e: self._debug_log("create_worker_thread get_ssl_connection fail:%r", e) ssl_sock = None diff --git a/code/default/lib/noarch/hyper/common/bufsocket.py b/code/default/lib/noarch/hyper/common/bufsocket.py index 5e8de87526..5668ed248e 100644 --- a/code/default/lib/noarch/hyper/common/bufsocket.py +++ b/code/default/lib/noarch/hyper/common/bufsocket.py @@ -104,6 +104,9 @@ def __init__(self, sck, buffer_size=1000): # The number of bytes in the buffer. self._bytes_in_buffer = 0 + # record all bytes received from beginning + self.bytes_received = 0 + # following is define for send buffer # all send will be cache and send when flush called, # combine data to reduce the api call @@ -247,6 +250,7 @@ def recv(self, amt): if not count and amt > self._bytes_in_buffer: raise ConnectionResetError() self._bytes_in_buffer += count + self.bytes_received += count # Read out the bytes and update the index. amt = min(amt, self._bytes_in_buffer) diff --git a/code/default/version.txt b/code/default/version.txt index 1e20ec35c6..f430587706 100644 --- a/code/default/version.txt +++ b/code/default/version.txt @@ -1 +1 @@ -5.4.0 \ No newline at end of file +5.4.2 \ No newline at end of file diff --git a/code/default/x_tunnel/local/direct_front.py b/code/default/x_tunnel/local/direct_front.py index 1036062476..3c6fc6c58f 100644 --- a/code/default/x_tunnel/local/direct_front.py +++ b/code/default/x_tunnel/local/direct_front.py @@ -94,6 +94,10 @@ def request(method, host, schema="http", path="/", headers={}, data="", timeout= return response.text, response.status, response +def start(): + pass + + def stop(): pass diff --git a/code/default/x_tunnel/local/proxy_session.py b/code/default/x_tunnel/local/proxy_session.py index 399d3dc233..f7f903a0e8 100644 --- a/code/default/x_tunnel/local/proxy_session.py +++ b/code/default/x_tunnel/local/proxy_session.py @@ -613,7 +613,7 @@ def normal_round_trip_worker(self, work_id): request_session_id = self.session_id upload_data_head = struct.pack("