Skip to content
This repository has been archived by the owner on Jan 17, 2021. It is now read-only.

Commit

Permalink
fix task dispatch bug #142
Browse files Browse the repository at this point in the history
  • Loading branch information
xyuanmu committed Sep 29, 2017
1 parent cf9d1c9 commit 6effbab
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 76 deletions.
52 changes: 38 additions & 14 deletions lib/http1.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

import threading
import Queue
import httplib
Expand All @@ -7,18 +8,18 @@
import connect_control
from google_ip import google_ip
from http_common import *
from config import config


class HTTP1_worker(HTTP_worker):
version = "1.1"
idle_time = 10 * 60

def __init__(self, ssl_sock, close_cb, retry_task_cb):
super(HTTP1_worker, self).__init__(ssl_sock, close_cb, retry_task_cb)
def __init__(self, ssl_sock, close_cb, retry_task_cb, idle_cb):
super(HTTP1_worker, self).__init__(ssl_sock, close_cb, retry_task_cb, idle_cb)

self.last_active_time = self.ssl_sock.create_time
self.last_request_time = self.ssl_sock.create_time
self.task = None

self.task_queue = Queue.Queue()
threading.Thread(target=self.work_loop).start()
Expand All @@ -29,13 +30,14 @@ def get_rtt_rate(self):

def request(self, task):
    """Hand a task to this worker.

    The worker stops accepting further tasks, records ``task`` as its
    current task and puts it on ``task_queue``.
    """
    # Same assignment order as before: busy flag first, then the task.
    self.accept_task, self.task = False, task
    self.task_queue.put(task)

def keep_alive_thread(self):
ping_interval = 55
ping_interval = 55.0

while connect_control.keep_running and self.keep_running:
time_to_ping = max(ping_interval - (time.time() - self.last_active_time), 0)
time_to_ping = max(ping_interval - (time.time() - self.last_active_time), 0.2)
time.sleep(time_to_ping)

time_now = time.time()
Expand All @@ -46,29 +48,35 @@ def keep_alive_thread(self):
self.close("idle timeout")
return

self.last_active_time = time_now
self.task_queue.put("ping")

def work_loop(self):
while connect_control.keep_running and self.keep_running:
task = self.task_queue.get(True)
if not task:
# None task to exit
# None task means exit
self.accept_task = False
self.keep_running = False
return
elif task == "ping":
self.last_active_time = time.time()
if not self.head_request():
# now many gvs don't support gae
self.accept_task = False
self.keep_running = False
if self.task is not None:
self.retry_task_cb(self.task)
self.task = None

google_ip.recheck_ip(self.ssl_sock.ip)
self.close("keep alive")
return
else:
self.last_active_time = time.time()
continue

# xlog.debug("http1 get task")
xlog.debug("http1 get task")
time_now = time.time()
self.last_request_time = time_now
self.last_active_time = time_now
self.request_task(task)

def request_task(self, task):
Expand Down Expand Up @@ -98,12 +106,15 @@ def request_task(self, task):
xlog.warn("%s h1_request:%r", self.ip, e)
google_ip.report_connect_closed(self.ssl_sock.ip, "request_fail")
self.retry_task_cb(task)
self.task = None
self.close("request fail")
return

task.set_state("h1_get_head")
body_length = int(response.getheader("Content-Length", "0"))
task.content_length = body_length

task.responsed = True
response.headers = response.msg.dict
response.worker = self
response.task = task
Expand All @@ -128,8 +139,11 @@ def request_task(self, task):
speed = body_length / time_cost
task.set_state("h1_finish[SP:%d]" % speed)
self.report_speed(speed, body_length)
self.task = None
self.accept_task = True
self.idle_cb()
self.processed_tasks += 1
self.last_active_time = time.time()
return

to_read = max(end - start, 65535)
Expand Down Expand Up @@ -166,31 +180,41 @@ def head_request(self):
data = request_data.encode()
ret = self.ssl_sock.send(data)
if ret != len(data):
xlog.warn("head send len:%r %d", ret, len(data))
xlog.warn("h1 head send len:%r %d %s", ret, len(data), self.ip)
return False
response = httplib.HTTPResponse(self.ssl_sock, buffering=True)
self.ssl_sock.settimeout(100)
response.begin()

status = response.status
if status != 200:
xlog.debug("appid:%s head fail status:%d", self.ssl_sock.appid, status)
xlog.debug("%s appid:%s head fail status:%d", self.ip, self.ssl_sock.appid, status)
return False

self.rtt = (time.time() - start_time) * 1000
return True
except httplib.BadStatusLine as e:
time_now = time.time()
inactive_time = time_now - self.ssl_sock.last_use_time
inactive_time = time_now - self.last_active_time
head_timeout = time_now - start_time
xlog.debug("%s keep alive fail, inactive_time:%d head_timeout:%d",
self.ssl_sock.ip, inactive_time, head_timeout)
except Exception as e:
xlog.warn("%s head appid:%s request fail:%r", self.ssl_sock.ip, self.ssl_sock.appid, e)
xlog.debug("h1 %s appid:%s HEAD keep alive request fail:%r", self.ssl_sock.ip, self.ssl_sock.appid, e)

