diff --git a/Makefile b/Makefile index de614a3304..418fd8b91e 100644 --- a/Makefile +++ b/Makefile @@ -139,7 +139,7 @@ lib-profile: --plugin proxy.plugin.WebServerPlugin \ --local-executor \ --backlog 65536 \ - --open-file-limit 65536 + --open-file-limit 65536 \ --log-file /dev/null devtools: diff --git a/README.md b/README.md index 1642bba145..4d611de448 100644 --- a/README.md +++ b/README.md @@ -128,47 +128,47 @@ ```console # On Macbook Pro 2019 / 2.4 GHz 8-Core Intel Core i9 / 32 GB RAM ❯ ./helper/benchmark.sh - CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 5000 req/sec, TIMEOUT: 1 sec + CONCURRENCY: 100 workers, TOTAL REQUESTS: 100000 req, QPS: 8000 req/sec, TIMEOUT: 1 sec Summary: - Total: 3.1560 secs - Slowest: 0.0375 secs - Fastest: 0.0006 secs - Average: 0.0031 secs - Requests/sec: 31685.9140 + Total: 3.1217 secs + Slowest: 0.0499 secs + Fastest: 0.0004 secs + Average: 0.0030 secs + Requests/sec: 32033.7261 Total data: 1900000 bytes Size/request: 19 bytes Response time histogram: - 0.001 [1] | - 0.004 [91680] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ - 0.008 [7929] |■■■ - 0.012 [263] | - 0.015 [29] | - 0.019 [8] | - 0.023 [23] | - 0.026 [15] | - 0.030 [27] | - 0.034 [16] | - 0.037 [9] | + 0.000 [1] | + 0.005 [92268] |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■ + 0.010 [7264] |■■■ + 0.015 [318] | + 0.020 [102] | + 0.025 [32] | + 0.030 [6] | + 0.035 [4] | + 0.040 [1] | + 0.045 [2] | + 0.050 [2] | Latency distribution: - 10% in 0.0022 secs - 25% in 0.0025 secs - 50% in 0.0029 secs - 75% in 0.0034 secs - 90% in 0.0041 secs - 95% in 0.0048 secs - 99% in 0.0066 secs + 10% in 0.0017 secs + 25% in 0.0020 secs + 50% in 0.0025 secs + 75% in 0.0036 secs + 90% in 0.0050 secs + 95% in 0.0060 secs + 99% in 0.0087 secs Details (average, fastest, slowest): - DNS+dialup: 0.0000 secs, 0.0006 secs, 0.0375 secs + DNS+dialup: 0.0000 secs, 0.0004 secs, 0.0499 secs DNS-lookup: 0.0000 secs, 0.0000 secs, 0.0000 secs - req write: 0.0000 secs, 0.0000 secs, 0.0046 secs - resp wait: 0.0030 secs, 0.0006 secs, 0.0320 secs - resp read: 0.0000 secs, 0.0000 secs, 0.0029 secs + req write: 0.0000 secs, 0.0000 secs, 0.0020 secs + resp wait: 0.0030 secs, 0.0004 secs, 0.0462 secs + resp read: 0.0000 secs, 0.0000 secs, 0.0027 secs Status code distribution: [200] 100000 responses diff --git a/docs/conf.py b/docs/conf.py index 68fb0974c3..7391c7911f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -267,6 +267,7 @@ (_py_class_role, 'HttpWebServerBasePlugin'), (_py_class_role, 'multiprocessing.context.Process'), (_py_class_role, 'multiprocessing.synchronize.Lock'), + (_py_class_role, 'NonBlockingQueue'), (_py_class_role, 'paramiko.channel.Channel'), (_py_class_role, 'proxy.http.parser.parser.T'), (_py_class_role, 'proxy.plugin.cache.store.base.CacheStore'), diff --git a/helper/benchmark.sh b/helper/benchmark.sh index 653605c43c..aaa62321f0 100755 --- a/helper/benchmark.sh +++ b/helper/benchmark.sh @@ -32,7 +32,7 @@ if [ $(basename $PWD) != "proxy.py" ]; then fi TIMEOUT=1 -QPS=20000 +QPS=8000 CONCURRENCY=100 TOTAL_REQUESTS=100000 OPEN_FILE_LIMIT=65536 @@ -41,15 +41,6 @@ PID_FILE=/tmp/proxy.pid ulimit -n $OPEN_FILE_LIMIT -# time python -m \ -# proxy \ -# --enable-web-server \ -# --plugin proxy.plugin.WebServerPlugin \ -# --backlog $BACKLOG \ -# --open-file-limit $OPEN_FILE_LIMIT \ -# --pid-file $PID_FILE \ -# --log-file /dev/null - PID=$(cat $PID_FILE) if [[ -z "$PID" ]]; then echo "Either pid file doesn't exist or no pid found in the pid file" diff --git a/proxy/common/backports.py b/proxy/common/backports.py index 4734287073..b6f5dfd927 100644 --- a/proxy/common/backports.py +++ b/proxy/common/backports.py @@ -9,8 +9,11 @@ :license: BSD, see LICENSE for more details. """ import time +import threading -from typing import Any +from typing import Any, Deque +from queue import Empty +from collections import deque class cached_property: @@ -80,3 +83,36 @@ def __get__(self, inst: Any, owner: Any) -> Any: finally: cache[self.__name__] = (value, now) return value + + +class NonBlockingQueue: + '''Simple, unbounded, non-blocking FIFO queue. + + Supports only a single consumer. + + NOTE: This is available in Python since 3.7 as SimpleQueue. + Here because proxy.py still supports 3.6 + ''' + + def __init__(self) -> None: + self._queue: Deque[Any] = deque() + self._count: threading.Semaphore = threading.Semaphore(0) + + def put(self, item: Any) -> None: + '''Put the item on the queue.''' + self._queue.append(item) + self._count.release() + + def get(self) -> Any: + '''Remove and return an item from the queue.''' + if not self._count.acquire(False, None): + raise Empty + return self._queue.popleft() + + def empty(self) -> bool: + '''Return True if the queue is empty, False otherwise (not reliable!).''' + return len(self._queue) == 0 + + def qsize(self) -> int: + '''Return the approximate size of the queue (not reliable!).''' + return len(self._queue) diff --git a/proxy/common/constants.py b/proxy/common/constants.py index e679a2e8ef..c941676f0d 100644 --- a/proxy/common/constants.py +++ b/proxy/common/constants.py @@ -108,7 +108,10 @@ def _env_threadless_compliant() -> bool: DEFAULT_MAX_SEND_SIZE = 16 * 1024 DEFAULT_WORK_KLASS = 'proxy.http.HttpProtocolHandler' DEFAULT_ENABLE_PROXY_PROTOCOL = False -DEFAULT_SELECTOR_SELECT_TIMEOUT = 0.1 +# 25 milliseconds to keep the loops hot +# Will consume ~0.3-0.6% CPU when idle. +DEFAULT_SELECTOR_SELECT_TIMEOUT = 25 / 1000 +DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT = 1 # in seconds DEFAULT_DEVTOOLS_DOC_URL = 'http://proxy' DEFAULT_DEVTOOLS_FRAME_ID = secrets.token_hex(8) diff --git a/proxy/common/utils.py b/proxy/common/utils.py index b25b680522..673b73d1ba 100644 --- a/proxy/common/utils.py +++ b/proxy/common/utils.py @@ -180,10 +180,10 @@ def find_http_line(raw: bytes) -> Tuple[Optional[bytes], bytes]: """Find and returns first line ending in CRLF along with following buffer. If no ending CRLF is found, line is None.""" - parts = raw.split(CRLF) - if len(parts) == 1: - return None, raw - return parts[0], CRLF.join(parts[1:]) + parts = raw.split(CRLF, 1) + return (None, raw) \ + if len(parts) == 1 \ + else (parts[0], parts[1]) def wrap_socket( diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 3fa011d6ff..05cfa09c26 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -13,7 +13,6 @@ acceptor pre """ -import queue import socket import logging import argparse @@ -26,11 +25,11 @@ from multiprocessing.reduction import recv_handle from typing import List, Optional, Tuple -from typing import Any # noqa: W0611 pylint: disable=unused-import from ...common.flag import flags from ...common.utils import is_threadless from ...common.logger import Logger +from ...common.backports import NonBlockingQueue from ...common.constants import DEFAULT_LOCAL_EXECUTOR from ..event import EventQueue @@ -103,7 +102,7 @@ def __init__( self.sock: Optional[socket.socket] = None # Internals self._total: Optional[int] = None - self._local_work_queue: Optional['queue.Queue[Any]'] = None + self._local_work_queue: Optional['NonBlockingQueue'] = None self._local: Optional[LocalExecutor] = None self._lthread: Optional[threading.Thread] = None @@ -118,7 +117,7 @@ def accept(self, events: List[Tuple[selectors.SelectorKey, int]]) -> None: work = (conn, addr or None) if self.flags.local_executor: assert self._local_work_queue - self._local_work_queue.put_nowait(work) + self._local_work_queue.put(work) else: self._work(*work) @@ -171,7 +170,7 @@ def run(self) -> None: def _start_local(self) -> None: assert self.sock - self._local_work_queue = queue.Queue() + self._local_work_queue = NonBlockingQueue() self._local = LocalExecutor( work_queue=self._local_work_queue, flags=self.flags, diff --git a/proxy/core/acceptor/listener.py b/proxy/core/acceptor/listener.py index 53559f1fbe..bef4b4461f 100644 --- a/proxy/core/acceptor/listener.py +++ b/proxy/core/acceptor/listener.py @@ -99,6 +99,7 @@ def shutdown(self) -> None: def _listen_unix_socket(self) -> None: self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._socket.bind(self.flags.unix_socket_path) self._socket.listen(self.flags.backlog) self._socket.setblocking(False) @@ -106,6 +107,7 @@ def _listen_unix_socket(self) -> None: def _listen_server_port(self) -> None: self._socket = socket.socket(self.flags.family, socket.SOCK_STREAM) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) self._socket.bind((str(self.flags.hostname), self.flags.port)) self._socket.listen(self.flags.backlog) self._socket.setblocking(False) diff --git a/proxy/core/acceptor/local.py b/proxy/core/acceptor/local.py index 632497b541..bb4909815f 100644 --- a/proxy/core/acceptor/local.py +++ b/proxy/core/acceptor/local.py @@ -19,14 +19,16 @@ import contextlib from typing import Optional -from typing import Any # noqa: W0611 pylint: disable=unused-import +from typing import Any + +from ...common.backports import NonBlockingQueue # noqa: W0611, F401 pylint: disable=unused-import from .threadless import Threadless logger = logging.getLogger(__name__) -class LocalExecutor(Threadless['queue.Queue[Any]']): +class LocalExecutor(Threadless['NonBlockingQueue']): """A threadless executor implementation which uses a queue to receive new work.""" def __init__(self, *args: Any, **kwargs: Any) -> None: @@ -44,7 +46,7 @@ def work_queue_fileno(self) -> Optional[int]: def receive_from_work_queue(self) -> bool: with contextlib.suppress(queue.Empty): - work = self.work_queue.get(block=False) + work = self.work_queue.get() if isinstance(work, bool) and work is False: return True assert isinstance(work, tuple) diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index fd7ee98add..24976f097f 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -26,7 +26,7 @@ from ...common.logger import Logger from ...common.types import Readables, Writables -from ...common.constants import DEFAULT_SELECTOR_SELECT_TIMEOUT +from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT from ..connection import TcpClientConnection from ..event import eventNames, EventQueue @@ -86,6 +86,7 @@ def __init__( Dict[int, int], ] = {} self.wait_timeout: float = DEFAULT_SELECTOR_SELECT_TIMEOUT + self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT @property @abstractmethod @@ -216,12 +217,9 @@ async def _selected_events(self) -> Tuple[ work_by_ids[key.data][1].append(key.fileobj) return (work_by_ids, new_work_available) - async def _wait_for_tasks( - self, - pending: Set['asyncio.Task[bool]'], - ) -> None: + async def _wait_for_tasks(self) -> None: finished, self.unfinished = await asyncio.wait( - pending, + self.unfinished, timeout=self.wait_timeout, return_when=asyncio.FIRST_COMPLETED, ) @@ -236,10 +234,6 @@ def _fromfd(self, fileno: int) -> socket.socket: type=socket.SOCK_STREAM, ) - # TODO: Use cached property to avoid execution repeatedly - # within a second interval. Note that our selector timeout - # is 0.1 second which can unnecessarily result in cleanup - # checks within a second boundary. def _cleanup_inactive(self) -> None: inactive_works: List[int] = [] for work_id in self.works: @@ -294,21 +288,35 @@ async def _run_once(self) -> bool: if teardown: return teardown if len(work_by_ids) == 0: - self._cleanup_inactive() return False # Invoke Threadless.handle_events self.unfinished.update(self._create_tasks(work_by_ids)) # logger.debug('Executing {0} works'.format(len(self.unfinished))) - await self._wait_for_tasks(self.unfinished) + await self._wait_for_tasks() # logger.debug( # 'Done executing works, {0} pending, {1} registered'.format( # len(self.unfinished), len(self.registered_events_by_work_ids), # ), # ) - # Remove and shutdown inactive workers - self._cleanup_inactive() return False + async def _run_forever(self) -> None: + tick = 0 + try: + while True: + if await self._run_once(): + break + # Check for inactive and shutdown signal only second + if (tick * DEFAULT_SELECTOR_SELECT_TIMEOUT) > self.cleanup_inactive_timeout: + self._cleanup_inactive() + if self.running.is_set(): + break + tick = 0 + tick += 1 + finally: + if self.loop: + self.loop.stop() + def run(self) -> None: Logger.setup( self.flags.log_file, self.flags.log_level, @@ -324,10 +332,9 @@ def run(self) -> None: data=wqfileno, ) assert self.loop - while not self.running.is_set(): - # logger.debug('Working on {0} works'.format(len(self.works))) - if self.loop.run_until_complete(self._run_once()): - break + # logger.debug('Working on {0} works'.format(len(self.works))) + self.loop.create_task(self._run_forever()) + self.loop.run_forever() except KeyboardInterrupt: pass finally: @@ -336,4 +343,5 @@ def run(self) -> None: self.selector.unregister(wqfileno) self.close_work_queue() assert self.loop is not None + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) self.loop.close() diff --git a/proxy/core/connection/connection.py b/proxy/core/connection/connection.py index 65b01769bd..69036755aa 100644 --- a/proxy/core/connection/connection.py +++ b/proxy/core/connection/connection.py @@ -41,6 +41,7 @@ def __init__(self, tag: int) -> None: self.buffer: List[memoryview] = [] self.closed: bool = False self._reusable: bool = False + self._num_buffer = 0 @property @abstractmethod @@ -74,10 +75,11 @@ def close(self) -> bool: return self.closed def has_buffer(self) -> bool: - return len(self.buffer) > 0 + return self._num_buffer > 0 def queue(self, mv: memoryview) -> None: self.buffer.append(mv) + self._num_buffer += 1 def flush(self) -> int: """Users must handle BrokenPipeError exceptions""" @@ -87,6 +89,7 @@ def flush(self) -> int: sent: int = self.send(mv[:DEFAULT_MAX_SEND_SIZE]) if sent == len(mv): self.buffer.pop(0) + self._num_buffer -= 1 else: self.buffer[0] = memoryview(mv[sent:]) del mv @@ -103,3 +106,4 @@ def reset(self) -> None: assert not self.closed self._reusable = True self.buffer = [] + self._num_buffer = 0 diff --git a/proxy/http/handler.py b/proxy/http/handler.py index 1551b298f1..a29e852e7b 100644 --- a/proxy/http/handler.py +++ b/proxy/http/handler.py @@ -314,45 +314,6 @@ def _optionally_wrap_socket( conn = wrap_socket(conn, self.flags.keyfile, self.flags.certfile) return conn - # FIXME: Returning events is only necessary because we cannot use async context manager - # for < Python 3.8. As a reason, this method is no longer a context manager and caller - # is responsible for unregistering the descriptors. - async def _selected_events(self) -> Tuple[Dict[int, int], Readables, Writables]: - assert self.selector - events = await self.get_events() - for fd in events: - self.selector.register(fd, events[fd]) - ev = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT) - readables = [] - writables = [] - for key, mask in ev: - if mask & selectors.EVENT_READ: - readables.append(key.fileobj) - if mask & selectors.EVENT_WRITE: - writables.append(key.fileobj) - return (events, readables, writables) - - def _flush(self) -> None: - assert self.selector - logger.debug('Flushing pending data') - try: - self.selector.register( - self.work.connection, - selectors.EVENT_WRITE, - ) - while self.work.has_buffer(): - logging.debug('Waiting for client read ready') - ev: List[ - Tuple[selectors.SelectorKey, int] - ] = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT) - if len(ev) == 0: - continue - self.work.flush() - except BrokenPipeError: - pass - finally: - self.selector.unregister(self.work.connection) - def _connection_inactive_for(self) -> float: return time.time() - self.last_activity @@ -403,3 +364,42 @@ async def _run_once(self) -> bool: # work fds repeatedly. for fd in events: self.selector.unregister(fd) + + # FIXME: Returning events is only necessary because we cannot use async context manager + # for < Python 3.8. As a reason, this method is no longer a context manager and caller + # is responsible for unregistering the descriptors. + async def _selected_events(self) -> Tuple[Dict[int, int], Readables, Writables]: + assert self.selector + events = await self.get_events() + for fd in events: + self.selector.register(fd, events[fd]) + ev = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT) + readables = [] + writables = [] + for key, mask in ev: + if mask & selectors.EVENT_READ: + readables.append(key.fileobj) + if mask & selectors.EVENT_WRITE: + writables.append(key.fileobj) + return (events, readables, writables) + + def _flush(self) -> None: + assert self.selector + logger.debug('Flushing pending data') + try: + self.selector.register( + self.work.connection, + selectors.EVENT_WRITE, + ) + while self.work.has_buffer(): + logging.debug('Waiting for client read ready') + ev: List[ + Tuple[selectors.SelectorKey, int] + ] = self.selector.select(timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT) + if len(ev) == 0: + continue + self.work.flush() + except BrokenPipeError: + pass + finally: + self.selector.unregister(self.work.connection) diff --git a/tests/core/test_listener.py b/tests/core/test_listener.py index 667b6f082b..5ca29e58f9 100644 --- a/tests/core/test_listener.py +++ b/tests/core/test_listener.py @@ -34,8 +34,14 @@ def test_setup_and_teardown(self, mock_socket: mock.Mock) -> None: socket.AF_INET6 if flags.hostname.version == 6 else socket.AF_INET, socket.SOCK_STREAM, ) - sock.setsockopt.assert_called_with( - socket.SOL_SOCKET, socket.SO_REUSEADDR, 1, + self.assertEqual(sock.setsockopt.call_count, 2) + self.assertEqual( + sock.setsockopt.call_args_list[0][0], + (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1), + ) + self.assertEqual( + sock.setsockopt.call_args_list[1][0], + (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), ) sock.bind.assert_called_with( (str(flags.hostname), 0), @@ -68,8 +74,14 @@ def test_unix_path_listener(self, mock_socket: mock.Mock, mock_remove: mock.Mock socket.AF_UNIX, socket.SOCK_STREAM, ) - sock.setsockopt.assert_called_with( - socket.SOL_SOCKET, socket.SO_REUSEADDR, 1, + self.assertEqual(sock.setsockopt.call_count, 2) + self.assertEqual( + sock.setsockopt.call_args_list[0][0], + (socket.SOL_SOCKET, socket.SO_REUSEADDR, 1), + ) + self.assertEqual( + sock.setsockopt.call_args_list[1][0], + (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), ) sock.bind.assert_called_with(sock_path) sock.listen.assert_called_with(flags.backlog)