diff --git a/lib/http1.py b/lib/http1.py index b567848..1029f15 100644 --- a/lib/http1.py +++ b/lib/http1.py @@ -1,3 +1,4 @@ + import threading import Queue import httplib @@ -7,18 +8,18 @@ import connect_control from google_ip import google_ip from http_common import * -from config import config class HTTP1_worker(HTTP_worker): version = "1.1" idle_time = 10 * 60 - def __init__(self, ssl_sock, close_cb, retry_task_cb): - super(HTTP1_worker, self).__init__(ssl_sock, close_cb, retry_task_cb) + def __init__(self, ssl_sock, close_cb, retry_task_cb, idle_cb): + super(HTTP1_worker, self).__init__(ssl_sock, close_cb, retry_task_cb, idle_cb) self.last_active_time = self.ssl_sock.create_time self.last_request_time = self.ssl_sock.create_time + self.task = None self.task_queue = Queue.Queue() threading.Thread(target=self.work_loop).start() @@ -29,13 +30,14 @@ def get_rtt_rate(self): def request(self, task): self.accept_task = False + self.task = task self.task_queue.put(task) def keep_alive_thread(self): - ping_interval = 55 + ping_interval = 55.0 while connect_control.keep_running and self.keep_running: - time_to_ping = max(ping_interval - (time.time() - self.last_active_time), 0) + time_to_ping = max(ping_interval - (time.time() - self.last_active_time), 0.2) time.sleep(time_to_ping) time_now = time.time() @@ -46,29 +48,35 @@ def keep_alive_thread(self): self.close("idle timeout") return - self.last_active_time = time_now self.task_queue.put("ping") def work_loop(self): while connect_control.keep_running and self.keep_running: task = self.task_queue.get(True) if not task: - # None task to exit + # None task means exit + self.accept_task = False + self.keep_running = False return elif task == "ping": - self.last_active_time = time.time() if not self.head_request(): # now many gvs don't support gae + self.accept_task = False + self.keep_running = False + if self.task is not None: + self.retry_task_cb(self.task) + self.task = None + google_ip.recheck_ip(self.ssl_sock.ip) self.close("keep alive") return else: + self.last_active_time = time.time() continue - # xlog.debug("http1 get task") + xlog.debug("http1 get task") time_now = time.time() self.last_request_time = time_now - self.last_active_time = time_now self.request_task(task) def request_task(self, task): @@ -98,12 +106,15 @@ def request_task(self, task): xlog.warn("%s h1_request:%r", self.ip, e) google_ip.report_connect_closed(self.ssl_sock.ip, "request_fail") self.retry_task_cb(task) + self.task = None self.close("request fail") return task.set_state("h1_get_head") body_length = int(response.getheader("Content-Length", "0")) task.content_length = body_length + + task.responsed = True response.headers = response.msg.dict response.worker = self response.task = task @@ -128,8 +139,11 @@ def request_task(self, task): speed = body_length / time_cost task.set_state("h1_finish[SP:%d]" % speed) self.report_speed(speed, body_length) + self.task = None self.accept_task = True + self.idle_cb() self.processed_tasks += 1 + self.last_active_time = time.time() return to_read = max(end - start, 65535) @@ -166,31 +180,41 @@ def head_request(self): data = request_data.encode() ret = self.ssl_sock.send(data) if ret != len(data): - xlog.warn("head send len:%r %d", ret, len(data)) + xlog.warn("h1 head send len:%r %d %s", ret, len(data), self.ip) + return False response = httplib.HTTPResponse(self.ssl_sock, buffering=True) self.ssl_sock.settimeout(100) response.begin() status = response.status if status != 200: - xlog.debug("appid:%s head fail status:%d", self.ssl_sock.appid, status) + xlog.debug("%s appid:%s head fail status:%d", self.ip, self.ssl_sock.appid, status) return False self.rtt = (time.time() - start_time) * 1000 return True except httplib.BadStatusLine as e: time_now = time.time() - inactive_time = time_now - self.ssl_sock.last_use_time + inactive_time = time_now - self.last_active_time head_timeout = time_now - start_time xlog.debug("%s keep alive fail, inactive_time:%d head_timeout:%d", self.ssl_sock.ip, inactive_time, head_timeout) except Exception as e: - xlog.warn("%s head appid:%s request fail:%r", self.ssl_sock.ip, self.ssl_sock.appid, e) + xlog.debug("h1 %s appid:%s HEAD keep alive request fail:%r", self.ssl_sock.ip, self.ssl_sock.appid, e) def close(self, reason=""): # Notify loop to exit # This function may be call by out side http2 # When gae_proxy found the appid or ip is wrong + self.accept_task = False + self.keep_running = False + + if self.task is not None: + if self.task.responsed: + self.task.put_data("") + else: + self.retry_task_cb(self.task) + self.task = None super(HTTP1_worker, self).close(reason) self.task_queue.put(None) \ No newline at end of file diff --git a/lib/http2_connection.py b/lib/http2_connection.py index 5321527..865c581 100644 --- a/lib/http2_connection.py +++ b/lib/http2_connection.py @@ -1,3 +1,4 @@ + import Queue import threading import socket @@ -10,8 +11,6 @@ from http_common import * -from hyper.common.bufsocket import BufferedSocket - from hyper.common.bufsocket import BufferedSocket from hyper.packages.hyperframe.frame import ( @@ -60,22 +59,31 @@ def blocked(self): return self.initial_window_size - self.window_size +class RawFrame(object): + def __init__(self, dat): + self.dat = dat + + def serialize(self): + return self.dat + + class HTTP2_worker(HTTP_worker): version = "2" - def __init__(self, ssl_sock, close_cb, retry_task_cb): - super(HTTP2_worker, self).__init__(ssl_sock, close_cb, retry_task_cb) + def __init__(self, ssl_sock, close_cb, retry_task_cb, idle_cb): + super(HTTP2_worker, self).__init__(ssl_sock, close_cb, retry_task_cb, idle_cb) self.max_concurrent = 20 self.network_buffer_size = 128 * 1024 # Google http/2 time out is 4 mins. - ssl_sock.settimeout(240) + self.ssl_sock.settimeout(240) self._sock = BufferedSocket(ssl_sock, self.network_buffer_size) self.next_stream_id = 1 self.streams = {} self.last_ping_time = time.time() + self.last_active_time = self.ssl_sock.create_time # count ping not ACK # increase when send ping @@ -124,8 +132,14 @@ def __init__(self, ssl_sock, close_cb, retry_task_cb): threading.Thread(target=self.send_loop).start() threading.Thread(target=self.recv_loop).start() + # export api def request(self, task): - # this is the export api + + if not self.keep_running: + # race condition + self.retry_task_cb(task) + return + if len(self.streams) > self.max_concurrent: self.accept_task = False @@ -140,6 +154,7 @@ def request_task(self, task): # http/2 client use odd stream_id self.next_stream_id += 2 + self.ssl_sock.settimeout(100) stream = Stream(self, self.ip, stream_id, self.ssl_sock.host, task, self._send_cb, self._close_stream_cb, self.encoder, self.decoder, FlowControlManager(self.local_settings[SettingsFrame.INITIAL_WINDOW_SIZE]), @@ -151,7 +166,7 @@ def send_loop(self): while connect_control.keep_running and self.keep_running: frame = self.send_queue.get(True) if not frame: - # None frame to exist + # None frame means exist break # xlog.debug("%s Send:%s", self.ip, str(frame)) @@ -161,8 +176,11 @@ def send_loop(self): # don't flush for small package # reduce send api call + if self.send_queue._qsize(): + continue + # wait for payload frame - time.sleep(0.001) + time.sleep(0.01) # combine header and payload in one tcp package. if not self.send_queue._qsize(): self._sock.flush() @@ -170,9 +188,12 @@ def send_loop(self): if e.errno not in (errno.EPIPE, errno.ECONNRESET): xlog.warn("%s http2 send fail:%r", self.ip, e) else: - xlog.exceptiong("send error:%r", e) + xlog.exception("send error:%r", e) - self.close("send fail:%r", e) + self.close("send fail:%r" % e) + except Exception as e: + xlog.debug("http2 %s send error:%r", self.ip, e) + self.close("send fail:%r" % e) def recv_loop(self): while connect_control.keep_running and self.keep_running: @@ -187,22 +208,24 @@ def get_rtt_rate(self): def close(self, reason=""): self.keep_running = False + self.accept_task = False # Notify loop to exit # This function may be call by out side http2 # When gae_proxy found the appid or ip is wrong self.send_queue.put(None) for stream in self.streams.values(): - if stream.get_head_time: - # after get header, + if stream.task.responsed: # response have send to client # can't retry stream.close(reason=reason) else: self.retry_task_cb(stream.task) + self.streams = {} super(HTTP2_worker, self).close(reason) def send_ping(self): + # Use less for GAE server. p = PingFrame(0) p.opaque_data = struct.pack("!d", time.time()) self.send_queue.put(p) @@ -210,7 +233,8 @@ def send_ping(self): self.ping_on_way += 1 def _send_preamble(self): - self._sock.send(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n') + self.send_queue.put(RawFrame(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n')) + f = SettingsFrame(0) f.settings[SettingsFrame.ENABLE_PUSH] = 0 f.settings[SettingsFrame.INITIAL_WINDOW_SIZE] = self.local_settings[SettingsFrame.INITIAL_WINDOW_SIZE] @@ -259,6 +283,10 @@ def _close_stream_cb(self, stream_id, reason): if self.keep_running and len(self.streams) < self.max_concurrent and self.remote_window_size > 10000: self.accept_task = True + self.idle_cb() + + if len(self.streams) == 0: + self.ssl_sock.settimeout(240) self.processed_tasks += 1 @@ -266,7 +294,7 @@ def _consume_single_frame(self): try: header = self._sock.recv(9) except Exception as e: - xlog.warn("%s _consume_single_frame:%r", self.ip, e) + xlog.debug("%s _consume_single_frame:%r, inactive time:%d", self.ip, e, time.time()-self.last_active_time) self.close("disconnect:%r" % e) return @@ -274,8 +302,8 @@ def _consume_single_frame(self): frame, length = Frame.parse_frame_header(header) if length > FRAME_MAX_LEN: - xlog.error("Frame size exceeded on stream %d (received: %d, max: %d)", - frame.stream_id, length, FRAME_MAX_LEN) + xlog.error("%s Frame size exceeded on stream %d (received: %d, max: %d)", + self.ip, frame.stream_id, length, FRAME_MAX_LEN) # self._send_rst_frame(frame.stream_id, 6) # 6 = FRAME_SIZE_ERROR data = self._recv_payload(length) @@ -326,6 +354,7 @@ def _consume_frame_payload(self, frame, data): if frame.stream_id != 0: try: self.streams[frame.stream_id].receive_frame(frame) + self.last_active_time = time.time() except KeyError: xlog.error("%s Unexpected stream identifier %d", self.ip, frame.stream_id) else: @@ -353,6 +382,7 @@ def receive_frame(self, frame): p.flags.add('ACK') p.opaque_data = frame.opaque_data self._send_cb(p) + self.last_active_time = time.time() elif frame.type == SettingsFrame.type: if 'ACK' not in frame.flags: @@ -371,10 +401,11 @@ def receive_frame(self, frame): # If an error occured, try to read the error description from # code registry otherwise use the frame's additional data. error_string = frame._extra_info() + time_cost = time.time() - self.last_active_time if frame.additional_data != "session_timed_out": - xlog.warn("goaway:%s", error_string) + xlog.warn("goaway:%s, t:%d", error_string, time_cost) - self.close("GoAway:%s" % error_string) + self.close("GoAway:%s inactive time:%d" % (error_string, time_cost)) elif frame.type == BlockedFrame.type: xlog.warn("%s get BlockedFrame", self.ip) @@ -419,4 +450,4 @@ def _update_settings(self, frame): self.remote_settings[SettingsFrame.SETTINGS_MAX_FRAME_SIZE] = new_size for stream in self.streams.values(): - stream.max_frame_size += new_size \ No newline at end of file + stream.max_frame_size += new_size diff --git a/lib/http2_stream.py b/lib/http2_stream.py index 40461d4..e9ffc20 100644 --- a/lib/http2_stream.py +++ b/lib/http2_stream.py @@ -1,3 +1,22 @@ +# -*- coding: utf-8 -*- +""" +port from hyper/http20/stream for async +remove push support +increase init window size to improve performance +~~~~~~~~~~~~~~~~~~~ + +Objects that make up the stream-level abstraction of hyper's HTTP/2 support. + + +Conceptually, a single HTTP/2 connection is made up of many streams: each +stream is an independent, bi-directional sequence of HTTP headers and data. +Each stream is identified by a monotonically increasing integer, assigned to +the stream by the endpoint that initiated the stream. +""" + + + + from hyper.common.headers import HTTPHeaderMap from hyper.packages.hyperframe.frame import ( FRAME_MAX_LEN, FRAMES, HeadersFrame, DataFrame, PushPromiseFrame, @@ -219,8 +238,12 @@ def receive_frame(self, frame): w.window_increment = increment self._send_cb(w) elif frame.type == RstStreamFrame.type: - xlog.warn("%s Stream %d forcefully closed.", self.ip, self.stream_id) - self.close("RESET") + # Rest Frame send from server is not define in RFC + # but GAE server will not work on this connection anymore + inactive_time = time.time() - self.connection.last_active_time + xlog.debug("%s Stream %d Rest by server, inactive:%d. error code:%d", + self.ip, self.stream_id, inactive_time, frame.error_code) + self.connection.close("RESET") elif frame.type in FRAMES: # This frame isn't valid at this point. #raise ValueError("Unexpected frame %s." % frame) @@ -241,16 +264,16 @@ def receive_frame(self, frame): self.response_header_datas = None self.task.content_length = int(self.response_headers["Content-Length"][0]) - time_now = self.task.set_state("h2_get_head") - self.get_head_time = time_now + self.task.set_state("h2_get_head") + self.get_head_time = time.time() self.send_response() if 'END_STREAM' in frame.flags: #xlog.debug("%s Closing remote side of stream:%d", self.ip, self.stream_id) time_now = time.time() - time_cose = time_now - self.get_head_time - if time_cose: - speed = self.task.content_length / time_cose + time_cost = time_now - self.get_head_time + if time_cost > 0: + speed = self.task.content_length / time_cost self.task.set_state("h2_finish[SP:%d]" % speed) self.connection.report_speed(speed, self.task.content_length) @@ -259,6 +282,12 @@ def receive_frame(self, frame): self.close("end stream") def send_response(self): + if self.task.responsed: + xlog.error("http2_stream send_response but responsed.%s", self.task.url) + self.close("h2 stream send_response but sended.") + return + + self.task.responsed = True status = int(self.response_headers[b':status'][0]) strip_headers(self.response_headers) response = BaseResponse(status=status, headers=self.response_headers) @@ -268,11 +297,14 @@ def send_response(self): self.task.queue.put(response) def close(self, reason=""): - self.task.put_data("") - # empty block means fail or closed. - self._close_cb(self.stream_id, reason) + if not self.task.responsed: + self.connection.retry_task_cb(self.task) + else: + self.task.put_data("") + # empty block means fail or closed. + def _handle_header_block(self, headers): """ Handles the logic for receiving a completed headers block. diff --git a/lib/http_common.py b/lib/http_common.py index 1b16bfd..5cffb12 100644 --- a/lib/http_common.py +++ b/lib/http_common.py @@ -7,10 +7,18 @@ class GAE_Exception(Exception): - def __init__(self, type, message): - xlog.debug("GAE_Exception %r %r", type, message) - self.type = type - self.message = message + def __init__(self, error_code, message): + xlog.debug("GAE_Exception %r %r", error_code, message) + self.error_code = error_code + self.message = "%r:%s" % (error_code, message) + + def __str__(self): + # for %s + return repr(self.message) + + def __repr__(self): + # for %r + return repr(self.message) class BaseResponse(object): @@ -21,17 +29,31 @@ def __init__(self, status=601, reason="", headers={}, body=""): class Task(object): - def __init__(self, headers, body, queue): + def __init__(self, headers, body, queue, url): self.headers = headers self.body = body self.queue = queue + self.url = url self.start_time = time.time() - self.trace_time = {} + self.unique_id = "%s:%f" % (url, self.start_time) + self.trace_time = [] self.body_queue = Queue.Queue() self.body_len = 0 self.body_readed = 0 self.content_length = None self.read_buffer = "" + self.responsed = False + self.retry_count = 0 + + def to_string(self): + out_str = " Task:%s\r\n" % self.url + out_str += " responsed:%d" % self.responsed + out_str += " retry_count:%d" % self.retry_count + out_str += " start_time:%d" % (time.time() - self.start_time) + out_str += " body_readed:%d\r\n" % self.body_readed + out_str += " Trace:%s" % self.get_trace() + out_str += "\r\n" + return out_str def put_data(self, data): self.body_queue.put(data) @@ -58,41 +80,51 @@ def read(self, size=None): self.read_buffer = "" else: data = self.body_queue.get(block=True) + if not data: + return "" self.body_readed += len(data) return data def set_state(self, stat): + # for debug trace time_now = time.time() - self.trace_time[time_now] = stat + self.trace_time.append((time_now, stat)) + # xlog.debug("%s stat:%s", self.url, stat) return time_now def get_trace(self): - tr_list = collections.OrderedDict(sorted(self.trace_time.items())) out_list = [] last_time = self.start_time - for t, stat in tr_list.items(): + for t, stat in self.trace_time: time_diff = int((t - last_time) * 1000) last_time = t - if time_diff > 1: - out_list.append("%s:%d" % (stat, time_diff)) + out_list.append("%d:%s" % (time_diff, stat)) + out_list.append(":%d" % (time.time()-last_time)) return ",".join(out_list) def response_fail(self, reason=""): + if self.responsed: + xlog.error("http_common responsed_fail but responed.%s", self.url) + self.put_data("") + return + + self.responsed = True err_text = "response_fail:%s" % reason - xlog.debug(err_text) + xlog.debug("%s %s", self.url, err_text) res = BaseResponse(body=err_text) self.queue.put(res) class HTTP_worker(object): - def __init__(self, ssl_sock, close_cb, retry_task_cb): + def __init__(self, ssl_sock, close_cb, retry_task_cb, idle_cb): self.ssl_sock = ssl_sock self.init_rtt = ssl_sock.handshake_time / 3 self.rtt = self.init_rtt self.ip = ssl_sock.ip self.close_cb = close_cb self.retry_task_cb = retry_task_cb + self.idle_cb = idle_cb self.accept_task = True self.keep_running = True self.processed_tasks = 0 @@ -111,4 +143,4 @@ def close(self, reason): self.keep_running = False self.ssl_sock.close() xlog.debug("%s worker close:%s", self.ip, reason) - self.close_cb(self) \ No newline at end of file + self.close_cb(self) diff --git a/lib/openssl_wrap.py b/lib/openssl_wrap.py index 7ce93a0..fe3ad79 100644 --- a/lib/openssl_wrap.py +++ b/lib/openssl_wrap.py @@ -1,3 +1,16 @@ + +# OpenSSL is more stable then ssl +# but OpenSSL is different then ssl, so need a wrapper + +# this wrap has a close callback. +# Which is used by google ip manager(google_ip.py) +# google ip manager keep a connection number counter for every ip. + +# the wrap is used to keep some attribute like ip/appid for ssl + +# __iowait and makefile is used for gevent but not use now. + + import sys import os import select @@ -8,16 +21,16 @@ import OpenSSL SSLError = OpenSSL.SSL.WantReadError +current_path = os.path.dirname(os.path.abspath(__file__)) + from xlog import getLogger xlog = getLogger("gae_proxy") -file_path = os.path.dirname(os.path.abspath(__file__)) -current_path = os.path.abspath(os.path.join(file_path, os.pardir)) - ssl_version = '' openssl_version = OpenSSL.version.__version__ support_alpn_npn = "no" + class SSLConnection(object): """OpenSSL Connection Wrapper""" @@ -28,11 +41,14 @@ def __init__(self, context, sock, ip=None, on_close=None): self._connection = OpenSSL.SSL.Connection(context, sock) self._makefile_refs = 0 self.on_close = on_close + self.timeout = self._sock.gettimeout() or 0.1 + self.running = True + self.socket_closed = False def __del__(self): - if self._sock: + if not self.socket_closed: socket.socket.close(self._sock) - self._sock = None + self.socket_closed = True if self.on_close: self.on_close(self.ip) @@ -41,40 +57,45 @@ def __getattr__(self, attr): return getattr(self._connection, attr) def __iowait(self, io_func, *args, **kwargs): - timeout = self._sock.gettimeout() or 0.1 fd = self._sock.fileno() time_start = time.time() - while self._connection: + while self.running: + time_now = time.time() + wait_timeout = max(0.1, self.timeout - (time_now - time_start)) + wait_timeout = min(wait_timeout, 10) + # in case socket was blocked by FW + # recv is called before send request, which timeout is 240 + # then send request is called and timeout change to 100 + try: return io_func(*args, **kwargs) except (OpenSSL.SSL.WantReadError, OpenSSL.SSL.WantX509LookupError): sys.exc_clear() - _, _, errors = select.select([fd], [], [fd], timeout) + _, _, errors = select.select([fd], [], [fd], wait_timeout) if errors: raise - time_now = time.time() - if time_now - time_start > timeout: + if time_now - time_start > self.timeout: break except OpenSSL.SSL.WantWriteError: sys.exc_clear() - _, _, errors = select.select([], [fd], [fd], timeout) + _, _, errors = select.select([], [fd], [fd], wait_timeout) if errors: raise time_now = time.time() - if time_now - time_start > timeout: + if time_now - time_start > self.timeout: break except OpenSSL.SSL.SysCallError as e: if e[0] == 10035 and 'WSAEWOULDBLOCK' in e[1]: sys.exc_clear() if io_func == self._connection.send: - _, _, errors = select.select([], [fd], [fd], timeout) + _, _, errors = select.select([], [fd], [fd], wait_timeout) else: - _, _, errors = select.select([fd], [], [fd], timeout) + _, _, errors = select.select([fd], [], [fd], wait_timeout) if errors: raise time_now = time.time() - if time_now - time_start > timeout: + if time_now - time_start > self.timeout: break else: raise e @@ -82,6 +103,8 @@ def __iowait(self, io_func, *args, **kwargs): #xlog.exception("e:%r", e) raise e + return 0 + def accept(self): sock, addr = self._sock.accept() client = OpenSSL.SSL.Connection(sock._context, sock) @@ -139,7 +162,7 @@ def recv_into(self, buf): pass return ret - while True: + while self.running: try: ret = self.__iowait(self._connection.recv_into, buf) if not ret: @@ -169,15 +192,23 @@ def write(self, buf, flags=0): def close(self): if self._makefile_refs < 1: - self._connection = None - if self._sock: + self.running = False + if not self.socket_closed: socket.socket.close(self._sock) - self._sock = None + self.socket_closed = True if self.on_close: self.on_close(self.ip) else: self._makefile_refs -= 1 + def settimeout(self, t): + if not self.running: + return + + if self.timeout != t: + self._sock.settimeout(t) + self.timeout = t + def makefile(self, mode='r', bufsize=-1): self._makefile_refs += 1 return socket._fileobject(self, mode, bufsize, close=True)