def close(self, reason=""):
    """Shut this HTTP/1 worker down and settle its in-flight task.

    Stops accepting work, settles the task currently owned by the worker,
    runs the base-class ``close`` and finally puts a ``None`` sentinel on
    ``task_queue`` so ``work_loop`` exits.

    Args:
        reason: short text describing why the worker is being closed;
            forwarded to the base-class ``close``.
    """
    # Notify loop to exit.
    # This function may be called from outside this worker,
    # e.g. when gae_proxy finds that the appid or ip is wrong.
    self.accept_task = False
    self.keep_running = False

    # Take the task exactly once: the worker thread also resets self.task,
    # so re-reading the attribute between the checks could find None.
    task = self.task
    self.task = None
    if task is not None:
        if task.responsed:
            # The response was already handed to the task, so it cannot be
            # retried. NOTE(review): the empty chunk presumably marks the
            # end of the body -- confirm against put_data().
            task.put_data("")
        else:
            self.retry_task_cb(task)

    super(HTTP1_worker, self).close(reason)
    self.task_queue.put(None)
69 changes: 50 additions & 19 deletions lib/http2_connection.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

import Queue
import threading
import socket
Expand All @@ -10,8 +11,6 @@
from http_common import *


from hyper.common.bufsocket import BufferedSocket

from hyper.common.bufsocket import BufferedSocket

from hyper.packages.hyperframe.frame import (
Expand Down Expand Up @@ -60,22 +59,31 @@ def blocked(self):
return self.initial_window_size - self.window_size


class RawFrame(object):
    """Frame-like wrapper around a payload that is emitted unchanged."""

    def __init__(self, dat):
        # Stored as given; serialize() returns it without modification.
        self.dat = dat

    def serialize(self):
        """Return the wrapped payload exactly as it was passed in."""
        payload = self.dat
        return payload


class HTTP2_worker(HTTP_worker):
version = "2"

def __init__(self, ssl_sock, close_cb, retry_task_cb):
super(HTTP2_worker, self).__init__(ssl_sock, close_cb, retry_task_cb)
def __init__(self, ssl_sock, close_cb, retry_task_cb, idle_cb):
super(HTTP2_worker, self).__init__(ssl_sock, close_cb, retry_task_cb, idle_cb)

self.max_concurrent = 20
self.network_buffer_size = 128 * 1024

# Google http/2 time out is 4 mins.
ssl_sock.settimeout(240)
self.ssl_sock.settimeout(240)
self._sock = BufferedSocket(ssl_sock, self.network_buffer_size)

self.next_stream_id = 1
self.streams = {}
self.last_ping_time = time.time()
self.last_active_time = self.ssl_sock.create_time

# count of pings not yet ACKed
# increased when a ping is sent
Expand Down Expand Up @@ -124,8 +132,14 @@ def __init__(self, ssl_sock, close_cb, retry_task_cb):
threading.Thread(target=self.send_loop).start()
threading.Thread(target=self.recv_loop).start()

# export api
def request(self, task):
# this is the exported API

if not self.keep_running:
# race condition
self.retry_task_cb(task)
return

if len(self.streams) > self.max_concurrent:
self.accept_task = False

Expand All @@ -140,6 +154,7 @@ def request_task(self, task):
# http/2 client use odd stream_id
self.next_stream_id += 2

self.ssl_sock.settimeout(100)
stream = Stream(self, self.ip, stream_id, self.ssl_sock.host, task,
self._send_cb, self._close_stream_cb, self.encoder, self.decoder,
FlowControlManager(self.local_settings[SettingsFrame.INITIAL_WINDOW_SIZE]),
Expand All @@ -151,7 +166,7 @@ def send_loop(self):
while connect_control.keep_running and self.keep_running:
frame = self.send_queue.get(True)
if not frame:
# None frame to exist
# None frame means exit
break

# xlog.debug("%s Send:%s", self.ip, str(frame))
Expand All @@ -161,18 +176,24 @@ def send_loop(self):
# don't flush for small package
# reduce send api call

if self.send_queue._qsize():
continue

# wait for payload frame
time.sleep(0.001)
time.sleep(0.01)
# combine header and payload in one tcp packet.
if not self.send_queue._qsize():
self._sock.flush()
except socket.error as e:
if e.errno not in (errno.EPIPE, errno.ECONNRESET):
xlog.warn("%s http2 send fail:%r", self.ip, e)
else:
xlog.exceptiong("send error:%r", e)
xlog.exception("send error:%r", e)

self.close("send fail:%r", e)
self.close("send fail:%r" % e)
except Exception as e:
xlog.debug("http2 %s send error:%r", self.ip, e)
self.close("send fail:%r" % e)

def recv_loop(self):
while connect_control.keep_running and self.keep_running:
Expand All @@ -187,30 +208,33 @@ def get_rtt_rate(self):

def close(self, reason=""):
self.keep_running = False
self.accept_task = False
# Notify loop to exit
# This function may be called from outside the http2 worker,
# e.g. when gae_proxy finds that the appid or ip is wrong
self.send_queue.put(None)

for stream in self.streams.values():
if stream.get_head_time:
# after get header,
if stream.task.responsed:
# response has already been sent to the client,
# so the task can't be retried
stream.close(reason=reason)
else:
self.retry_task_cb(stream.task)
self.streams = {}
super(HTTP2_worker, self).close(reason)

def send_ping(self):
    """Queue an HTTP/2 PING frame whose payload is the current timestamp.

    The send time is packed as a big-endian double into the frame's
    opaque data, recorded in ``last_ping_time``, and ``ping_on_way`` is
    incremented.
    """
    # Useless for GAE server.
    # Read the clock once so last_ping_time matches the frame payload.
    now = time.time()

    ping = PingFrame(0)
    ping.opaque_data = struct.pack("!d", now)

    # Update the bookkeeping before the frame is handed to the send queue,
    # so it is never behind a frame that has already been queued.
    # NOTE(review): assumes the ACK handler reads ping_on_way -- confirm.
    self.last_ping_time = now
    self.ping_on_way += 1
    self.send_queue.put(ping)

def _send_preamble(self):
self._sock.send(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n')
self.send_queue.put(RawFrame(b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'))

f = SettingsFrame(0)
f.settings[SettingsFrame.ENABLE_PUSH] = 0
f.settings[SettingsFrame.INITIAL_WINDOW_SIZE] = self.local_settings[SettingsFrame.INITIAL_WINDOW_SIZE]
Expand Down Expand Up @@ -259,23 +283,27 @@ def _close_stream_cb(self, stream_id, reason):

if self.keep_running and len(self.streams) < self.max_concurrent and self.remote_window_size > 10000:
self.accept_task = True
self.idle_cb()

if len(self.streams) == 0:
self.ssl_sock.settimeout(240)

self.processed_tasks += 1

def _consume_single_frame(self):
try:
header = self._sock.recv(9)
except Exception as e:
xlog.warn("%s _consume_single_frame:%r", self.ip, e)
xlog.debug("%s _consume_single_frame:%r, inactive time:%d", self.ip, e, time.time()-self.last_active_time)
self.close("disconnect:%r" % e)
return

# Parse the header. We can use the returned memoryview directly here.
frame, length = Frame.parse_frame_header(header)

if length > FRAME_MAX_LEN:
xlog.error("Frame size exceeded on stream %d (received: %d, max: %d)",
frame.stream_id, length, FRAME_MAX_LEN)
xlog.error("%s Frame size exceeded on stream %d (received: %d, max: %d)",
self.ip, frame.stream_id, length, FRAME_MAX_LEN)
# self._send_rst_frame(frame.stream_id, 6) # 6 = FRAME_SIZE_ERROR

data = self._recv_payload(length)
Expand Down Expand Up @@ -326,6 +354,7 @@ def _consume_frame_payload(self, frame, data):
if frame.stream_id != 0:
try:
self.streams[frame.stream_id].receive_frame(frame)
self.last_active_time = time.time()
except KeyError:
xlog.error("%s Unexpected stream identifier %d", self.ip, frame.stream_id)
else:
Expand Down Expand Up @@ -353,6 +382,7 @@ def receive_frame(self, frame):
p.flags.add('ACK')
p.opaque_data = frame.opaque_data
self._send_cb(p)
self.last_active_time = time.time()

elif frame.type == SettingsFrame.type:
if 'ACK' not in frame.flags:
Expand All @@ -371,10 +401,11 @@ def receive_frame(self, frame):
# If an error occurred, try to read the error description from
# code registry otherwise use the frame's additional data.
error_string = frame._extra_info()
time_cost = time.time() - self.last_active_time
if frame.additional_data != "session_timed_out":
xlog.warn("goaway:%s", error_string)
xlog.warn("goaway:%s, t:%d", error_string, time_cost)

self.close("GoAway:%s" % error_string)
self.close("GoAway:%s inactive time:%d" % (error_string, time_cost))

elif frame.type == BlockedFrame.type:
xlog.warn("%s get BlockedFrame", self.ip)
Expand Down Expand Up @@ -419,4 +450,4 @@ def _update_settings(self, frame):
self.remote_settings[SettingsFrame.SETTINGS_MAX_FRAME_SIZE] = new_size

for stream in self.streams.values():
stream.max_frame_size += new_size
stream.max_frame_size += new_size
Loading

0 comments on commit 6effbab

Please sign in to comment.