From 3b38f2e058593d8ac7ad0c1f5c8bb550784ac924 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 04:32:44 +0530 Subject: [PATCH 01/26] Allow overriding work_klass via Proxy context manager kwargs --- examples/https_connect_tunnel.py | 4 ++-- examples/ssl_echo_server.py | 4 ++-- examples/tcp_echo_server.py | 4 ++-- examples/web_scraper.py | 5 +++-- proxy/core/acceptor/pool.py | 3 ++- proxy/proxy.py | 12 +++++++----- 6 files changed, 18 insertions(+), 14 deletions(-) diff --git a/examples/https_connect_tunnel.py b/examples/https_connect_tunnel.py index f98b3e4ec1..b34ed7ff82 100644 --- a/examples/https_connect_tunnel.py +++ b/examples/https_connect_tunnel.py @@ -12,12 +12,12 @@ from typing import Any, Optional +from proxy import Proxy from proxy.common.flag import FlagParser from proxy.common.utils import build_http_response from proxy.http.codes import httpStatusCodes from proxy.http.parser import httpParserStates from proxy.http.methods import httpMethods -from proxy.core.acceptor import AcceptorPool from proxy.core.base import BaseTcpTunnelHandler @@ -75,7 +75,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` - with AcceptorPool( + with Proxy( flags=FlagParser.initialize( port=12345, num_workers=1, threadless=True, ), diff --git a/examples/ssl_echo_server.py b/examples/ssl_echo_server.py index b02f1f2f3a..9359bcaeb1 100644 --- a/examples/ssl_echo_server.py +++ b/examples/ssl_echo_server.py @@ -11,9 +11,9 @@ import time from typing import Optional +from proxy import Proxy from proxy.common.flag import FlagParser from proxy.common.utils import wrap_socket -from proxy.core.acceptor import AcceptorPool from proxy.core.connection import TcpClientConnection from proxy.core.base import BaseTcpServerHandler @@ -45,7 +45,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` - with AcceptorPool( + with Proxy( 
flags=FlagParser.initialize( port=12345, num_workers=1, diff --git a/examples/tcp_echo_server.py b/examples/tcp_echo_server.py index 8453c9536b..e65c3bcb2f 100644 --- a/examples/tcp_echo_server.py +++ b/examples/tcp_echo_server.py @@ -11,8 +11,8 @@ import time from typing import Optional +from proxy import Proxy from proxy.common.flag import FlagParser -from proxy.core.acceptor import AcceptorPool from proxy.core.base import BaseTcpServerHandler @@ -30,7 +30,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` - with AcceptorPool( + with Proxy( flags=FlagParser.initialize( port=12345, num_workers=1, threadless=True, ), diff --git a/examples/web_scraper.py b/examples/web_scraper.py index 27f79533e7..3c42060ae6 100644 --- a/examples/web_scraper.py +++ b/examples/web_scraper.py @@ -13,8 +13,9 @@ from typing import Dict +from proxy import Proxy from proxy.common.flag import FlagParser -from proxy.core.acceptor import Work, AcceptorPool +from proxy.core.acceptor import Work from proxy.common.types import Readables, Writables @@ -56,7 +57,7 @@ def handle_events( if __name__ == '__main__': - with AcceptorPool( + with Proxy( flags=FlagParser.initialize( port=12345, num_workers=1, diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 426f51be61..40868b79b2 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -99,7 +99,8 @@ class AcceptorPool: def __init__( self, flags: argparse.Namespace, - work_klass: Type[Work], event_queue: Optional[EventQueue] = None, + work_klass: Type[Work], + event_queue: Optional[EventQueue] = None, ) -> None: self.flags = flags # Eventing core queue diff --git a/proxy/proxy.py b/proxy/proxy.py index 583e8d23b4..fb5983dcad 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -101,13 +101,15 @@ class Proxy: for message sharing and/or signaling. 
""" - def __init__(self, input_args: Optional[List[str]], **opts: Any) -> None: + def __init__( + self, + input_args: Optional[List[str]] = None, + work_klass: Type[Work] = HttpProtocolHandler, + **opts: Any, + ) -> None: + self.work_klass: Type[Work] = work_klass self.flags = FlagParser.initialize(input_args, **opts) self.pool: Optional[AcceptorPool] = None - # TODO(abhinavsingh): Allow users to override the worker class itself - # e.g. A clear text protocol. Or imagine a TelnetProtocolHandler instead - # of default HttpProtocolHandler. - self.work_klass: Type[Work] = HttpProtocolHandler self.event_manager: Optional[EventManager] = None def __enter__(self) -> 'Proxy': From c4fc97f03abe83c893282e3ed4871a52d6fd82c2 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 06:19:43 +0530 Subject: [PATCH 02/26] Decouple acceptor and executor pools --- proxy/common/constants.py | 1 + proxy/core/acceptor/__init__.py | 2 + proxy/core/acceptor/acceptor.py | 152 ++++++++++++------------------- proxy/core/acceptor/executors.py | 114 +++++++++++++++++++++++ proxy/core/acceptor/pool.py | 27 +++--- proxy/proxy.py | 18 +++- 6 files changed, 208 insertions(+), 106 deletions(-) create mode 100644 proxy/core/acceptor/executors.py diff --git a/proxy/common/constants.py b/proxy/common/constants.py index 757f09e3cc..2fa7e9f609 100644 --- a/proxy/common/constants.py +++ b/proxy/common/constants.py @@ -87,6 +87,7 @@ def _env_threadless_compliant() -> bool: DEFAULT_HTTPS_ACCESS_LOG_FORMAT = '{client_ip}:{client_port} - ' + \ '{request_method} {server_host}:{server_port} - ' + \ '{response_bytes} bytes - {connection_time_ms} ms' +DEFAULT_NUM_ACCEPTORS = 0 DEFAULT_NUM_WORKERS = 0 DEFAULT_OPEN_FILE_LIMIT = 1024 DEFAULT_PAC_FILE = None diff --git a/proxy/core/acceptor/__init__.py b/proxy/core/acceptor/__init__.py index cca3bcdb4d..7bc92a84fd 100644 --- a/proxy/core/acceptor/__init__.py +++ b/proxy/core/acceptor/__init__.py @@ -12,10 +12,12 @@ from .pool import AcceptorPool from 
.work import Work from .threadless import Threadless +from .executors import ThreadlessPool __all__ = [ 'Acceptor', 'AcceptorPool', 'Work', 'Threadless', + 'ThreadlessPool', ] diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index d5eb69529b..367e0a6f1d 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -8,52 +8,31 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. """ -import argparse +import socket import logging -import multiprocessing -import multiprocessing.synchronize +import argparse import selectors -import socket import threading - +import multiprocessing +import multiprocessing.synchronize from multiprocessing import connection from multiprocessing.reduction import send_handle, recv_handle -from typing import Optional, Type, Tuple + +from typing import List, Optional, Tuple, Type from .work import Work -from .threadless import Threadless +from .executors import ThreadlessPool from ..connection import TcpClientConnection from ..event import EventQueue, eventNames -from ...common.constants import DEFAULT_THREADLESS -from ...common.flag import flags + +from ..event import EventQueue from ...common.utils import is_threadless from ...common.logger import Logger logger = logging.getLogger(__name__) -flags.add_argument( - '--threadless', - action='store_true', - default=DEFAULT_THREADLESS, - help='Default: ' + ('True' if DEFAULT_THREADLESS else 'False') + '. ' + - 'Enabled by default on Python 3.8+ (mac, linux). ' + - 'When disabled a new thread is spawned ' - 'to handle each client connection.', -) - -flags.add_argument( - '--threaded', - action='store_true', - default=not DEFAULT_THREADLESS, - help='Default: ' + ('True' if not DEFAULT_THREADLESS else 'False') + '. ' + - 'Disabled by default on Python < 3.8 and windows. 
' + - 'When enabled a new thread is spawned ' - 'to handle each client connection.', -) - - class Acceptor(multiprocessing.Process): """Work acceptor process. @@ -83,6 +62,8 @@ def __init__( flags: argparse.Namespace, work_klass: Type[Work], lock: multiprocessing.synchronize.Lock, + executor_queues: List[connection.Connection] = [], + executor_pids: List[int] = [], event_queue: Optional[EventQueue] = None, ) -> None: super().__init__() @@ -98,61 +79,16 @@ def __init__( self.work_queue: connection.Connection = work_queue # Worker class self.work_klass = work_klass - # Selector & threadless states + self.executor_queues = executor_queues + self.executor_pids = executor_pids + # Selector self.running = multiprocessing.Event() self.selector: Optional[selectors.DefaultSelector] = None - self.threadless_process: Optional[Threadless] = None - self.threadless_client_queue: Optional[connection.Connection] = None # File descriptor used to accept new work # Currently, a socket fd is assumed. self.sock: Optional[socket.socket] = None - - def start_threadless_process(self) -> None: - pipe = multiprocessing.Pipe() - self.threadless_client_queue = pipe[0] - self.threadless_process = Threadless( - client_queue=pipe[1], - flags=self.flags, - work_klass=self.work_klass, - event_queue=self.event_queue, - ) - self.threadless_process.start() - logger.debug('Started process %d', self.threadless_process.pid) - - def shutdown_threadless_process(self) -> None: - assert self.threadless_process and self.threadless_client_queue - logger.debug('Stopped process %d', self.threadless_process.pid) - self.threadless_process.running.set() - self.threadless_process.join() - self.threadless_client_queue.close() - - def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: - assert self.threadless_process and self.threadless_client_queue - # Accepted client address is empty string for - # unix socket domain, avoid sending empty string - if not 
self.flags.unix_socket_path: - self.threadless_client_queue.send(addr) - send_handle( - self.threadless_client_queue, - conn.fileno(), - self.threadless_process.pid, - ) - conn.close() - - def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: - work = self.work_klass( - TcpClientConnection(conn, addr), - flags=self.flags, - event_queue=self.event_queue, - ) - work_thread = threading.Thread(target=work.run) - work_thread.daemon = True - work.publish_event( - event_name=eventNames.WORK_STARTED, - event_payload={'fileno': conn.fileno(), 'addr': addr}, - publisher_id=self.__class__.__name__, - ) - work_thread.start() + # Incremented every time work() is called + self._total: int = 0 def run_once(self) -> None: with self.lock: @@ -162,14 +98,7 @@ def run_once(self) -> None: return conn, addr = self.sock.accept() addr = None if addr == '' else addr - if ( - is_threadless(self.flags.threadless, self.flags.threaded) and - self.threadless_client_queue and - self.threadless_process - ): - self._start_threadless_work(conn, addr) - else: - self._start_threaded_work(conn, addr) + self._work(conn, addr) def run(self) -> None: Logger.setup_logger( @@ -186,15 +115,54 @@ def run(self) -> None: ) try: self.selector.register(self.sock, selectors.EVENT_READ) - if is_threadless(self.flags.threadless, self.flags.threaded): - self.start_threadless_process() while not self.running.is_set(): self.run_once() except KeyboardInterrupt: pass finally: self.selector.unregister(self.sock) - if is_threadless(self.flags.threadless, self.flags.threaded): - self.shutdown_threadless_process() self.sock.close() logger.debug('Acceptor#%d shutdown', self.idd) + + def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: + if is_threadless(self.flags.threadless, self.flags.threaded): + self._start_threadless_work(conn, addr) + else: + self._start_threaded_work(conn, addr) + self._total += 1 + + def _start_threadless_work(self, conn: 
socket.socket, addr: Optional[Tuple[str, int]]) -> None: + # Index of worker to which this work should be dispatched + # Use round-robin strategy by default + index = self._total % self.flags.num_workers + # Accepted client address is empty string for + # unix socket domain, avoid sending empty string + if not self.flags.unix_socket_path: + self.executor_queues[index].send(addr) + send_handle( + self.executor_queues[index], + conn.fileno(), + self.executor_pids[index], + ) + conn.close() + logger.debug('Dispatched work#{0} from acceptor#{1} to worker#{1}'.format( + self._total, self.idd, index)) + + def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: + work = self.work_klass( + TcpClientConnection(conn, addr), + flags=self.flags, + event_queue=self.event_queue, + ) + # TODO: Keep reference to threads and join during shutdown. + # This will ensure connections are not abruptly closed on shutdown. + work_thread = threading.Thread(target=work.run) + work_thread.daemon = True + work_thread.start() + work.publish_event( + event_name=eventNames.WORK_STARTED, + event_payload={'fileno': conn.fileno(), 'addr': addr}, + publisher_id=self.__class__.__name__, + ) + logger.debug('Started work#{0} in a new thread#{1}'.format( + self._total, work_thread.ident)) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py new file mode 100644 index 0000000000..06f41a1c07 --- /dev/null +++ b/proxy/core/acceptor/executors.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. 
+""" +import logging +import argparse +import multiprocessing + +from multiprocessing import connection + +from typing import Optional, List, Type + +from .work import Work +from .threadless import Threadless + +from ..event import EventQueue +from ..event import EventQueue + +from ...common.flag import flags +from ...common.utils import is_threadless +from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS + +logger = logging.getLogger(__name__) + + +flags.add_argument( + '--threadless', + action='store_true', + default=DEFAULT_THREADLESS, + help='Default: ' + ('True' if DEFAULT_THREADLESS else 'False') + '. ' + + 'Enabled by default on Python 3.8+ (mac, linux). ' + + 'When disabled a new thread is spawned ' + 'to handle each client connection.', +) + +flags.add_argument( + '--threaded', + action='store_true', + default=not DEFAULT_THREADLESS, + help='Default: ' + ('True' if not DEFAULT_THREADLESS else 'False') + '. ' + + 'Disabled by default on Python < 3.8 and windows. ' + + 'When enabled a new thread is spawned ' + 'to handle each client connection.', +) + +flags.add_argument( + '--num-workers', + type=int, + default=DEFAULT_NUM_WORKERS, + help='Defaults to number of CPU cores.', +) + + +class ThreadlessPool: + """Manages lifecycle of threadless pool and delegates work to them + using a round-robin strategy. 
+ """ + + def __init__( + self, flags: argparse.Namespace, + work_klass: Type[Work], + event_queue: Optional[EventQueue] = None, + ) -> None: + self.flags = flags + self.work_klass = work_klass + self.event_queue = event_queue + self._workers: List[Threadless] = [] + self.work_queues: List[connection.Connection] = [] + self.work_pids: List[int] = [] + + def setup(self) -> None: + """Setup threadless processes.""" + if is_threadless(self.flags.threadless, self.flags.threaded): + for index in range(self.flags.num_workers): + self._start_worker(index) + logger.debug('Started {0} threadless workers'.format( + self.flags.num_workers)) + + def shutdown(self) -> None: + """Shutdown threadless processes.""" + if is_threadless(self.flags.threadless, self.flags.threaded): + for _ in range(self.flags.num_workers): + self._shutdown_worker() + logger.debug('Stopped {0} threadless workers'.format( + self.flags.num_workers)) + + def _start_worker(self, index: int) -> None: + pipe = multiprocessing.Pipe() + self.work_queues.append(pipe[0]) + w = Threadless( + client_queue=pipe[1], + flags=self.flags, + work_klass=self.work_klass, + event_queue=self.event_queue, + ) + self._workers.append(w) + w.start() + self.work_pids.append(w.pid) + logger.debug('Started threadless#%d process#%d', index, w.pid) + + def _shutdown_worker(self) -> None: + w = self._workers.pop() + pid = w.pid + w.running.set() + w.join() + self.work_pids.pop() + self.work_queues.pop().close() + logger.debug('Stopped threadless process#%d', pid) diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 40868b79b2..493a95fa4f 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -25,10 +25,10 @@ from ..event import EventQueue -from ...common.utils import bytes_, is_threadless +from ...common.utils import bytes_ from ...common.flag import flags from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME -from ...common.constants import DEFAULT_NUM_WORKERS, 
DEFAULT_PORT +from ...common.constants import DEFAULT_NUM_ACCEPTORS, DEFAULT_PORT from ...common.constants import DEFAULT_PID_FILE logger = logging.getLogger(__name__) @@ -65,9 +65,9 @@ ) flags.add_argument( - '--num-workers', + '--num-acceptors', type=int, - default=DEFAULT_NUM_WORKERS, + default=DEFAULT_NUM_ACCEPTORS, help='Defaults to number of CPU cores.', ) @@ -100,6 +100,8 @@ class AcceptorPool: def __init__( self, flags: argparse.Namespace, work_klass: Type[Work], + executor_queues: List[connection.Connection] = [], + executor_pids: List[int] = [], event_queue: Optional[EventQueue] = None, ) -> None: self.flags = flags @@ -112,7 +114,9 @@ def __init__( # Work queue used to share file descriptor with acceptor processes self.work_queues: List[connection.Connection] = [] # Work class implementation - self.work_klass = work_klass + self.work_klass: Type[Work] = work_klass + self.executor_queues: List[connection.Connection] = executor_queues + self.executor_pids: List[int] = executor_pids def __enter__(self) -> 'AcceptorPool': self.setup() @@ -141,7 +145,7 @@ def setup(self) -> None: self._start_acceptors() # Send file descriptor to all acceptor processes. 
assert self.socket is not None - for index in range(self.flags.num_workers): + for index in range(self.flags.num_acceptors): send_handle( self.work_queues[index], self.socket.fileno(), @@ -151,7 +155,7 @@ def setup(self) -> None: self.socket.close() def shutdown(self) -> None: - logger.info('Shutting down %d workers' % self.flags.num_workers) + logger.info('Shutting down %d acceptors' % self.flags.num_acceptors) for acceptor in self.acceptors: acceptor.running.set() for acceptor in self.acceptors: @@ -177,7 +181,7 @@ def _listen_server_port(self) -> None: def _start_acceptors(self) -> None: """Start acceptor processes.""" - for acceptor_id in range(self.flags.num_workers): + for acceptor_id in range(self.flags.num_acceptors): work_queue = multiprocessing.Pipe() acceptor = Acceptor( idd=acceptor_id, @@ -186,6 +190,8 @@ def _start_acceptors(self) -> None: work_klass=self.work_klass, lock=LOCK, event_queue=self.event_queue, + executor_queues=self.executor_queues, + executor_pids=self.executor_pids, ) acceptor.start() logger.debug( @@ -195,10 +201,7 @@ def _start_acceptors(self) -> None: ) self.acceptors.append(acceptor) self.work_queues.append(work_queue[0]) - mode = 'threadless' if is_threadless( - self.flags.threadless, self.flags.threaded, - ) else 'threaded' - logger.info('Started %d %s workers' % (self.flags.num_workers, mode)) + logger.info('Started %d acceptors' % self.flags.num_acceptors) def _write_pid_file(self) -> None: if self.flags.pid_file is not None: diff --git a/proxy/proxy.py b/proxy/proxy.py index fb5983dcad..486cb071f7 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -17,7 +17,7 @@ from proxy.core.acceptor.work import Work -from .core.acceptor import AcceptorPool +from .core.acceptor import AcceptorPool, ThreadlessPool from .http.handler import HttpProtocolHandler from .core.event import EventManager from .common.flag import FlagParser, flags @@ -110,6 +110,7 @@ def __init__( self.work_klass: Type[Work] = work_klass self.flags = 
FlagParser.initialize(input_args, **opts) self.pool: Optional[AcceptorPool] = None + self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None def __enter__(self) -> 'Proxy': @@ -117,10 +118,21 @@ def __enter__(self) -> 'Proxy': logger.info('Core Event enabled') self.event_manager = EventManager() self.event_manager.setup() + event_queue = self.event_manager.queue \ + if self.event_manager is not None \ + else None + self.executors = ThreadlessPool( + flags=self.flags, + work_klass=self.work_klass, + event_queue=event_queue, + ) + self.executors.setup() self.pool = AcceptorPool( flags=self.flags, work_klass=self.work_klass, - event_queue=self.event_manager.queue if self.event_manager is not None else None, + event_queue=event_queue, + executor_queues=self.executors.work_queues, + executor_pids=self.executors.work_pids, ) self.pool.setup() assert self.pool is not None @@ -144,6 +156,8 @@ def __exit__( ) -> None: assert self.pool self.pool.shutdown() + assert self.executors + self.executors.shutdown() if self.flags.enable_events: assert self.event_manager is not None self.event_manager.shutdown() From 3cddf7b8183da130b5515261df48547432542f6b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 9 Nov 2021 00:50:36 +0000 Subject: [PATCH 03/26] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- proxy/core/acceptor/acceptor.py | 14 ++++++++++---- proxy/core/acceptor/executors.py | 14 ++++++++++---- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 367e0a6f1d..1797d51cff 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -145,8 +145,11 @@ def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, self.executor_pids[index], ) conn.close() - logger.debug('Dispatched work#{0} from 
acceptor#{1} to worker#{1}'.format( - self._total, self.idd, index)) + logger.debug( + 'Dispatched work#{0} from acceptor#{1} to worker#{1}'.format( + self._total, self.idd, index, + ), + ) def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: work = self.work_klass( @@ -164,5 +167,8 @@ def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, in event_payload={'fileno': conn.fileno(), 'addr': addr}, publisher_id=self.__class__.__name__, ) - logger.debug('Started work#{0} in a new thread#{1}'.format( - self._total, work_thread.ident)) + logger.debug( + 'Started work#{0} in a new thread#{1}'.format( + self._total, work_thread.ident, + ), + ) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 06f41a1c07..2dd7d50344 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -79,16 +79,22 @@ def setup(self) -> None: if is_threadless(self.flags.threadless, self.flags.threaded): for index in range(self.flags.num_workers): self._start_worker(index) - logger.debug('Started {0} threadless workers'.format( - self.flags.num_workers)) + logger.debug( + 'Started {0} threadless workers'.format( + self.flags.num_workers, + ), + ) def shutdown(self) -> None: """Shutdown threadless processes.""" if is_threadless(self.flags.threadless, self.flags.threaded): for _ in range(self.flags.num_workers): self._shutdown_worker() - logger.debug('Stopped {0} threadless workers'.format( - self.flags.num_workers)) + logger.debug( + 'Stopped {0} threadless workers'.format( + self.flags.num_workers, + ), + ) def _start_worker(self, index: int) -> None: pipe = multiprocessing.Pipe() From 50b1e14ba8942c1ba681bdc8d6f64b1c59bdacaf Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 06:39:30 +0530 Subject: [PATCH 04/26] Add `--num_acceptors` flag and better load balancing --- proxy/common/flag.py | 7 ++++++- proxy/core/acceptor/acceptor.py | 9 ++++++--- 
proxy/core/acceptor/executors.py | 17 +++++++++++++++-- proxy/core/acceptor/pool.py | 4 ++-- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/proxy/common/flag.py b/proxy/common/flag.py index a340617088..62e7fcd961 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -22,7 +22,7 @@ from .plugins import Plugins from .types import IpAddress from .utils import text_, bytes_, is_py2, set_open_file_limit -from .constants import COMMA, DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_NUM_WORKERS +from .constants import COMMA, DEFAULT_DATA_DIRECTORY_PATH, DEFAULT_NUM_ACCEPTORS, DEFAULT_NUM_WORKERS from .constants import DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE from .constants import PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL from .constants import PLUGIN_HTTP_PROXY, PLUGIN_INSPECT_TRAFFIC, PLUGIN_PAC_FILE @@ -247,6 +247,11 @@ def initialize( args.num_workers = cast( int, num_workers if num_workers > 0 else multiprocessing.cpu_count(), ) + num_acceptors = opts.get('num_acceptors', args.num_acceptors) + num_acceptors = num_acceptors if num_acceptors is not None else DEFAULT_NUM_ACCEPTORS + args.num_acceptors = cast( + int, num_acceptors if num_acceptors > 0 else multiprocessing.cpu_count(), + ) args.static_server_dir = cast( str, opts.get( diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 367e0a6f1d..412431713e 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -133,8 +133,11 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: # Index of worker to which this work should be dispatched - # Use round-robin strategy by default - index = self._total % self.flags.num_workers + # Use round-robin strategy by default. + # + # By default all acceptors will start sending work to + # 1st workers. To randomize, we offset index by idd. 
+ index = (self._total + self.idd) % self.flags.num_workers # Accepted client address is empty string for # unix socket domain, avoid sending empty string if not self.flags.unix_socket_path: @@ -145,7 +148,7 @@ def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, self.executor_pids[index], ) conn.close() - logger.debug('Dispatched work#{0} from acceptor#{1} to worker#{1}'.format( + logger.debug('Dispatched work#{0} from acceptor#{1} to worker#{2}'.format( self._total, self.idd, index)) def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 06f41a1c07..c3b151328d 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -15,6 +15,7 @@ from multiprocessing import connection from typing import Optional, List, Type +from types import TracebackType from .work import Work from .threadless import Threadless @@ -74,12 +75,24 @@ def __init__( self.work_queues: List[connection.Connection] = [] self.work_pids: List[int] = [] + def __enter__(self) -> 'ThreadlessPool': + self.setup() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.shutdown() + def setup(self) -> None: """Setup threadless processes.""" if is_threadless(self.flags.threadless, self.flags.threaded): for index in range(self.flags.num_workers): self._start_worker(index) - logger.debug('Started {0} threadless workers'.format( + logger.info('Started {0} threadless workers'.format( self.flags.num_workers)) def shutdown(self) -> None: @@ -87,7 +100,7 @@ def shutdown(self) -> None: if is_threadless(self.flags.threadless, self.flags.threaded): for _ in range(self.flags.num_workers): self._shutdown_worker() - logger.debug('Stopped {0} threadless workers'.format( + logger.info('Stopped {0} threadless workers'.format( 
self.flags.num_workers)) def _start_worker(self, index: int) -> None: diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 493a95fa4f..473d515a18 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -142,7 +142,7 @@ def setup(self) -> None: # the server port when `--port=0` is used. assert self.socket self.flags.port = self.socket.getsockname()[1] - self._start_acceptors() + self._start() # Send file descriptor to all acceptor processes. assert self.socket is not None for index in range(self.flags.num_acceptors): @@ -179,7 +179,7 @@ def _listen_server_port(self) -> None: self.socket.listen(self.flags.backlog) self.socket.setblocking(False) - def _start_acceptors(self) -> None: + def _start(self) -> None: """Start acceptor processes.""" for acceptor_id in range(self.flags.num_acceptors): work_queue = multiprocessing.Pipe() From 96d1b5191c874c8799b9a7dfa99b375ee3c48f33 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 06:41:52 +0530 Subject: [PATCH 05/26] Remove unused --- proxy/core/acceptor/acceptor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 3d63ede2bf..57ecbbb0c7 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -21,7 +21,6 @@ from typing import List, Optional, Tuple, Type from .work import Work -from .executors import ThreadlessPool from ..connection import TcpClientConnection from ..event import EventQueue, eventNames From f7aace95d63e0e501ae6bfe7e251528711cffb8c Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 06:53:41 +0530 Subject: [PATCH 06/26] Lint errors --- proxy/core/acceptor/acceptor.py | 9 ++++----- proxy/core/acceptor/executors.py | 2 +- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 57ecbbb0c7..f72f8ba055 100644 --- a/proxy/core/acceptor/acceptor.py +++ 
b/proxy/core/acceptor/acceptor.py @@ -25,7 +25,6 @@ from ..connection import TcpClientConnection from ..event import EventQueue, eventNames -from ..event import EventQueue from ...common.utils import is_threadless from ...common.logger import Logger @@ -61,8 +60,8 @@ def __init__( flags: argparse.Namespace, work_klass: Type[Work], lock: multiprocessing.synchronize.Lock, - executor_queues: List[connection.Connection] = [], - executor_pids: List[int] = [], + executor_queues: List[connection.Connection], + executor_pids: List[int], event_queue: Optional[EventQueue] = None, ) -> None: super().__init__() @@ -148,7 +147,7 @@ def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, ) conn.close() logger.debug( - 'Dispatched work#{0} from acceptor#{1} to worker#{1}'.format( + 'Dispatched work#{0} from acceptor#{1} to worker#{2}'.format( self._total, self.idd, index, ), ) @@ -170,7 +169,7 @@ def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, in publisher_id=self.__class__.__name__, ) logger.debug( - 'Started work#{0} in a new thread#{1}'.format( + 'Started work#{0} in thread#{1}'.format( self._total, work_thread.ident, ), ) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 544e1f36ff..d0056393dd 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -20,7 +20,6 @@ from .work import Work from .threadless import Threadless -from ..event import EventQueue from ..event import EventQueue from ...common.flag import flags @@ -120,6 +119,7 @@ def _start_worker(self, index: int) -> None: ) self._workers.append(w) w.start() + assert w.pid self.work_pids.append(w.pid) logger.debug('Started threadless#%d process#%d', index, w.pid) From a8aeaf5a4971a089d287aaffa5a11aa79376ef23 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 06:56:00 +0530 Subject: [PATCH 07/26] Another arg not kwarg --- proxy/core/acceptor/pool.py | 4 ++-- 1 file changed, 2 
insertions(+), 2 deletions(-) diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 473d515a18..95bc515401 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -100,8 +100,8 @@ class AcceptorPool: def __init__( self, flags: argparse.Namespace, work_klass: Type[Work], - executor_queues: List[connection.Connection] = [], - executor_pids: List[int] = [], + executor_queues: List[connection.Connection], + executor_pids: List[int], event_queue: Optional[EventQueue] = None, ) -> None: self.flags = flags From 24c661cdf759cf26c9ab6cc31d3a342a40ae0369 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 07:21:58 +0530 Subject: [PATCH 08/26] Move start work staticmethods within ExecutorPool --- proxy/core/acceptor/acceptor.py | 85 +++++++++++++------------------- proxy/core/acceptor/executors.py | 60 +++++++++++++++++++++- 2 files changed, 92 insertions(+), 53 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index f72f8ba055..2731ba646e 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -12,18 +12,19 @@ import logging import argparse import selectors -import threading import multiprocessing import multiprocessing.synchronize + from multiprocessing import connection -from multiprocessing.reduction import send_handle, recv_handle +from multiprocessing.reduction import recv_handle from typing import List, Optional, Tuple, Type +from proxy.core.acceptor.executors import ThreadlessPool + from .work import Work -from ..connection import TcpClientConnection -from ..event import EventQueue, eventNames +from ..event import EventQueue from ...common.utils import is_threadless from ...common.logger import Logger @@ -124,52 +125,34 @@ def run(self) -> None: def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: if is_threadless(self.flags.threadless, self.flags.threaded): - self._start_threadless_work(conn, addr) + # Index of 
worker to which this work should be dispatched + # Use round-robin strategy by default. + # + # By default all acceptors will start sending work to + # 1st workers. To randomize, we offset index by idd. + index = (self._total + self.idd) % self.flags.num_workers + ThreadlessPool.start_threadless_work( + self.executor_pids[index], + self.executor_queues[index], + conn, + addr, + self.flags.unix_socket_path, + ) + logger.debug( + 'Dispatched work#{0} from acceptor#{1} to worker#{2}'.format( + self._total, self.idd, index, + ), + ) else: - self._start_threaded_work(conn, addr) + work, thread = ThreadlessPool.start_threaded_work( + self.flags, self.work_klass, + conn, addr, + event_queue=self.event_queue, + publisher_id=self.__class__.__name__, + ) + logger.debug( + 'Started work#{0} in thread#{1}'.format( + self._total, thread.ident, + ), + ) self._total += 1 - - def _start_threadless_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: - # Index of worker to which this work should be dispatched - # Use round-robin strategy by default. - # - # By default all acceptors will start sending work to - # 1st workers. To randomize, we offset index by idd. - index = (self._total + self.idd) % self.flags.num_workers - # Accepted client address is empty string for - # unix socket domain, avoid sending empty string - if not self.flags.unix_socket_path: - self.executor_queues[index].send(addr) - send_handle( - self.executor_queues[index], - conn.fileno(), - self.executor_pids[index], - ) - conn.close() - logger.debug( - 'Dispatched work#{0} from acceptor#{1} to worker#{2}'.format( - self._total, self.idd, index, - ), - ) - - def _start_threaded_work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: - work = self.work_klass( - TcpClientConnection(conn, addr), - flags=self.flags, - event_queue=self.event_queue, - ) - # TODO: Keep reference to threads and join during shutdown. 
- # This will ensure connections are not abruptly closed on shutdown. - work_thread = threading.Thread(target=work.run) - work_thread.daemon = True - work_thread.start() - work.publish_event( - event_name=eventNames.WORK_STARTED, - event_payload={'fileno': conn.fileno(), 'addr': addr}, - publisher_id=self.__class__.__name__, - ) - logger.debug( - 'Started work#{0} in thread#{1}'.format( - self._total, work_thread.ident, - ), - ) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index d0056393dd..38fd601d74 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -8,19 +8,23 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. """ +import socket import logging import argparse +import threading import multiprocessing from multiprocessing import connection +from multiprocessing.reduction import send_handle -from typing import Optional, List, Type +from typing import Optional, List, Tuple, Type from types import TracebackType from .work import Work from .threadless import Threadless -from ..event import EventQueue +from ..connection import TcpClientConnection +from ..event import EventQueue, eventNames from ...common.flag import flags from ...common.utils import is_threadless @@ -60,6 +64,12 @@ class ThreadlessPool: """Manages lifecycle of threadless pool and delegates work to them using a round-robin strategy. + + Example usage: + + with ThreadlessPool(flags=..., work_klass=...) 
as pool: + while True: + time.sleep(1) """ def __init__( @@ -131,3 +141,49 @@ def _shutdown_worker(self) -> None: self.work_pids.pop() self.work_queues.pop().close() logger.debug('Stopped threadless process#%d', pid) + + @staticmethod + def start_threadless_work( + worker_pid: int, + work_queue: connection.Connection, + conn: socket.socket, + addr: Optional[Tuple[str, int]], + unix_socket_path: Optional[str] = None, + ) -> None: + # Accepted client address is empty string for + # unix socket domain, avoid sending empty string + if not unix_socket_path: + work_queue.send(addr) + send_handle( + work_queue, + conn.fileno(), + worker_pid, + ) + conn.close() + + @staticmethod + def start_threaded_work( + flags: argparse.ArgumentParser, + work_klass: Type[Work], + conn: socket.socket, + addr: Optional[Tuple[str, int]], + event_queue: Optional[EventQueue] = None, + publisher_id: Optional[str] = None, + ) -> threading.Thread: + work = work_klass( + TcpClientConnection(conn, addr), + flags=flags, + event_queue=event_queue, + ) + # TODO: Keep reference to threads and join during shutdown. + # This will ensure connections are not abruptly closed on shutdown. 
+ thread = threading.Thread(target=work.run) + thread.daemon = True + thread.start() + work.publish_event( + event_name=eventNames.WORK_STARTED, + event_payload={'fileno': conn.fileno(), 'addr': addr}, + publisher_id=publisher_id or 'thread#{0}'.format( + thread.ident), + ) + return (work, thread) From 6492cf444b6dfb62745f69c414fb7472a183d480 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 07:24:16 +0530 Subject: [PATCH 09/26] mypy fixes --- proxy/core/acceptor/executors.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 38fd601d74..5b5a2a7432 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -163,13 +163,13 @@ def start_threadless_work( @staticmethod def start_threaded_work( - flags: argparse.ArgumentParser, + flags: argparse.Namespace, work_klass: Type[Work], conn: socket.socket, addr: Optional[Tuple[str, int]], event_queue: Optional[EventQueue] = None, publisher_id: Optional[str] = None, - ) -> threading.Thread: + ) -> Tuple[Work, threading.Thread]: work = work_klass( TcpClientConnection(conn, addr), flags=flags, @@ -184,6 +184,7 @@ def start_threaded_work( event_name=eventNames.WORK_STARTED, event_payload={'fileno': conn.fileno(), 'addr': addr}, publisher_id=publisher_id or 'thread#{0}'.format( - thread.ident), + thread.ident, + ), ) return (work, thread) From 3402eb94f857cba2ac765207da8beec2c93ab6a7 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 07:37:13 +0530 Subject: [PATCH 10/26] Update README with `--num-acceptors` flag --- README.md | 184 +++++++++++++++---------------- proxy/core/acceptor/executors.py | 4 +- proxy/testing/test_case.py | 1 + 3 files changed, 92 insertions(+), 97 deletions(-) diff --git a/README.md b/README.md index 4d63bd82c4..cc89a1c688 100644 --- a/README.md +++ b/README.md @@ -265,9 +265,10 @@ Simply type `proxy` on command line to start with default 
configuration. ```console ❯ proxy -...[redacted]... - Loaded plugin proxy.http_proxy.HttpProxyPlugin -...[redacted]... - Starting 8 threadless workers -...[redacted]... - Started server on ::1:8899 +...[redacted]... - Loaded plugin proxy.http.proxy.HttpProxyPlugin +...[redacted]... - Started 8 threadless workers +...[redacted]... - Started 8 acceptors +...[redacted]... - Listening on 127.0.0.1:8899 ``` ### Understanding logs @@ -279,10 +280,15 @@ Things to notice from above logs: - As name suggests, this core plugin adds `http(s)` proxy server capabilities to `proxy.py` instance - `Started N threadless workers` - - By default, `proxy.py` will start as many workers as there are CPU cores on the machine + - By default, `proxy.py` will start as many worker processes as there are CPU cores on the machine - Use `--num-workers` flag to customize number of worker processes - See [Threads vs Threadless](#threads-vs-threadless) to understand how to control execution mode +- `Started N acceptors` + - By default, `proxy.py` will start as many acceptor processes as there are CPU cores on the machine + - Use `--num-acceptors` flag to customize number of acceptor processes + - See [High Level Architecture](#high-level-architecture) to understand relationship between acceptors and workers + - `Started server on ::1:8899` - By default, `proxy.py` listens on IPv6 `::1`, which is equivalent of IPv4 `127.0.0.1` - If you want to access `proxy.py` from external host, use `--hostname ::` or `--hostname 0.0.0.0` or bind to any other interface available on your machine. @@ -1318,7 +1324,7 @@ Note that: 1. `proxy.TestCase` overrides `unittest.TestCase.run()` method to setup and teardown `proxy.py`. 2. `proxy.py` server will listen on a random available port on the system. This random port is available as `self.PROXY.pool.flags.port` within your test cases. -3. Only a single worker is started by default (`--num-workers 1`) for faster setup and teardown. +3. 
Only a single acceptor and worker is started by default (`--num-workers 1 --num-acceptors 1`) for faster setup and teardown. 4. Most importantly, `proxy.TestCase` also ensures `proxy.py` server is up and running before proceeding with execution of tests. By default, `proxy.TestCase` will wait for `10 seconds` for `proxy.py` server to start, @@ -1333,7 +1339,8 @@ Example: class TestProxyPyEmbedded(TestCase): PROXY_PY_STARTUP_FLAGS = [ - '--num-workers', '1', + '--num-workers', '2', + '--num-acceptors', '1', '--enable-web-server', ] @@ -1363,6 +1370,7 @@ class TestProxyPyEmbedded(unittest.TestCase): def run(self, result: Optional[unittest.TestResult] = None) -> Any: with proxy.start([ '--num-workers', '1', + '--num-acceptors', '1', '--port', '... random port ...']): super().run(result) ``` @@ -1844,11 +1852,14 @@ ssl.SSLEOFError: EOF occurred in violation of protocol (_ssl.c:997) will try to utilize all available CPU cores to it for accepting new client connections. This is achieved by starting `AcceptorPool` which listens on configured server port. Then, `AcceptorPool` starts `Acceptor` -processes (`--num-workers`) to accept incoming client connections. +processes (`--num-acceptors`) to accept incoming client connections. +Alongside, if `--threadless` is enabled, `ThreadlessPool` is setup +which starts `Threadless` processes (`--num-workers`) to handle +the incoming client connections. Each `Acceptor` process delegates the accepted client connection -to a `Work` class. Currently, `HttpProtocolHandler` is the default -work klass hardcoded into the code. +to a threadless process via `Work` class. Currently, `HttpProtocolHandler` +is the default work klass. `HttpProtocolHandler` simply assumes that incoming clients will follow HTTP specification. Specific HTTP proxy and HTTP server implementations @@ -1917,23 +1928,18 @@ for list of tests. 
```console ❯ proxy -h usage: proxy [-h] [--enable-events] [--enable-conn-pool] [--threadless] [--threaded] - [--pid-file PID_FILE] [--backlog BACKLOG] [--hostname HOSTNAME] - [--port PORT] [--num-workers NUM_WORKERS] - [--unix-socket-path UNIX_SOCKET_PATH] - [--client-recvbuf-size CLIENT_RECVBUF_SIZE] [--key-file KEY_FILE] - [--timeout TIMEOUT] [--version] [--log-level LOG_LEVEL] - [--log-file LOG_FILE] [--log-format LOG_FORMAT] + [--num-workers NUM_WORKERS] [--pid-file PID_FILE] [--backlog BACKLOG] [--hostname HOSTNAME] + [--port PORT] [--num-acceptors NUM_ACCEPTORS] [--unix-socket-path UNIX_SOCKET_PATH] + [--client-recvbuf-size CLIENT_RECVBUF_SIZE] [--key-file KEY_FILE] [--timeout TIMEOUT] + [--version] [--log-level LOG_LEVEL] [--log-file LOG_FILE] [--log-format LOG_FORMAT] [--open-file-limit OPEN_FILE_LIMIT] [--plugins PLUGINS] [--enable-dashboard] - [--disable-http-proxy] [--ca-key-file CA_KEY_FILE] - [--ca-cert-dir CA_CERT_DIR] [--ca-cert-file CA_CERT_FILE] - [--ca-file CA_FILE] [--ca-signing-key-file CA_SIGNING_KEY_FILE] + [--disable-http-proxy] [--ca-key-file CA_KEY_FILE] [--ca-cert-dir CA_CERT_DIR] + [--ca-cert-file CA_CERT_FILE] [--ca-file CA_FILE] [--ca-signing-key-file CA_SIGNING_KEY_FILE] [--cert-file CERT_FILE] [--disable-headers DISABLE_HEADERS] - [--server-recvbuf-size SERVER_RECVBUF_SIZE] [--basic-auth BASIC_AUTH] - [--cache-dir CACHE_DIR] [--filtered-upstream-hosts FILTERED_UPSTREAM_HOSTS] - [--enable-web-server] [--enable-static-server] - [--static-server-dir STATIC_SERVER_DIR] [--pac-file PAC_FILE] - [--pac-file-url-path PAC_FILE_URL_PATH] - [--filtered-client-ips FILTERED_CLIENT_IPS] + [--server-recvbuf-size SERVER_RECVBUF_SIZE] [--basic-auth BASIC_AUTH] [--cache-dir CACHE_DIR] + [--filtered-upstream-hosts FILTERED_UPSTREAM_HOSTS] [--enable-web-server] + [--enable-static-server] [--static-server-dir STATIC_SERVER_DIR] [--pac-file PAC_FILE] + [--pac-file-url-path PAC_FILE_URL_PATH] [--filtered-client-ips FILTERED_CLIENT_IPS] 
[--filtered-url-regex-config FILTERED_URL_REGEX_CONFIG] [--cloudflare-dns-mode CLOUDFLARE_DNS_MODE] @@ -1941,111 +1947,99 @@ proxy.py v2.4.0 options: -h, --help show this help message and exit - --enable-events Default: False. Enables core to dispatch lifecycle events. - Plugins can be used to subscribe for core events. + --enable-events Default: False. Enables core to dispatch lifecycle events. Plugins can be used + to subscribe for core events. --enable-conn-pool Default: False. (WIP) Enable upstream connection pooling. - --threadless Default: True. Enabled by default on Python 3.8+ (mac, linux). - When disabled a new thread is spawned to handle each client - connection. - --threaded Default: False. Disabled by default on Python < 3.8 and - windows. When enabled a new thread is spawned to handle each - client connection. + --threadless Default: True. Enabled by default on Python 3.8+ (mac, linux). When disabled a + new thread is spawned to handle each client connection. + --threaded Default: False. Disabled by default on Python < 3.8 and windows. When enabled a + new thread is spawned to handle each client connection. + --num-workers NUM_WORKERS + Defaults to number of CPU cores. --pid-file PID_FILE Default: None. Save parent process ID to a file. - --backlog BACKLOG Default: 100. Maximum number of pending connections to proxy - server + --backlog BACKLOG Default: 100. Maximum number of pending connections to proxy server --hostname HOSTNAME Default: ::1. Server IP address. --port PORT Default: 8899. Server port. - --num-workers NUM_WORKERS + --num-acceptors NUM_ACCEPTORS Defaults to number of CPU cores. --unix-socket-path UNIX_SOCKET_PATH - Default: None. Unix socket path to use. When provided --host - and --port flags are ignored + Default: None. Unix socket path to use. When provided --host and --port flags + are ignored --client-recvbuf-size CLIENT_RECVBUF_SIZE - Default: 1 MB. Maximum amount of data received from the client - in a single recv() operation. 
Bump this value for faster - uploads at the expense of increased RAM. - --key-file KEY_FILE Default: None. Server key file to enable end-to-end TLS - encryption with clients. If used, must also pass --cert-file. - --timeout TIMEOUT Default: 10.0. Number of seconds after which an inactive - connection must be dropped. Inactivity is defined by no data - sent or received by the client. + Default: 1 MB. Maximum amount of data received from the client in a single + recv() operation. Bump this value for faster uploads at the expense of increased + RAM. + --key-file KEY_FILE Default: None. Server key file to enable end-to-end TLS encryption with clients. + If used, must also pass --cert-file. + --timeout TIMEOUT Default: 10.0. Number of seconds after which an inactive connection must be + dropped. Inactivity is defined by no data sent or received by the client. --version, -v Prints proxy.py version. --log-level LOG_LEVEL - Valid options: DEBUG, INFO (default), WARNING, ERROR, - CRITICAL. Both upper and lowercase values are allowed. You may - also simply use the leading character e.g. --log-level d + Valid options: DEBUG, INFO (default), WARNING, ERROR, CRITICAL. Both upper and + lowercase values are allowed. You may also simply use the leading character e.g. + --log-level d --log-file LOG_FILE Default: sys.stdout. Log file destination. --log-format LOG_FORMAT Log format for Python logger. --open-file-limit OPEN_FILE_LIMIT - Default: 1024. Maximum number of files (TCP connections) that - proxy.py can open concurrently. + Default: 1024. Maximum number of files (TCP connections) that proxy.py can open + concurrently. --plugins PLUGINS Comma separated plugins --enable-dashboard Default: False. Enables proxy.py dashboard. --disable-http-proxy Default: False. Whether to disable proxy.HttpProxyPlugin. --ca-key-file CA_KEY_FILE - Default: None. CA key to use for signing dynamically generated - HTTPS certificates. 
If used, must also pass --ca-cert-file and - --ca-signing-key-file + Default: None. CA key to use for signing dynamically generated HTTPS + certificates. If used, must also pass --ca-cert-file and --ca-signing-key-file --ca-cert-dir CA_CERT_DIR - Default: ~/.proxy.py. Directory to store dynamically generated - certificates. Also see --ca-key-file, --ca-cert-file and --ca- - signing-key-file + Default: ~/.proxy.py. Directory to store dynamically generated certificates. + Also see --ca-key-file, --ca-cert-file and --ca-signing-key-file --ca-cert-file CA_CERT_FILE - Default: None. Signing certificate to use for signing - dynamically generated HTTPS certificates. If used, must also - pass --ca-key-file and --ca-signing-key-file - --ca-file CA_FILE Default: - /Users/abhinavsingh/Dev/proxy.py/venv310/lib/python3.10/site- - packages/certifi/cacert.pem. Provide path to custom CA bundle - for peer certificate verification + Default: None. Signing certificate to use for signing dynamically generated + HTTPS certificates. If used, must also pass --ca-key-file and --ca-signing-key- + file + --ca-file CA_FILE Default: /Users/abhinavsingh/Dev/proxy.py/venv310/lib/python3.10/site- + packages/certifi/cacert.pem. Provide path to custom CA bundle for peer + certificate verification --ca-signing-key-file CA_SIGNING_KEY_FILE - Default: None. CA signing key to use for dynamic generation of - HTTPS certificates. If used, must also pass --ca-key-file and - --ca-cert-file + Default: None. CA signing key to use for dynamic generation of HTTPS + certificates. If used, must also pass --ca-key-file and --ca-cert-file --cert-file CERT_FILE - Default: None. Server certificate to enable end-to-end TLS - encryption with clients. If used, must also pass --key-file. + Default: None. Server certificate to enable end-to-end TLS encryption with + clients. If used, must also pass --key-file. --disable-headers DISABLE_HEADERS - Default: None. 
Comma separated list of headers to remove - before dispatching client request to upstream server. + Default: None. Comma separated list of headers to remove before dispatching + client request to upstream server. --server-recvbuf-size SERVER_RECVBUF_SIZE - Default: 1 MB. Maximum amount of data received from the server - in a single recv() operation. Bump this value for faster - downloads at the expense of increased RAM. + Default: 1 MB. Maximum amount of data received from the server in a single + recv() operation. Bump this value for faster downloads at the expense of + increased RAM. --basic-auth BASIC_AUTH - Default: No authentication. Specify colon separated - user:password to enable basic authentication. + Default: No authentication. Specify colon separated user:password to enable + basic authentication. --cache-dir CACHE_DIR - Default: A temporary directory. Flag only applicable when - cache plugin is used with on-disk storage. + Default: A temporary directory. Flag only applicable when cache plugin is used + with on-disk storage. --filtered-upstream-hosts FILTERED_UPSTREAM_HOSTS - Default: Blocks Facebook. Comma separated list of IPv4 and - IPv6 addresses. + Default: Blocks Facebook. Comma separated list of IPv4 and IPv6 addresses. --enable-web-server Default: False. Whether to enable proxy.HttpWebServerPlugin. --enable-static-server - Default: False. Enable inbuilt static file server. Optionally, - also use --static-server-dir to serve static content from - custom directory. By default, static file server serves out of - installed proxy.py python module folder. + Default: False. Enable inbuilt static file server. Optionally, also use + --static-server-dir to serve static content from custom directory. By default, + static file server serves out of installed proxy.py python module folder. --static-server-dir STATIC_SERVER_DIR - Default: "public" folder in directory where proxy.py is - placed. 
This option is only applicable when static server is - also enabled. See --enable-static-server. - --pac-file PAC_FILE A file (Proxy Auto Configuration) or string to serve when the - server receives a direct file request. Using this option - enables proxy.HttpWebServerPlugin. + Default: "public" folder in directory where proxy.py is placed. This option is + only applicable when static server is also enabled. See --enable-static-server. + --pac-file PAC_FILE A file (Proxy Auto Configuration) or string to serve when the server receives a + direct file request. Using this option enables proxy.HttpWebServerPlugin. --pac-file-url-path PAC_FILE_URL_PATH Default: /. Web server path to serve the PAC file. --filtered-client-ips FILTERED_CLIENT_IPS - Default: 127.0.0.1,::1. Comma separated list of IPv4 and IPv6 - addresses. + Default: 127.0.0.1,::1. Comma separated list of IPv4 and IPv6 addresses. --filtered-url-regex-config FILTERED_URL_REGEX_CONFIG - Default: No config. Comma separated list of IPv4 and IPv6 - addresses. + Default: No config. Comma separated list of IPv4 and IPv6 addresses. --cloudflare-dns-mode CLOUDFLARE_DNS_MODE - Default: security. Either "security" (for malware protection) - or "family" (for malware and adult content protection) + Default: security. Either "security" (for malware protection) or "family" (for + malware and adult content protection) Proxy.py not working? 
Report at: https://github.com/abhinavsingh/proxy.py/issues/new ``` diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 5b5a2a7432..40a3a8a91f 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -101,7 +101,7 @@ def setup(self) -> None: if is_threadless(self.flags.threadless, self.flags.threaded): for index in range(self.flags.num_workers): self._start_worker(index) - logger.debug( + logger.info( 'Started {0} threadless workers'.format( self.flags.num_workers, ), @@ -112,7 +112,7 @@ def shutdown(self) -> None: if is_threadless(self.flags.threadless, self.flags.threaded): for _ in range(self.flags.num_workers): self._shutdown_worker() - logger.debug( + logger.info( 'Stopped {0} threadless workers'.format( self.flags.num_workers, ), diff --git a/proxy/testing/test_case.py b/proxy/testing/test_case.py index cb33d61a79..c2f901021f 100644 --- a/proxy/testing/test_case.py +++ b/proxy/testing/test_case.py @@ -24,6 +24,7 @@ class TestCase(unittest.TestCase): DEFAULT_PROXY_PY_STARTUP_FLAGS = [ '--num-workers', '1', + '--num-acceptors', '1', '--threadless', ] From 3205ccc649069d44ab5c655b3c3125315de08cbc Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Tue, 9 Nov 2021 09:08:46 +0530 Subject: [PATCH 11/26] Rename `Proxy.pool` to `Proxy.acceptors` --- README.md | 6 +++--- proxy/proxy.py | 14 +++++++------- proxy/testing/test_case.py | 4 ++-- tests/testing/test_embed.py | 14 +++++++------- 4 files changed, 19 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index cc89a1c688..094787a372 100644 --- a/README.md +++ b/README.md @@ -1261,10 +1261,10 @@ import proxy if __name__ == '__main__': with proxy.Proxy([]) as p: - print(p.pool.flags.port) + print(p.acceptors.flags.port) ``` -`pool.flags.port` will give you access to the random port allocated by the kernel. +`acceptors.flags.port` will give you access to the random port allocated by the kernel. 
## Loading Plugins @@ -1323,7 +1323,7 @@ Note that: 1. `proxy.TestCase` overrides `unittest.TestCase.run()` method to setup and teardown `proxy.py`. 2. `proxy.py` server will listen on a random available port on the system. - This random port is available as `self.PROXY.pool.flags.port` within your test cases. + This random port is available as `self.PROXY.acceptors.flags.port` within your test cases. 3. Only a single acceptor and worker is started by default (`--num-workers 1 --num-acceptors 1`) for faster setup and teardown. 4. Most importantly, `proxy.TestCase` also ensures `proxy.py` server is up and running before proceeding with execution of tests. By default, diff --git a/proxy/proxy.py b/proxy/proxy.py index 486cb071f7..cae527fa39 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -109,7 +109,7 @@ def __init__( ) -> None: self.work_klass: Type[Work] = work_klass self.flags = FlagParser.initialize(input_args, **opts) - self.pool: Optional[AcceptorPool] = None + self.acceptors: Optional[AcceptorPool] = None self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None @@ -127,15 +127,15 @@ def __enter__(self) -> 'Proxy': event_queue=event_queue, ) self.executors.setup() - self.pool = AcceptorPool( + self.acceptors = AcceptorPool( flags=self.flags, work_klass=self.work_klass, event_queue=event_queue, executor_queues=self.executors.work_queues, executor_pids=self.executors.work_pids, ) - self.pool.setup() - assert self.pool is not None + self.acceptors.setup() + assert self.acceptors is not None if self.flags.unix_socket_path: logger.info( 'Listening on %s' % @@ -144,7 +144,7 @@ def __enter__(self) -> 'Proxy': else: logger.info( 'Listening on %s:%s' % - (self.pool.flags.hostname, self.pool.flags.port), + (self.acceptors.flags.hostname, self.acceptors.flags.port), ) return self @@ -154,8 +154,8 @@ def __exit__( exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: - assert self.pool - 
self.pool.shutdown() + assert self.acceptors + self.acceptors.shutdown() assert self.executors self.executors.shutdown() if self.flags.enable_events: diff --git a/proxy/testing/test_case.py b/proxy/testing/test_case.py index c2f901021f..317acd51c9 100644 --- a/proxy/testing/test_case.py +++ b/proxy/testing/test_case.py @@ -47,8 +47,8 @@ def setUpClass(cls) -> None: ) cls.PROXY.__enter__() - assert cls.PROXY.pool - cls.wait_for_server(cls.PROXY.pool.flags.port) + assert cls.PROXY.acceptors + cls.wait_for_server(cls.PROXY.acceptors.flags.port) @staticmethod def wait_for_server( diff --git a/tests/testing/test_embed.py b/tests/testing/test_embed.py index 2f9f7d0346..7fcd16965e 100644 --- a/tests/testing/test_embed.py +++ b/tests/testing/test_embed.py @@ -34,13 +34,13 @@ class TestProxyPyEmbedded(TestCase): def test_with_proxy(self) -> None: """Makes a HTTP request to in-build web server via proxy server.""" - assert self.PROXY and self.PROXY.pool - with socket_connection(('localhost', self.PROXY.pool.flags.port)) as conn: + assert self.PROXY and self.PROXY.acceptors + with socket_connection(('localhost', self.PROXY.acceptors.flags.port)) as conn: conn.send( build_http_request( - httpMethods.GET, b'http://localhost:%d/' % self.PROXY.pool.flags.port, + httpMethods.GET, b'http://localhost:%d/' % self.PROXY.acceptors.flags.port, headers={ - b'Host': b'localhost:%d' % self.PROXY.pool.flags.port, + b'Host': b'localhost:%d' % self.PROXY.acceptors.flags.port, }, ), ) @@ -73,15 +73,15 @@ def test_proxy_no_vcr(self) -> None: self.make_http_request_using_proxy() def make_http_request_using_proxy(self) -> None: - assert self.PROXY and self.PROXY.pool + assert self.PROXY and self.PROXY.acceptors proxy_handler = urllib.request.ProxyHandler({ - 'http': 'http://localhost:%d' % self.PROXY.pool.flags.port, + 'http': 'http://localhost:%d' % self.PROXY.acceptors.flags.port, }) opener = urllib.request.build_opener(proxy_handler) with self.assertRaises(urllib.error.HTTPError): r: 
http.client.HTTPResponse = opener.open( 'http://localhost:%d/' % - self.PROXY.pool.flags.port, timeout=10, + self.PROXY.acceptors.flags.port, timeout=10, ) self.assertEqual(r.status, 404) self.assertEqual(r.headers.get('server'), PROXY_AGENT_HEADER_VALUE) From c594731d235d9defec1ef649115ab8bbeceb2c94 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 00:21:09 +0530 Subject: [PATCH 12/26] Add SetupShutdownContextManager abstraction --- proxy/common/context_managers.py | 46 ++++++++++++++++++++++++++ proxy/core/acceptor/executors.py | 16 ++------- proxy/core/acceptor/pool.py | 16 ++------- proxy/core/event/manager.py | 18 ++-------- proxy/proxy.py | 56 ++++++++++++++------------------ 5 files changed, 77 insertions(+), 75 deletions(-) create mode 100644 proxy/common/context_managers.py diff --git a/proxy/common/context_managers.py b/proxy/common/context_managers.py new file mode 100644 index 0000000000..2836b80c19 --- /dev/null +++ b/proxy/common/context_managers.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +""" + proxy.py + ~~~~~~~~ + ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on + Network monitoring, controls & Application development, testing, debugging. + + :copyright: (c) 2013-present by Abhinav Singh and contributors. + :license: BSD, see LICENSE for more details. +""" +from abc import ABC, abstractmethod +from types import TracebackType +from typing import Optional, Type + + +class SetupShutdownContextManager(ABC): + """An abstract context manager which expects + implementations to provide a setup() and shutdown() + implementation instead of __enter__ and __exit__ methods. + + Note that, currently, SetupShutdownContextManager by + default return instance of the class and doesn't provide + implementations to override and return anything else. + + If you want to return anything else but the class instance, + do not use SetupShutdownContextManager. 
+ """ + + def __enter__(self) -> 'SetupShutdownContextManager': + self.setup() + return self + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: + self.shutdown() + + @abstractmethod + def setup(self) -> None: + raise NotImplementedError() + + def shutdown(self) -> None: + raise NotImplementedError() diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 40a3a8a91f..d87f3a2820 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -18,7 +18,6 @@ from multiprocessing.reduction import send_handle from typing import Optional, List, Tuple, Type -from types import TracebackType from .work import Work from .threadless import Threadless @@ -26,6 +25,7 @@ from ..connection import TcpClientConnection from ..event import EventQueue, eventNames +from ...common.context_managers import SetupShutdownContextManager from ...common.flag import flags from ...common.utils import is_threadless from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS @@ -61,7 +61,7 @@ ) -class ThreadlessPool: +class ThreadlessPool(SetupShutdownContextManager): """Manages lifecycle of threadless pool and delegates work to them using a round-robin strategy. 
@@ -84,18 +84,6 @@ def __init__( self.work_queues: List[connection.Connection] = [] self.work_pids: List[int] = [] - def __enter__(self) -> 'ThreadlessPool': - self.setup() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.shutdown() - def setup(self) -> None: """Setup threadless processes.""" if is_threadless(self.flags.threadless, self.flags.threaded): diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 95bc515401..e01d6a7fb7 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -18,13 +18,13 @@ from multiprocessing.reduction import send_handle from typing import List, Optional, Type -from types import TracebackType from .acceptor import Acceptor from .work import Work from ..event import EventQueue +from ...common.context_managers import SetupShutdownContextManager from ...common.utils import bytes_ from ...common.flag import flags from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME @@ -80,7 +80,7 @@ ) -class AcceptorPool: +class AcceptorPool(SetupShutdownContextManager): """AcceptorPool is a helper class which pre-spawns `Acceptor` processes to utilize all available CPU cores for accepting new work. 
@@ -118,18 +118,6 @@ def __init__( self.executor_queues: List[connection.Connection] = executor_queues self.executor_pids: List[int] = executor_pids - def __enter__(self) -> 'AcceptorPool': - self.setup() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.shutdown() - def setup(self) -> None: """Setup socket and acceptors.""" self._write_pid_file() diff --git a/proxy/core/event/manager.py b/proxy/core/event/manager.py index 7359c0aff5..1d2fefb09e 100644 --- a/proxy/core/event/manager.py +++ b/proxy/core/event/manager.py @@ -12,12 +12,12 @@ import threading import multiprocessing -from typing import Optional, Type -from types import TracebackType +from typing import Optional from .queue import EventQueue from .dispatcher import EventDispatcher +from ...common.context_managers import SetupShutdownContextManager from ...common.flag import flags from ...common.constants import DEFAULT_ENABLE_EVENTS @@ -33,7 +33,7 @@ ) -class EventManager: +class EventManager(SetupShutdownContextManager): """Event manager is a context manager which provides encapsulation around various setup and shutdown steps to start the eventing core. 
@@ -46,18 +46,6 @@ def __init__(self) -> None: self.dispatcher_shutdown: Optional[threading.Event] = None self.manager: Optional[multiprocessing.managers.SyncManager] = None - def __enter__(self) -> 'EventManager': - self.setup() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.shutdown() - def setup(self) -> None: self.manager = multiprocessing.Manager() self.queue = EventQueue(self.manager.Queue()) diff --git a/proxy/proxy.py b/proxy/proxy.py index cae527fa39..a234576975 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -12,18 +12,16 @@ import time import logging -from types import TracebackType from typing import List, Optional, Any, Type -from proxy.core.acceptor.work import Work - -from .core.acceptor import AcceptorPool, ThreadlessPool -from .http.handler import HttpProtocolHandler +from .core.acceptor import AcceptorPool, ThreadlessPool, Work from .core.event import EventManager +from .http.handler import HttpProtocolHandler from .common.flag import FlagParser, flags from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION from .common.constants import DEFAULT_ENABLE_DASHBOARD +from .common.context_managers import SetupShutdownContextManager logger = logging.getLogger(__name__) @@ -77,10 +75,10 @@ # TODO: Ideally all `--enable-*` flags must be at the top-level. # --enable-dashboard is specially needed here because -# ProxyDashboard class is not imported by anyone. +# ProxyDashboard class is not imported anywhere. # -# If we move this flag definition within dashboard.py, -# users will also have to explicitly enable dashboard plugin +# Due to which, if we move this flag definition within dashboard +# plugin, users will have to explicitly enable dashboard plugin # to also use flags provided by it. 
flags.add_argument( '--enable-dashboard', @@ -90,13 +88,13 @@ ) -class Proxy: - """Context manager to control AcceptorPool & Eventing core lifecycle. +class Proxy(SetupShutdownContextManager): + """Context manager to control AcceptorPool, ExecutorPool & EventingCore lifecycle. By default, AcceptorPool is started with `HttpProtocolHandler` work class. By definition, it expects HTTP traffic to flow between clients and server. - Optionally, it also initializes the eventing core, a multiprocess safe + Optionally, it also initializes the eventing core, a multi-process safe pubsub system queue which can be used to build various patterns for message sharing and/or signaling. """ @@ -113,7 +111,20 @@ def __init__( self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None - def __enter__(self) -> 'Proxy': + def setup(self) -> None: + # TODO: Introduce cron feature + # https://github.com/abhinavsingh/proxy.py/issues/392 + # + # TODO: Introduce ability to publish + # adhoc events which can modify behaviour of server + # at runtime. Example, updating flags, plugin + # configuration etc. + # + # TODO: Python shell within running proxy.py environment? + # + # TODO: Pid watcher which watches for processes started + # by proxy.py core. May be alert or restart those processes + # on failure. 
if self.flags.enable_events: logger.info('Core Event enabled') self.event_manager = EventManager() @@ -146,14 +157,8 @@ def __enter__(self) -> 'Proxy': 'Listening on %s:%s' % (self.acceptors.flags.hostname, self.acceptors.flags.port), ) - return self - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: + def shutdown(self) -> None: assert self.acceptors self.acceptors.shutdown() assert self.executors @@ -169,19 +174,6 @@ def main( ) -> None: try: with Proxy(input_args=input_args, **opts): - # TODO: Introduce cron feature - # https://github.com/abhinavsingh/proxy.py/issues/392 - # - # TODO: Introduce ability to publish - # adhoc events which can modify behaviour of server - # at runtime. Example, updating flags, plugin - # configuration etc. - # - # TODO: Python shell within running proxy.py environment? - # - # TODO: Pid watcher which watches for processes started - # by proxy.py core. May be alert or restart those processes - # on failure. while True: time.sleep(1) except KeyboardInterrupt: From e9f1927e18edcbb65c3ca0a57c0de490b23534f0 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 00:43:34 +0530 Subject: [PATCH 13/26] Match --num-acceptors logic with PR description --- proxy/common/flag.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/proxy/common/flag.py b/proxy/common/flag.py index 62e7fcd961..9f115a7848 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -235,23 +235,33 @@ def initialize( socket.AF_INET6 if args.hostname.version == 6 else socket.AF_INET ) else: - # FIXME: Not true for tests, as this value will be mock + # FIXME: Not true for tests, as this value will be a mock. + # # It's a problem only on Windows. Instead of a proper - # test level fix, simply commenting this for now. + # fix in the tests, simply commenting this line of assertion + # for now. 
+ # # assert args.unix_socket_path is None args.family = socket.AF_INET6 if args.hostname.version == 6 else socket.AF_INET args.port = cast(int, opts.get('port', args.port)) args.backlog = cast(int, opts.get('backlog', args.backlog)) num_workers = opts.get('num_workers', args.num_workers) - num_workers = num_workers if num_workers is not None else DEFAULT_NUM_WORKERS args.num_workers = cast( int, num_workers if num_workers > 0 else multiprocessing.cpu_count(), ) num_acceptors = opts.get('num_acceptors', args.num_acceptors) - num_acceptors = num_acceptors if num_acceptors is not None else DEFAULT_NUM_ACCEPTORS - args.num_acceptors = cast( - int, num_acceptors if num_acceptors > 0 else multiprocessing.cpu_count(), - ) + # See https://github.com/abhinavsingh/proxy.py/pull/714 description + # to understand rationale behind the following logic. + # + # --num-workers flag or option was found. We will use + # the same value for num_acceptors when --num-acceptors flag + # is absent. + if num_workers != DEFAULT_NUM_WORKERS and num_acceptors == DEFAULT_NUM_ACCEPTORS: + args.num_acceptors = args.num_workers + else: + args.num_acceptors = cast( + int, num_acceptors if num_acceptors > 0 else multiprocessing.cpu_count(), + ) args.static_server_dir = cast( str, opts.get( From 5058fd650c5956116559d49283192bceba6eccc2 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 00:54:24 +0530 Subject: [PATCH 14/26] Rename executor utility methods and add docstring --- proxy/core/acceptor/acceptor.py | 2 +- proxy/core/acceptor/executors.py | 113 ++++++++++++++++++------------- 2 files changed, 66 insertions(+), 49 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 2731ba646e..257c3c3108 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -131,7 +131,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: # By default all acceptors will start sending work to # 1st 
workers. To randomize, we offset index by idd. index = (self._total + self.idd) % self.flags.num_workers - ThreadlessPool.start_threadless_work( + ThreadlessPool.delegate( self.executor_pids[index], self.executor_queues[index], conn, diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index d87f3a2820..5b91898ec7 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -70,6 +70,15 @@ class ThreadlessPool(SetupShutdownContextManager): with ThreadlessPool(flags=..., work_klass=...) as pool: while True: time.sleep(1) + + If necessary, start multiple threadless pool with different + work classes. + + TODO: We could optimize multiple-work-type scenario + by making Threadless class constructor independent of work_klass. + We could then relay the work_klass during work delegation. + This will also make ThreadlessPool constructor agnostic + of work_klass. """ def __init__( @@ -80,9 +89,62 @@ def __init__( self.flags = flags self.work_klass = work_klass self.event_queue = event_queue - self._workers: List[Threadless] = [] + # Threadless worker communication states self.work_queues: List[connection.Connection] = [] self.work_pids: List[int] = [] + # List of threadless workers + self._workers: List[Threadless] = [] + + @staticmethod + def delegate( + worker_pid: int, + work_queue: connection.Connection, + conn: socket.socket, + addr: Optional[Tuple[str, int]], + unix_socket_path: Optional[str] = None, + ) -> None: + """Utility method to delegate a work to threadless executor pool.""" + # Accepted client address is empty string for + # unix socket domain, avoid sending empty string + # for optimization. 
+ if not unix_socket_path: + work_queue.send(addr) + send_handle( + work_queue, + conn.fileno(), + worker_pid, + ) + conn.close() + + @staticmethod + def start_threaded_work( + flags: argparse.Namespace, + work_klass: Type[Work], + conn: socket.socket, + addr: Optional[Tuple[str, int]], + event_queue: Optional[EventQueue] = None, + publisher_id: Optional[str] = None, + ) -> Tuple[Work, threading.Thread]: + """Utility method to start a work in a new thread.""" + work = work_klass( + TcpClientConnection(conn, addr), + flags=flags, + event_queue=event_queue, + ) + # TODO: Keep reference to threads and join during shutdown. + # This will ensure connections are not abruptly closed on shutdown + # for threaded execution mode. + thread = threading.Thread(target=work.run) + thread.daemon = True + thread.start() + work.publish_event( + event_name=eventNames.WORK_STARTED, + event_payload={'fileno': conn.fileno(), 'addr': addr}, + publisher_id=publisher_id or 'thread#{0}'.format( + thread.ident, + ), + ) + return (work, thread) def setup(self) -> None: """Setup threadless processes.""" @@ -107,6 +169,7 @@ def shutdown(self) -> None: ) def _start_worker(self, index: int) -> None: + """Starts a threadless worker.""" pipe = multiprocessing.Pipe() self.work_queues.append(pipe[0]) w = Threadless( @@ -122,6 +185,7 @@ def _start_worker(self, index: int) -> None: logger.debug('Started threadless#%d process#%d', index, w.pid) def _shutdown_worker(self) -> None: + """Pop a running threadless worker and clean it up.""" w = self._workers.pop() pid = w.pid w.running.set() @@ -129,50 +193,3 @@ def _shutdown_worker(self) -> None: self.work_pids.pop() self.work_queues.pop().close() logger.debug('Stopped threadless process#%d', pid) - - @staticmethod - def start_threadless_work( - worker_pid: int, - work_queue: connection.Connection, - conn: socket.socket, - addr: Optional[Tuple[str, int]], - unix_socket_path: Optional[str] = None, - ) -> None: - # Accepted client address is empty string for 
- # unix socket domain, avoid sending empty string - if not unix_socket_path: - work_queue.send(addr) - send_handle( - work_queue, - conn.fileno(), - worker_pid, - ) - conn.close() - - @staticmethod - def start_threaded_work( - flags: argparse.Namespace, - work_klass: Type[Work], - conn: socket.socket, - addr: Optional[Tuple[str, int]], - event_queue: Optional[EventQueue] = None, - publisher_id: Optional[str] = None, - ) -> Tuple[Work, threading.Thread]: - work = work_klass( - TcpClientConnection(conn, addr), - flags=flags, - event_queue=event_queue, - ) - # TODO: Keep reference to threads and join during shutdown. - # This will ensure connections are not abruptly closed on shutdown. - thread = threading.Thread(target=work.run) - thread.daemon = True - thread.start() - work.publish_event( - event_name=eventNames.WORK_STARTED, - event_payload={'fileno': conn.fileno(), 'addr': addr}, - publisher_id=publisher_id or 'thread#{0}'.format( - thread.ident, - ), - ) - return (work, thread) From d42c9ff4ed62a3e8c3d798d379ec13a6732528e5 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 01:00:19 +0530 Subject: [PATCH 15/26] Remove work_klass from constructors and pass it via flags --- proxy/core/acceptor/acceptor.py | 6 ++---- proxy/core/acceptor/executors.py | 9 +++------ proxy/core/acceptor/pool.py | 5 +---- proxy/core/acceptor/threadless.py | 4 +--- proxy/proxy.py | 4 +--- 5 files changed, 8 insertions(+), 20 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 257c3c3108..a9fc271946 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -59,7 +59,6 @@ def __init__( idd: int, work_queue: connection.Connection, flags: argparse.Namespace, - work_klass: Type[Work], lock: multiprocessing.synchronize.Lock, executor_queues: List[connection.Connection], executor_pids: List[int], @@ -76,8 +75,7 @@ def __init__( self.lock = lock # Queue over which server socket fd is received on start-up 
self.work_queue: connection.Connection = work_queue - # Worker class - self.work_klass = work_klass + # Available executors self.executor_queues = executor_queues self.executor_pids = executor_pids # Selector @@ -145,7 +143,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: ) else: work, thread = ThreadlessPool.start_threaded_work( - self.flags, self.work_klass, + self.flags, conn, addr, event_queue=self.event_queue, publisher_id=self.__class__.__name__, diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 5b91898ec7..060e697455 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -82,12 +82,11 @@ class ThreadlessPool(SetupShutdownContextManager): """ def __init__( - self, flags: argparse.Namespace, - work_klass: Type[Work], + self, + flags: argparse.Namespace, event_queue: Optional[EventQueue] = None, ) -> None: self.flags = flags - self.work_klass = work_klass self.event_queue = event_queue # Threadless worker communication states self.work_queues: List[connection.Connection] = [] @@ -119,14 +118,13 @@ def delegate( @staticmethod def start_threaded_work( flags: argparse.Namespace, - work_klass: Type[Work], conn: socket.socket, addr: Optional[Tuple[str, int]], event_queue: Optional[EventQueue] = None, publisher_id: Optional[str] = None, ) -> Tuple[Work, threading.Thread]: """Utility method to start a work in a new thread.""" - work = work_klass( + work = flags.work_klass( TcpClientConnection(conn, addr), flags=flags, event_queue=event_queue, @@ -175,7 +173,6 @@ def _start_worker(self, index: int) -> None: w = Threadless( client_queue=pipe[1], flags=self.flags, - work_klass=self.work_klass, event_queue=self.event_queue, ) self._workers.append(w) diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index e01d6a7fb7..668a71cb50 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -99,7 +99,6 @@ class 
AcceptorPool(SetupShutdownContextManager): def __init__( self, flags: argparse.Namespace, - work_klass: Type[Work], executor_queues: List[connection.Connection], executor_pids: List[int], event_queue: Optional[EventQueue] = None, @@ -113,8 +112,7 @@ def __init__( self.acceptors: List[Acceptor] = [] # Work queue used to share file descriptor with acceptor processes self.work_queues: List[connection.Connection] = [] - # Work class implementation - self.work_klass: Type[Work] = work_klass + # Available executors self.executor_queues: List[connection.Connection] = executor_queues self.executor_pids: List[int] = executor_pids @@ -175,7 +173,6 @@ def _start(self) -> None: idd=acceptor_id, work_queue=work_queue[1], flags=self.flags, - work_klass=self.work_klass, lock=LOCK, event_queue=self.event_queue, executor_queues=self.executor_queues, diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index 7a957f4180..d94f4db6b6 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -56,13 +56,11 @@ def __init__( self, client_queue: connection.Connection, flags: argparse.Namespace, - work_klass: Type[Work], event_queue: Optional[EventQueue] = None, ) -> None: super().__init__() self.client_queue = client_queue self.flags = flags - self.work_klass = work_klass self.event_queue = event_queue self.running = multiprocessing.Event() @@ -139,7 +137,7 @@ def accept_client(self) -> None: if not self.flags.unix_socket_path: addr = self.client_queue.recv() fileno = recv_handle(self.client_queue) - self.works[fileno] = self.work_klass( + self.works[fileno] = self.flags.work_klass( TcpClientConnection(conn=self.fromfd(fileno), addr=addr), flags=self.flags, event_queue=self.event_queue, diff --git a/proxy/proxy.py b/proxy/proxy.py index a234576975..de46e85fcd 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -105,8 +105,8 @@ def __init__( work_klass: Type[Work] = HttpProtocolHandler, **opts: Any, ) -> None: - self.work_klass: 
Type[Work] = work_klass self.flags = FlagParser.initialize(input_args, **opts) + self.flags.work_klass = work_klass self.acceptors: Optional[AcceptorPool] = None self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None @@ -134,13 +134,11 @@ def setup(self) -> None: else None self.executors = ThreadlessPool( flags=self.flags, - work_klass=self.work_klass, event_queue=event_queue, ) self.executors.setup() self.acceptors = AcceptorPool( flags=self.flags, - work_klass=self.work_klass, event_queue=event_queue, executor_queues=self.executors.work_queues, executor_pids=self.executors.work_pids, From 730c44cdf12fdcef8442d6630e294705a95d650a Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 01:02:43 +0530 Subject: [PATCH 16/26] Update docstring for pools as they no longer accept a work_klass argument --- proxy/core/acceptor/executors.py | 2 +- proxy/core/acceptor/pool.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 060e697455..ca73f448df 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -67,7 +67,7 @@ class ThreadlessPool(SetupShutdownContextManager): Example usage: - with ThreadlessPool(flags=..., work_klass=...) as pool: + with ThreadlessPool(flags=...) as pool: while True: time.sleep(1) diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 668a71cb50..38a5edf43f 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -90,11 +90,11 @@ class AcceptorPool(SetupShutdownContextManager): Example usage: - with AcceptorPool(flags=..., work_klass=...) as pool: + with AcceptorPool(flags=...) as pool: while True: time.sleep(1) - `work_klass` must implement `work.Work` class. + `flags.work_klass` must implement `work.Work` class. 
""" def __init__( From 3c9a24bbda1618d9752f24e25da300ea71b2cf92 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 01:23:52 +0530 Subject: [PATCH 17/26] Turn work_klass into a flag. main() no longer accepts input_args (only kwargs opts). Similarly, Proxy doesnt accept any input_args now (only kwargs opts) --- examples/https_connect_tunnel.py | 9 +++---- examples/ssl_echo_server.py | 12 ++++----- examples/tcp_echo_server.py | 6 ++--- examples/web_scraper.py | 10 +++----- proxy/common/constants.py | 1 + proxy/common/flag.py | 44 ++++++++++++++++++-------------- proxy/proxy.py | 28 ++++++++++---------- 7 files changed, 54 insertions(+), 56 deletions(-) diff --git a/examples/https_connect_tunnel.py b/examples/https_connect_tunnel.py index b34ed7ff82..5b246a5ccb 100644 --- a/examples/https_connect_tunnel.py +++ b/examples/https_connect_tunnel.py @@ -13,7 +13,6 @@ from typing import Any, Optional from proxy import Proxy -from proxy.common.flag import FlagParser from proxy.common.utils import build_http_response from proxy.http.codes import httpStatusCodes from proxy.http.parser import httpParserStates @@ -76,10 +75,10 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` with Proxy( - flags=FlagParser.initialize( - port=12345, num_workers=1, threadless=True, - ), - work_klass=HttpsConnectTunnelHandler, + work_klass=HttpsConnectTunnelHandler, + threadless=True, + num_workers=1, + port=12345, ): try: while True: diff --git a/examples/ssl_echo_server.py b/examples/ssl_echo_server.py index 9359bcaeb1..11d2331cc9 100644 --- a/examples/ssl_echo_server.py +++ b/examples/ssl_echo_server.py @@ -46,14 +46,12 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` with Proxy( - flags=FlagParser.initialize( - port=12345, - num_workers=1, - threadless=True, - keyfile='https-key.pem', - certfile='https-signed-cert.pem', - ), 
work_klass=EchoSSLServerHandler, + threadless=True, + num_workers=1, + port=12345, + keyfile='https-key.pem', + certfile='https-signed-cert.pem', ): try: while True: diff --git a/examples/tcp_echo_server.py b/examples/tcp_echo_server.py index e65c3bcb2f..821128cd74 100644 --- a/examples/tcp_echo_server.py +++ b/examples/tcp_echo_server.py @@ -31,10 +31,10 @@ def handle_data(self, data: memoryview) -> Optional[bool]: def main() -> None: # This example requires `threadless=True` with Proxy( - flags=FlagParser.initialize( - port=12345, num_workers=1, threadless=True, - ), work_klass=EchoServerHandler, + threadless=True, + num_workers=1, + port=12345, ): try: while True: diff --git a/examples/web_scraper.py b/examples/web_scraper.py index 3c42060ae6..8fbafcda96 100644 --- a/examples/web_scraper.py +++ b/examples/web_scraper.py @@ -58,14 +58,10 @@ def handle_events( if __name__ == '__main__': with Proxy( - flags=FlagParser.initialize( - port=12345, - num_workers=1, - threadless=True, - keyfile='https-key.pem', - certfile='https-signed-cert.pem', - ), work_klass=WebScraper, + threadless=True, + num_workers=1, + port=12345, ) as pool: while True: time.sleep(1) diff --git a/proxy/common/constants.py b/proxy/common/constants.py index 2fa7e9f609..8ed524bc76 100644 --- a/proxy/common/constants.py +++ b/proxy/common/constants.py @@ -103,6 +103,7 @@ def _env_threadless_compliant() -> bool: DEFAULT_HTTP_PORT = 80 DEFAULT_HTTPS_PORT = 443 DEFAULT_MAX_SEND_SIZE = 16 * 1024 +DEFAULT_WORK_KLASS = 'proxy.http.handler.HttpProtocolHandler' DEFAULT_DEVTOOLS_DOC_URL = 'http://proxy' DEFAULT_DEVTOOLS_FRAME_ID = secrets.token_hex(8) diff --git a/proxy/common/flag.py b/proxy/common/flag.py index 9f115a7848..be6bcda564 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -107,13 +107,36 @@ def initialize( print(__version__) sys.exit(0) + # proxy.py currently cannot serve over HTTPS and also perform TLS interception + # at the same time. 
Check if user is trying to enable both features + # at the same time. + if (args.cert_file and args.key_file) and \ + (args.ca_key_file and args.ca_cert_file and args.ca_signing_key_file): + print( + 'You can either enable end-to-end encryption OR TLS interception, ' + 'not both together.', + ) + sys.exit(1) + # Setup logging module Logger.setup_logger(args.log_file, args.log_level, args.log_format) # Setup limits set_open_file_limit(args.open_file_limit) - # Load plugins + # Load work_klass + work_klass = opts.get('work_klass', args.work_klass) + work_klass = work_klass \ + if isinstance(work_klass, type) \ + else Plugins.importer(work_klass)[0] + # work_klass is now resolved: either passed as a type or imported from string + + # Generate auth_code required for basic authentication if enabled + auth_code = None + if args.basic_auth: + auth_code = base64.b64encode(bytes_(args.basic_auth)) + + # Load default plugins along with user provided --plugins default_plugins = [ bytes_(p) for p in FlagParser.get_default_plugins(args) @@ -123,31 +146,14 @@ for p in opts.get('plugins', args.plugins.split(text_(COMMA))) if not (isinstance(p, str) and len(p) == 0) ] - - # Load default plugins along with user provided --plugins plugins = Plugins.load(default_plugins + extra_plugins) - # proxy.py currently cannot serve over HTTPS and also perform TLS interception - # at the same time. Check if user is trying to enable both feature - # at the same time. 
- if (args.cert_file and args.key_file) and \ - (args.ca_key_file and args.ca_cert_file and args.ca_signing_key_file): - print( - 'You can either enable end-to-end encryption OR TLS interception,' - 'not both together.', - ) - sys.exit(1) - - # Generate auth_code required for basic authentication if enabled - auth_code = None - if args.basic_auth: - auth_code = base64.b64encode(bytes_(args.basic_auth)) - # https://github.com/python/mypy/issues/5865 # # def option(t: object, key: str, default: Any) -> Any: # return cast(t, opts.get(key, default)) + args.work_klass = work_klass args.plugins = plugins args.auth_code = cast( Optional[bytes], diff --git a/proxy/proxy.py b/proxy/proxy.py index de46e85fcd..a5f1c687c0 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -20,7 +20,7 @@ from .common.flag import FlagParser, flags from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION -from .common.constants import DEFAULT_ENABLE_DASHBOARD +from .common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_WORK_KLASS from .common.context_managers import SetupShutdownContextManager @@ -87,6 +87,13 @@ help='Default: False. Enables proxy.py dashboard.', ) +flags.add_argument( + '--work-klass', + type=str, + default=DEFAULT_WORK_KLASS, + help='Work klass to use by default for work execution.', +) + class Proxy(SetupShutdownContextManager): """Context manager to control AcceptorPool, ExecutorPool & EventingCore lifecycle. @@ -99,14 +106,8 @@ class Proxy(SetupShutdownContextManager): for message sharing and/or signaling. 
""" - def __init__( - self, - input_args: Optional[List[str]] = None, - work_klass: Type[Work] = HttpProtocolHandler, - **opts: Any, - ) -> None: - self.flags = FlagParser.initialize(input_args, **opts) - self.flags.work_klass = work_klass + def __init__(self, **opts: Any) -> None: + self.flags = FlagParser.initialize(sys.argv[1:], **opts) self.acceptors: Optional[AcceptorPool] = None self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None @@ -166,12 +167,9 @@ def shutdown(self) -> None: self.event_manager.shutdown() -def main( - input_args: Optional[List[str]] = None, - **opts: Any, -) -> None: +def main(**opts: Any) -> None: try: - with Proxy(input_args=input_args, **opts): + with Proxy(**opts): while True: time.sleep(1) except KeyboardInterrupt: @@ -179,4 +177,4 @@ def main( def entry_point() -> None: - main(input_args=sys.argv[1:]) + main() From b3b622ab417a2f7ce0314f66f05150bd48d0ffc5 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 01:28:10 +0530 Subject: [PATCH 18/26] Expose default work klass in README --- README.md | 169 +++++++++++++++++++++++++++++-------------------- proxy/proxy.py | 3 +- 2 files changed, 101 insertions(+), 71 deletions(-) diff --git a/README.md b/README.md index 094787a372..4d8beebe70 100644 --- a/README.md +++ b/README.md @@ -1927,19 +1927,25 @@ for list of tests. 
```console ❯ proxy -h -usage: proxy [-h] [--enable-events] [--enable-conn-pool] [--threadless] [--threaded] - [--num-workers NUM_WORKERS] [--pid-file PID_FILE] [--backlog BACKLOG] [--hostname HOSTNAME] - [--port PORT] [--num-acceptors NUM_ACCEPTORS] [--unix-socket-path UNIX_SOCKET_PATH] - [--client-recvbuf-size CLIENT_RECVBUF_SIZE] [--key-file KEY_FILE] [--timeout TIMEOUT] - [--version] [--log-level LOG_LEVEL] [--log-file LOG_FILE] [--log-format LOG_FORMAT] - [--open-file-limit OPEN_FILE_LIMIT] [--plugins PLUGINS] [--enable-dashboard] - [--disable-http-proxy] [--ca-key-file CA_KEY_FILE] [--ca-cert-dir CA_CERT_DIR] - [--ca-cert-file CA_CERT_FILE] [--ca-file CA_FILE] [--ca-signing-key-file CA_SIGNING_KEY_FILE] - [--cert-file CERT_FILE] [--disable-headers DISABLE_HEADERS] - [--server-recvbuf-size SERVER_RECVBUF_SIZE] [--basic-auth BASIC_AUTH] [--cache-dir CACHE_DIR] +usage: -m [-h] [--enable-events] [--enable-conn-pool] [--threadless] [--threaded] + [--num-workers NUM_WORKERS] [--pid-file PID_FILE] [--backlog BACKLOG] + [--hostname HOSTNAME] [--port PORT] [--num-acceptors NUM_ACCEPTORS] + [--unix-socket-path UNIX_SOCKET_PATH] + [--client-recvbuf-size CLIENT_RECVBUF_SIZE] [--key-file KEY_FILE] + [--timeout TIMEOUT] [--version] [--log-level LOG_LEVEL] + [--log-file LOG_FILE] [--log-format LOG_FORMAT] + [--open-file-limit OPEN_FILE_LIMIT] [--plugins PLUGINS] + [--enable-dashboard] [--work-klass WORK_KLASS] [--disable-http-proxy] + [--ca-key-file CA_KEY_FILE] [--ca-cert-dir CA_CERT_DIR] + [--ca-cert-file CA_CERT_FILE] [--ca-file CA_FILE] + [--ca-signing-key-file CA_SIGNING_KEY_FILE] [--cert-file CERT_FILE] + [--disable-headers DISABLE_HEADERS] + [--server-recvbuf-size SERVER_RECVBUF_SIZE] [--basic-auth BASIC_AUTH] + [--cache-dir CACHE_DIR] [--filtered-upstream-hosts FILTERED_UPSTREAM_HOSTS] [--enable-web-server] - [--enable-static-server] [--static-server-dir STATIC_SERVER_DIR] [--pac-file PAC_FILE] - [--pac-file-url-path PAC_FILE_URL_PATH] [--filtered-client-ips 
FILTERED_CLIENT_IPS] + [--enable-static-server] [--static-server-dir STATIC_SERVER_DIR] + [--pac-file PAC_FILE] [--pac-file-url-path PAC_FILE_URL_PATH] + [--filtered-client-ips FILTERED_CLIENT_IPS] [--filtered-url-regex-config FILTERED_URL_REGEX_CONFIG] [--cloudflare-dns-mode CLOUDFLARE_DNS_MODE] @@ -1947,101 +1953,124 @@ proxy.py v2.4.0 options: -h, --help show this help message and exit - --enable-events Default: False. Enables core to dispatch lifecycle events. Plugins can be used - to subscribe for core events. + --enable-events Default: False. Enables core to dispatch lifecycle events. + Plugins can be used to subscribe for core events. --enable-conn-pool Default: False. (WIP) Enable upstream connection pooling. - --threadless Default: True. Enabled by default on Python 3.8+ (mac, linux). When disabled a - new thread is spawned to handle each client connection. - --threaded Default: False. Disabled by default on Python < 3.8 and windows. When enabled a - new thread is spawned to handle each client connection. + --threadless Default: True. Enabled by default on Python 3.8+ (mac, + linux). When disabled a new thread is spawned to handle + each client connection. + --threaded Default: False. Disabled by default on Python < 3.8 and + windows. When enabled a new thread is spawned to handle + each client connection. --num-workers NUM_WORKERS Defaults to number of CPU cores. --pid-file PID_FILE Default: None. Save parent process ID to a file. - --backlog BACKLOG Default: 100. Maximum number of pending connections to proxy server + --backlog BACKLOG Default: 100. Maximum number of pending connections to + proxy server --hostname HOSTNAME Default: ::1. Server IP address. --port PORT Default: 8899. Server port. --num-acceptors NUM_ACCEPTORS Defaults to number of CPU cores. --unix-socket-path UNIX_SOCKET_PATH - Default: None. Unix socket path to use. When provided --host and --port flags - are ignored + Default: None. Unix socket path to use. 
When provided + --host and --port flags are ignored --client-recvbuf-size CLIENT_RECVBUF_SIZE - Default: 1 MB. Maximum amount of data received from the client in a single - recv() operation. Bump this value for faster uploads at the expense of increased - RAM. - --key-file KEY_FILE Default: None. Server key file to enable end-to-end TLS encryption with clients. - If used, must also pass --cert-file. - --timeout TIMEOUT Default: 10.0. Number of seconds after which an inactive connection must be - dropped. Inactivity is defined by no data sent or received by the client. + Default: 1 MB. Maximum amount of data received from the + client in a single recv() operation. Bump this value for + faster uploads at the expense of increased RAM. + --key-file KEY_FILE Default: None. Server key file to enable end-to-end TLS + encryption with clients. If used, must also pass --cert- + file. + --timeout TIMEOUT Default: 10.0. Number of seconds after which an inactive + connection must be dropped. Inactivity is defined by no + data sent or received by the client. --version, -v Prints proxy.py version. --log-level LOG_LEVEL - Valid options: DEBUG, INFO (default), WARNING, ERROR, CRITICAL. Both upper and - lowercase values are allowed. You may also simply use the leading character e.g. - --log-level d + Valid options: DEBUG, INFO (default), WARNING, ERROR, + CRITICAL. Both upper and lowercase values are allowed. You + may also simply use the leading character e.g. --log-level + d --log-file LOG_FILE Default: sys.stdout. Log file destination. --log-format LOG_FORMAT Log format for Python logger. --open-file-limit OPEN_FILE_LIMIT - Default: 1024. Maximum number of files (TCP connections) that proxy.py can open - concurrently. + Default: 1024. Maximum number of files (TCP connections) + that proxy.py can open concurrently. --plugins PLUGINS Comma separated plugins --enable-dashboard Default: False. Enables proxy.py dashboard. 
+ --work-klass WORK_KLASS + Default: proxy.http.handler.HttpProtocolHandler. Work klass + to use for work execution. --disable-http-proxy Default: False. Whether to disable proxy.HttpProxyPlugin. --ca-key-file CA_KEY_FILE - Default: None. CA key to use for signing dynamically generated HTTPS - certificates. If used, must also pass --ca-cert-file and --ca-signing-key-file + Default: None. CA key to use for signing dynamically + generated HTTPS certificates. If used, must also pass --ca- + cert-file and --ca-signing-key-file --ca-cert-dir CA_CERT_DIR - Default: ~/.proxy.py. Directory to store dynamically generated certificates. - Also see --ca-key-file, --ca-cert-file and --ca-signing-key-file + Default: ~/.proxy.py. Directory to store dynamically + generated certificates. Also see --ca-key-file, --ca-cert- + file and --ca-signing-key-file --ca-cert-file CA_CERT_FILE - Default: None. Signing certificate to use for signing dynamically generated - HTTPS certificates. If used, must also pass --ca-key-file and --ca-signing-key- - file - --ca-file CA_FILE Default: /Users/abhinavsingh/Dev/proxy.py/venv310/lib/python3.10/site- - packages/certifi/cacert.pem. Provide path to custom CA bundle for peer - certificate verification + Default: None. Signing certificate to use for signing + dynamically generated HTTPS certificates. If used, must + also pass --ca-key-file and --ca-signing-key-file + --ca-file CA_FILE Default: /Users/abhinavsingh/Dev/proxy.py/venv310/lib/pytho + n3.10/site-packages/certifi/cacert.pem. Provide path to + custom CA bundle for peer certificate verification --ca-signing-key-file CA_SIGNING_KEY_FILE - Default: None. CA signing key to use for dynamic generation of HTTPS - certificates. If used, must also pass --ca-key-file and --ca-cert-file + Default: None. CA signing key to use for dynamic generation + of HTTPS certificates. If used, must also pass --ca-key- + file and --ca-cert-file --cert-file CERT_FILE - Default: None. 
Server certificate to enable end-to-end TLS encryption with - clients. If used, must also pass --key-file. + Default: None. Server certificate to enable end-to-end TLS + encryption with clients. If used, must also pass --key- + file. --disable-headers DISABLE_HEADERS - Default: None. Comma separated list of headers to remove before dispatching - client request to upstream server. + Default: None. Comma separated list of headers to remove + before dispatching client request to upstream server. --server-recvbuf-size SERVER_RECVBUF_SIZE - Default: 1 MB. Maximum amount of data received from the server in a single - recv() operation. Bump this value for faster downloads at the expense of - increased RAM. + Default: 1 MB. Maximum amount of data received from the + server in a single recv() operation. Bump this value for + faster downloads at the expense of increased RAM. --basic-auth BASIC_AUTH - Default: No authentication. Specify colon separated user:password to enable - basic authentication. + Default: No authentication. Specify colon separated + user:password to enable basic authentication. --cache-dir CACHE_DIR - Default: A temporary directory. Flag only applicable when cache plugin is used - with on-disk storage. + Default: A temporary directory. Flag only applicable when + cache plugin is used with on-disk storage. --filtered-upstream-hosts FILTERED_UPSTREAM_HOSTS - Default: Blocks Facebook. Comma separated list of IPv4 and IPv6 addresses. - --enable-web-server Default: False. Whether to enable proxy.HttpWebServerPlugin. + Default: Blocks Facebook. Comma separated list of IPv4 and + IPv6 addresses. + --enable-web-server Default: False. Whether to enable + proxy.HttpWebServerPlugin. --enable-static-server - Default: False. Enable inbuilt static file server. Optionally, also use - --static-server-dir to serve static content from custom directory. By default, - static file server serves out of installed proxy.py python module folder. + Default: False. 
Enable inbuilt static file server. + Optionally, also use --static-server-dir to serve static + content from custom directory. By default, static file + server serves out of installed proxy.py python module + folder. --static-server-dir STATIC_SERVER_DIR - Default: "public" folder in directory where proxy.py is placed. This option is - only applicable when static server is also enabled. See --enable-static-server. - --pac-file PAC_FILE A file (Proxy Auto Configuration) or string to serve when the server receives a - direct file request. Using this option enables proxy.HttpWebServerPlugin. + Default: "public" folder in directory where proxy.py is + placed. This option is only applicable when static server + is also enabled. See --enable-static-server. + --pac-file PAC_FILE A file (Proxy Auto Configuration) or string to serve when + the server receives a direct file request. Using this + option enables proxy.HttpWebServerPlugin. --pac-file-url-path PAC_FILE_URL_PATH Default: /. Web server path to serve the PAC file. --filtered-client-ips FILTERED_CLIENT_IPS - Default: 127.0.0.1,::1. Comma separated list of IPv4 and IPv6 addresses. + Default: 127.0.0.1,::1. Comma separated list of IPv4 and + IPv6 addresses. --filtered-url-regex-config FILTERED_URL_REGEX_CONFIG - Default: No config. Comma separated list of IPv4 and IPv6 addresses. + Default: No config. Comma separated list of IPv4 and IPv6 + addresses. --cloudflare-dns-mode CLOUDFLARE_DNS_MODE - Default: security. Either "security" (for malware protection) or "family" (for - malware and adult content protection) + Default: security. Either "security" (for malware + protection) or "family" (for malware and adult content + protection) -Proxy.py not working? Report at: https://github.com/abhinavsingh/proxy.py/issues/new +Proxy.py not working? 
Report at: +https://github.com/abhinavsingh/proxy.py/issues/new ``` # Changelog diff --git a/proxy/proxy.py b/proxy/proxy.py index a5f1c687c0..9225742c61 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -91,7 +91,8 @@ '--work-klass', type=str, default=DEFAULT_WORK_KLASS, - help='Work klass to use by default for work execution.', + help='Default: ' + DEFAULT_WORK_KLASS + + '. Work klass to use for work execution.', ) From 50855386d153c6d990a59ca3e5ff9a3785f4cdd3 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 01:33:42 +0530 Subject: [PATCH 19/26] Expose `HttpProtocolHandler` and `HttpProtocolHandlerPlugin` within `proxy.http` module --- proxy/common/constants.py | 2 +- proxy/http/__init__.py | 7 +++++++ proxy/proxy.py | 2 +- tests/http/exceptions/test_http_proxy_auth_failed.py | 2 +- tests/http/test_http_proxy.py | 2 +- tests/http/test_http_proxy_tls_interception.py | 2 +- tests/http/test_protocol_handler.py | 2 +- tests/http/test_web_server.py | 2 +- tests/plugin/test_http_proxy_plugins.py | 2 +- .../test_http_proxy_plugins_with_tls_interception.py | 2 +- tests/test_main.py | 2 +- 11 files changed, 17 insertions(+), 10 deletions(-) diff --git a/proxy/common/constants.py b/proxy/common/constants.py index 8ed524bc76..b5b88186a2 100644 --- a/proxy/common/constants.py +++ b/proxy/common/constants.py @@ -103,7 +103,7 @@ def _env_threadless_compliant() -> bool: DEFAULT_HTTP_PORT = 80 DEFAULT_HTTPS_PORT = 443 DEFAULT_MAX_SEND_SIZE = 16 * 1024 -DEFAULT_WORK_KLASS = 'proxy.http.handler.HttpProtocolHandler' +DEFAULT_WORK_KLASS = 'proxy.http.HttpProtocolHandler' DEFAULT_DEVTOOLS_DOC_URL = 'http://proxy' DEFAULT_DEVTOOLS_FRAME_ID = secrets.token_hex(8) diff --git a/proxy/http/__init__.py b/proxy/http/__init__.py index 232621f0b5..e293b97993 100644 --- a/proxy/http/__init__.py +++ b/proxy/http/__init__.py @@ -8,3 +8,10 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. 
""" +from .handler import HttpProtocolHandler +from .plugin import HttpProtocolHandlerPlugin + +__all__ = [ + 'HttpProtocolHandler', + 'HttpProtocolHandlerPlugin', +] diff --git a/proxy/proxy.py b/proxy/proxy.py index 9225742c61..f03648e678 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -16,7 +16,7 @@ from .core.acceptor import AcceptorPool, ThreadlessPool, Work from .core.event import EventManager -from .http.handler import HttpProtocolHandler +from .http import HttpProtocolHandler from .common.flag import FlagParser, flags from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION diff --git a/tests/http/exceptions/test_http_proxy_auth_failed.py b/tests/http/exceptions/test_http_proxy_auth_failed.py index c1d369a84c..a65fc045ff 100644 --- a/tests/http/exceptions/test_http_proxy_auth_failed.py +++ b/tests/http/exceptions/test_http_proxy_auth_failed.py @@ -14,7 +14,7 @@ from proxy.common.flag import FlagParser from proxy.http.exception.proxy_auth_failed import ProxyAuthenticationFailed -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.core.connection import TcpClientConnection from proxy.common.utils import build_http_request diff --git a/tests/http/test_http_proxy.py b/tests/http/test_http_proxy.py index f658de87e7..fde5da11b0 100644 --- a/tests/http/test_http_proxy.py +++ b/tests/http/test_http_proxy.py @@ -16,7 +16,7 @@ from proxy.common.flag import FlagParser from proxy.core.connection import TcpClientConnection from proxy.http.proxy import HttpProxyPlugin -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.http.exception import HttpProtocolException from proxy.common.utils import build_http_request diff --git a/tests/http/test_http_proxy_tls_interception.py b/tests/http/test_http_proxy_tls_interception.py index 
987d5c5e46..0b13aa8ab5 100644 --- a/tests/http/test_http_proxy_tls_interception.py +++ b/tests/http/test_http_proxy_tls_interception.py @@ -19,7 +19,7 @@ from proxy.common.constants import DEFAULT_CA_FILE from proxy.core.connection import TcpClientConnection, TcpServerConnection -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.http.proxy import HttpProxyPlugin from proxy.http.methods import httpMethods from proxy.common.utils import build_http_request, bytes_ diff --git a/tests/http/test_protocol_handler.py b/tests/http/test_protocol_handler.py index 14a1f5ae6b..7ef30b7cc3 100644 --- a/tests/http/test_protocol_handler.py +++ b/tests/http/test_protocol_handler.py @@ -25,7 +25,7 @@ from proxy.http.proxy import HttpProxyPlugin from proxy.http.parser import httpParserStates, httpParserTypes from proxy.http.exception import ProxyAuthenticationFailed, ProxyConnectionFailed -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler class TestHttpProtocolHandler(unittest.TestCase): diff --git a/tests/http/test_web_server.py b/tests/http/test_web_server.py index 6ef69480c5..be0098128a 100644 --- a/tests/http/test_web_server.py +++ b/tests/http/test_web_server.py @@ -18,7 +18,7 @@ from proxy.common.plugins import Plugins from proxy.common.flag import FlagParser from proxy.core.connection import TcpClientConnection -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.http.parser import HttpParser, httpParserStates, httpParserTypes from proxy.common.utils import build_http_response, build_http_request, bytes_, text_ from proxy.common.constants import CRLF, PLUGIN_HTTP_PROXY, PLUGIN_PAC_FILE, PLUGIN_WEB_SERVER, PROXY_PY_DIR diff --git a/tests/plugin/test_http_proxy_plugins.py b/tests/plugin/test_http_proxy_plugins.py index 36bbbcb41b..97dae264d4 100644 --- a/tests/plugin/test_http_proxy_plugins.py +++ 
b/tests/plugin/test_http_proxy_plugins.py @@ -19,7 +19,7 @@ from proxy.common.flag import FlagParser from proxy.core.connection import TcpClientConnection -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.http.proxy import HttpProxyPlugin from proxy.common.utils import build_http_request, bytes_, build_http_response from proxy.common.constants import PROXY_AGENT_HEADER_VALUE, DEFAULT_HTTP_PORT diff --git a/tests/plugin/test_http_proxy_plugins_with_tls_interception.py b/tests/plugin/test_http_proxy_plugins_with_tls_interception.py index 42dc9e4a3d..6415755c80 100644 --- a/tests/plugin/test_http_proxy_plugins_with_tls_interception.py +++ b/tests/plugin/test_http_proxy_plugins_with_tls_interception.py @@ -21,7 +21,7 @@ from proxy.core.connection import TcpClientConnection, TcpServerConnection from proxy.http.codes import httpStatusCodes from proxy.http.methods import httpMethods -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.http.proxy import HttpProxyPlugin from .utils import get_plugin_by_test_name diff --git a/tests/test_main.py b/tests/test_main.py index 315652ba85..245817856f 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -14,7 +14,7 @@ from proxy.proxy import main, entry_point from proxy.common.utils import bytes_ -from proxy.http.handler import HttpProtocolHandler +from proxy.http import HttpProtocolHandler from proxy.common.flag import FlagParser from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT From 68d706e9e4547a434ee862e8d2d195fb88320cb2 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 02:19:29 +0530 Subject: [PATCH 20/26] Start to fix tests --- proxy/common/flag.py | 1 - proxy/proxy.py | 6 ++- tests/test_main.py | 106 +++++++++++++++++++++++++++++-------------- 3 files changed, 76 insertions(+), 37 deletions(-) diff --git a/proxy/common/flag.py 
b/proxy/common/flag.py index be6bcda564..ac353bc6da 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -129,7 +129,6 @@ def initialize( work_klass = work_klass \ if isinstance(work_klass, type) \ else Plugins.importer(work_klass)[0] - print(work_klass) # Generate auth_code required for basic authentication if enabled auth_code = None diff --git a/proxy/proxy.py b/proxy/proxy.py index f03648e678..1ffddc82ff 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -108,7 +108,11 @@ class Proxy(SetupShutdownContextManager): """ def __init__(self, **opts: Any) -> None: - self.flags = FlagParser.initialize(sys.argv[1:], **opts) + input_args = sys.argv[1:] + print(input_args) + print('*'*20) + self.flags = FlagParser.initialize(input_args, **opts) + print(self.flags) self.acceptors: Optional[AcceptorPool] = None self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None diff --git a/tests/test_main.py b/tests/test_main.py index 245817856f..38ecd4fc72 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -17,7 +17,7 @@ from proxy.http import HttpProtocolHandler from proxy.common.flag import FlagParser -from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT +from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_NUM_ACCEPTORS, DEFAULT_WORK_KLASS from proxy.common.constants import DEFAULT_TIMEOUT, DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HTTP_PROXY from proxy.common.constants import DEFAULT_ENABLE_STATIC_SERVER, DEFAULT_ENABLE_EVENTS, DEFAULT_ENABLE_DEVTOOLS from proxy.common.constants import DEFAULT_ENABLE_WEB_SERVER, DEFAULT_THREADLESS, DEFAULT_CERT_FILE, DEFAULT_KEY_FILE @@ -34,6 +34,7 @@ class TestMain(unittest.TestCase): @staticmethod def mock_default_args(mock_args: mock.Mock) -> None: + """Use when trying to mock parse_args""" mock_args.version = False mock_args.cert_file = 
DEFAULT_CERT_FILE mock_args.key_file = DEFAULT_KEY_FILE @@ -47,6 +48,7 @@ def mock_default_args(mock_args: mock.Mock) -> None: mock_args.basic_auth = DEFAULT_BASIC_AUTH mock_args.hostname = DEFAULT_IPV6_HOSTNAME mock_args.port = DEFAULT_PORT + mock_args.num_acceptors = DEFAULT_NUM_ACCEPTORS mock_args.num_workers = DEFAULT_NUM_WORKERS mock_args.disable_http_proxy = DEFAULT_DISABLE_HTTP_PROXY mock_args.pac_file = DEFAULT_PAC_FILE @@ -64,13 +66,16 @@ def mock_default_args(mock_args: mock.Mock) -> None: mock_args.enable_devtools = DEFAULT_ENABLE_DEVTOOLS mock_args.enable_events = DEFAULT_ENABLE_EVENTS mock_args.enable_dashboard = DEFAULT_ENABLE_DASHBOARD + mock_args.work_klass = DEFAULT_WORK_KLASS @mock.patch('time.sleep') @mock.patch('proxy.proxy.FlagParser.initialize') @mock.patch('proxy.proxy.EventManager') @mock.patch('proxy.proxy.AcceptorPool') + @mock.patch('proxy.proxy.ThreadlessPool') def test_entry_point( self, + mock_executor_pool: mock.Mock, mock_acceptor_pool: mock.Mock, mock_event_manager: mock.Mock, mock_initialize: mock.Mock, @@ -80,11 +85,16 @@ def test_entry_point( mock_initialize.return_value.enable_events = False entry_point() mock_event_manager.assert_not_called() - mock_acceptor_pool.assert_called_with( + mock_executor_pool.assert_called_once_with( flags=mock_initialize.return_value, - work_klass=HttpProtocolHandler, event_queue=None, ) + mock_acceptor_pool.assert_called_once_with( + flags=mock_initialize.return_value, + event_queue=None, + executor_queues=mock_executor_pool.return_value.work_queues, + executor_pids=mock_executor_pool.return_value.work_pids, + ) mock_acceptor_pool.return_value.setup.assert_called() mock_acceptor_pool.return_value.shutdown.assert_called() mock_sleep.assert_called() @@ -93,8 +103,10 @@ def test_entry_point( @mock.patch('proxy.proxy.FlagParser.initialize') @mock.patch('proxy.proxy.EventManager') @mock.patch('proxy.proxy.AcceptorPool') - def test_main_with_no_arguments( + @mock.patch('proxy.proxy.ThreadlessPool') + def 
test_main_with_no_flags( self, + mock_executor_pool: mock.Mock, mock_acceptor_pool: mock.Mock, mock_event_manager: mock.Mock, mock_initialize: mock.Mock, @@ -102,13 +114,18 @@ def test_main_with_no_arguments( ) -> None: mock_sleep.side_effect = KeyboardInterrupt() mock_initialize.return_value.enable_events = False - main([]) + main() mock_event_manager.assert_not_called() - mock_acceptor_pool.assert_called_with( + mock_executor_pool.assert_called_once_with( flags=mock_initialize.return_value, - work_klass=HttpProtocolHandler, event_queue=None, ) + mock_acceptor_pool.assert_called_once_with( + flags=mock_initialize.return_value, + event_queue=None, + executor_queues=mock_executor_pool.return_value.work_queues, + executor_pids=mock_executor_pool.return_value.work_pids, + ) mock_acceptor_pool.return_value.setup.assert_called() mock_acceptor_pool.return_value.shutdown.assert_called() mock_sleep.assert_called() @@ -117,23 +134,30 @@ def test_main_with_no_arguments( @mock.patch('proxy.proxy.FlagParser.initialize') @mock.patch('proxy.proxy.EventManager') @mock.patch('proxy.proxy.AcceptorPool') + @mock.patch('proxy.proxy.ThreadlessPool') def test_enable_events( - self, - mock_acceptor_pool: mock.Mock, - mock_event_manager: mock.Mock, - mock_initialize: mock.Mock, - mock_sleep: mock.Mock, + self, + mock_executor_pool: mock.Mock, + mock_acceptor_pool: mock.Mock, + mock_event_manager: mock.Mock, + mock_initialize: mock.Mock, + mock_sleep: mock.Mock, ) -> None: mock_sleep.side_effect = KeyboardInterrupt() mock_initialize.return_value.enable_events = True - main([]) + main() mock_event_manager.assert_called_once() mock_event_manager.return_value.setup.assert_called_once() mock_event_manager.return_value.shutdown.assert_called_once() - mock_acceptor_pool.assert_called_with( + mock_executor_pool.assert_called_once_with( + flags=mock_initialize.return_value, + event_queue=mock_event_manager.return_value.queue, + ) + mock_acceptor_pool.assert_called_once_with( 
flags=mock_initialize.return_value, - work_klass=HttpProtocolHandler, event_queue=mock_event_manager.return_value.queue, + executor_queues=mock_executor_pool.return_value.work_queues, + executor_pids=mock_executor_pool.return_value.work_pids, ) mock_acceptor_pool.return_value.setup.assert_called() mock_acceptor_pool.return_value.shutdown.assert_called() @@ -144,19 +168,21 @@ def test_enable_events( @mock.patch('proxy.common.flag.FlagParser.parse_args') @mock.patch('proxy.proxy.EventManager') @mock.patch('proxy.proxy.AcceptorPool') + @mock.patch('proxy.proxy.ThreadlessPool') def test_enable_dashboard( - self, - mock_acceptor_pool: mock.Mock, - mock_event_manager: mock.Mock, - mock_parse_args: mock.Mock, - mock_load_plugins: mock.Mock, - mock_sleep: mock.Mock, + self, + mock_executor_pool: mock.Mock, + mock_acceptor_pool: mock.Mock, + mock_event_manager: mock.Mock, + mock_parse_args: mock.Mock, + mock_load_plugins: mock.Mock, + mock_sleep: mock.Mock, ) -> None: mock_sleep.side_effect = KeyboardInterrupt() mock_args = mock_parse_args.return_value self.mock_default_args(mock_args) mock_args.enable_dashboard = True - main(['--enable-dashboard']) + main(enable_dashboard=True) mock_load_plugins.assert_called() self.assertEqual( mock_load_plugins.call_args_list[0][0][0], [ @@ -167,32 +193,37 @@ def test_enable_dashboard( bytes_(PLUGIN_HTTP_PROXY), ], ) + # TODO: Assert arguments passed to parse_arg mock_parse_args.assert_called_once() - mock_acceptor_pool.assert_called() - mock_acceptor_pool.return_value.setup.assert_called() # dashboard will also enable eventing mock_event_manager.assert_called_once() mock_event_manager.return_value.setup.assert_called_once() mock_event_manager.return_value.shutdown.assert_called_once() + mock_executor_pool.assert_called_once() + mock_executor_pool.return_value.setup.assert_called_once() + mock_acceptor_pool.assert_called_once() + mock_acceptor_pool.return_value.setup.assert_called_once() @mock.patch('time.sleep') 
@mock.patch('proxy.common.plugins.Plugins.load') @mock.patch('proxy.common.flag.FlagParser.parse_args') @mock.patch('proxy.proxy.EventManager') @mock.patch('proxy.proxy.AcceptorPool') + @mock.patch('proxy.proxy.ThreadlessPool') def test_enable_devtools( - self, - mock_acceptor_pool: mock.Mock, - mock_event_manager: mock.Mock, - mock_parse_args: mock.Mock, - mock_load_plugins: mock.Mock, - mock_sleep: mock.Mock, + self, + mock_executor_pool: mock.Mock, + mock_acceptor_pool: mock.Mock, + mock_event_manager: mock.Mock, + mock_parse_args: mock.Mock, + mock_load_plugins: mock.Mock, + mock_sleep: mock.Mock, ) -> None: mock_sleep.side_effect = KeyboardInterrupt() mock_args = mock_parse_args.return_value self.mock_default_args(mock_args) mock_args.enable_devtools = True - main(['--enable-devtools']) + main(enable_devtools=True) mock_load_plugins.assert_called() self.assertEqual( mock_load_plugins.call_args_list[0][0][0], [ @@ -202,10 +233,12 @@ def test_enable_devtools( ], ) mock_parse_args.assert_called_once() - mock_acceptor_pool.assert_called() - mock_acceptor_pool.return_value.setup.assert_called() - # Currently --enable-devtools alone doesn't enable eventing core + # Currently --enable-devtools flag alone doesn't enable eventing core mock_event_manager.assert_not_called() + mock_executor_pool.assert_called_once() + mock_executor_pool.return_value.setup.assert_called_once() + mock_acceptor_pool.assert_called_once() + mock_acceptor_pool.return_value.setup.assert_called_once() @mock.patch('time.sleep') @mock.patch('proxy.proxy.EventManager') @@ -271,12 +304,15 @@ def test_main_py2_exit( mock_is_py2.assert_called() @mock.patch('builtins.print') + @mock.patch('sys.argv') def test_main_version( self, + mock_sys_argv: mock.Mock, mock_print: mock.Mock, ) -> None: + mock_sys_argv.return_value = ['proxy', '--version'] with self.assertRaises(SystemExit) as e: - main(['--version']) + main() mock_print.assert_called_with(__version__) self.assertEqual(e.exception.code, 0) From 
123248d7e169e5fe9581fd201abf5cc9bd88af00 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 03:44:56 +0530 Subject: [PATCH 21/26] Fix tests --- proxy/common/flag.py | 10 ++-- proxy/core/acceptor/acceptor.py | 4 +- proxy/core/acceptor/executors.py | 2 +- proxy/proxy.py | 13 ++---- proxy/testing/test_case.py | 2 +- tests/common/test_flags.py | 45 ++++++++++++++++-- tests/core/test_acceptor.py | 16 ++++--- tests/core/test_acceptor_pool.py | 4 +- tests/test_main.py | 78 -------------------------------- 9 files changed, 64 insertions(+), 110 deletions(-) diff --git a/proxy/common/flag.py b/proxy/common/flag.py index ac353bc6da..eee80b8222 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -84,8 +84,8 @@ def parse_args( @staticmethod def initialize( - input_args: Optional[List[str]] - = None, **opts: Any, + input_args: Optional[List[str]] = None, + **opts: Any, ) -> argparse.Namespace: if input_args is None: input_args = [] @@ -126,9 +126,9 @@ def initialize( # Load work_klass work_klass = opts.get('work_klass', args.work_klass) - work_klass = work_klass \ - if isinstance(work_klass, type) \ - else Plugins.importer(work_klass)[0] + work_klass = Plugins.importer(work_klass)[0] \ + if isinstance(work_klass, str) \ + else work_klass # Generate auth_code required for basic authentication if enabled auth_code = None diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index a9fc271946..b3c00d9558 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -18,12 +18,10 @@ from multiprocessing import connection from multiprocessing.reduction import recv_handle -from typing import List, Optional, Tuple, Type +from typing import List, Optional, Tuple from proxy.core.acceptor.executors import ThreadlessPool -from .work import Work - from ..event import EventQueue from ...common.utils import is_threadless diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 
ca73f448df..5873f1debf 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -17,7 +17,7 @@ from multiprocessing import connection from multiprocessing.reduction import send_handle -from typing import Optional, List, Tuple, Type +from typing import Optional, List, Tuple from .work import Work from .threadless import Threadless diff --git a/proxy/proxy.py b/proxy/proxy.py index 1ffddc82ff..112eb2a0a7 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -12,11 +12,10 @@ import time import logging -from typing import List, Optional, Any, Type +from typing import List, Optional, Any -from .core.acceptor import AcceptorPool, ThreadlessPool, Work +from .core.acceptor import AcceptorPool, ThreadlessPool from .core.event import EventManager -from .http import HttpProtocolHandler from .common.flag import FlagParser, flags from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION @@ -107,12 +106,8 @@ class Proxy(SetupShutdownContextManager): for message sharing and/or signaling. 
""" - def __init__(self, **opts: Any) -> None: - input_args = sys.argv[1:] - print(input_args) - print('*'*20) + def __init__(self, input_args: Optional[List[str]] = None, **opts: Any) -> None: self.flags = FlagParser.initialize(input_args, **opts) - print(self.flags) self.acceptors: Optional[AcceptorPool] = None self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None @@ -174,7 +169,7 @@ def shutdown(self) -> None: def main(**opts: Any) -> None: try: - with Proxy(**opts): + with Proxy(sys.argv[1:], **opts): while True: time.sleep(1) except KeyboardInterrupt: diff --git a/proxy/testing/test_case.py b/proxy/testing/test_case.py index 317acd51c9..2bec2778d1 100644 --- a/proxy/testing/test_case.py +++ b/proxy/testing/test_case.py @@ -41,7 +41,7 @@ def setUpClass(cls) -> None: cls.INPUT_ARGS.append('--port') cls.INPUT_ARGS.append('0') - cls.PROXY = Proxy(input_args=cls.INPUT_ARGS) + cls.PROXY = Proxy(cls.INPUT_ARGS) cls.PROXY.flags.plugins[b'HttpProxyBasePlugin'].append( CacheResponsesPlugin, ) diff --git a/tests/common/test_flags.py b/tests/common/test_flags.py index 25c82124ca..a8791d5ce4 100644 --- a/tests/common/test_flags.py +++ b/tests/common/test_flags.py @@ -8,16 +8,18 @@ :copyright: (c) 2013-present by Abhinav Singh and contributors. :license: BSD, see LICENSE for more details. 
""" -from proxy.common.utils import bytes_ -from proxy.common.constants import PLUGIN_HTTP_PROXY import unittest +from unittest import mock from typing import List, Dict -from proxy.common.flag import FlagParser from proxy.http.proxy import HttpProxyPlugin from proxy.plugin import CacheResponsesPlugin from proxy.plugin import FilterByUpstreamHostPlugin +from proxy.common.utils import bytes_ +from proxy.common.flag import FlagParser +from proxy.common.version import __version__ +from proxy.common.constants import PLUGIN_HTTP_PROXY, PY2_DEPRECATION_MESSAGE class TestFlags(unittest.TestCase): @@ -139,6 +141,43 @@ def test_unique_plugin_from_class(self) -> None: ], }) + def test_basic_auth_flag_is_base64_encoded(self) -> None: + flags = FlagParser.initialize(['--basic-auth', 'user:pass']) + self.assertEqual(flags.auth_code, b'dXNlcjpwYXNz') + + @mock.patch('builtins.print') + def test_main_version(self, mock_print: mock.Mock) -> None: + with self.assertRaises(SystemExit) as e: + FlagParser.initialize(['--version']) + mock_print.assert_called_with(__version__) + self.assertEqual(e.exception.code, 0) + + @mock.patch('builtins.print') + @mock.patch('proxy.common.flag.is_py2') + def test_main_py2_exit( + self, + mock_is_py2: mock.Mock, + mock_print: mock.Mock, + ) -> None: + mock_is_py2.return_value = True + with self.assertRaises(SystemExit) as e: + FlagParser.initialize() + mock_print.assert_called_with(PY2_DEPRECATION_MESSAGE) + self.assertEqual(e.exception.code, 1) + mock_is_py2.assert_called() + + @mock.patch('builtins.print') + @mock.patch('proxy.common.flag.is_py2') + def test_main_py3_runs( + self, + mock_is_py2: mock.Mock, + mock_print: mock.Mock, + ) -> None: + mock_is_py2.return_value = False + FlagParser.initialize() + mock_is_py2.assert_called() + mock_print.assert_not_called() + if __name__ == '__main__': unittest.main() diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index 595513f4db..dde5a50c78 100644 --- a/tests/core/test_acceptor.py 
+++ b/tests/core/test_acceptor.py @@ -16,21 +16,23 @@ from proxy.core.acceptor import Acceptor from proxy.common.flag import FlagParser +from proxy.http import HttpProtocolHandler class TestAcceptor(unittest.TestCase): def setUp(self) -> None: self.acceptor_id = 1 - self.mock_protocol_handler = mock.MagicMock() self.pipe = multiprocessing.Pipe() - self.flags = FlagParser.initialize(threaded=True) + self.flags = FlagParser.initialize( + threaded=True, work_klass=mock.MagicMock()) self.acceptor = Acceptor( idd=self.acceptor_id, work_queue=self.pipe[1], flags=self.flags, lock=multiprocessing.Lock(), - work_klass=self.mock_protocol_handler, + executor_queues=[], + executor_pids=[], ) @mock.patch('selectors.DefaultSelector') @@ -55,9 +57,9 @@ def test_continues_when_no_events( self.acceptor.run() sock.accept.assert_not_called() - self.mock_protocol_handler.assert_not_called() + self.flags.work_klass.assert_not_called() - @mock.patch('proxy.core.acceptor.acceptor.TcpClientConnection') + @mock.patch('proxy.core.acceptor.executors.TcpClientConnection') @mock.patch('threading.Thread') @mock.patch('selectors.DefaultSelector') @mock.patch('socket.fromfd') @@ -92,13 +94,13 @@ def test_accepts_client_from_server_socket( family=socket.AF_INET6, type=socket.SOCK_STREAM, ) - self.mock_protocol_handler.assert_called_with( + self.flags.work_klass.assert_called_with( mock_client.return_value, flags=self.flags, event_queue=None, ) mock_thread.assert_called_with( - target=self.mock_protocol_handler.return_value.run, + target=self.flags.work_klass.return_value.run, ) mock_thread.return_value.start.assert_called() sock.close.assert_called() diff --git a/tests/core/test_acceptor_pool.py b/tests/core/test_acceptor_pool.py index 66a910fe7a..e2b106425f 100644 --- a/tests/core/test_acceptor_pool.py +++ b/tests/core/test_acceptor_pool.py @@ -46,16 +46,14 @@ def test_setup_and_shutdown( num_workers = 2 pid_file = os.path.join(tempfile.gettempdir(), 'pid') sock = mock_socket.return_value - 
work_klass = mock.MagicMock() flags = FlagParser.initialize( num_workers=2, pid_file=pid_file, threaded=True, ) - pool = AcceptorPool(flags=flags, work_klass=work_klass) + pool = AcceptorPool(flags=flags, executor_queues=[], executor_pids=[]) pool.setup() mock_send_handle.assert_called() - work_klass.assert_not_called() mock_socket.assert_called_with( socket.AF_INET6 if pool.flags.hostname.version == 6 else socket.AF_INET, socket.SOCK_STREAM, diff --git a/tests/test_main.py b/tests/test_main.py index 38ecd4fc72..06322d8a93 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -14,8 +14,6 @@ from proxy.proxy import main, entry_point from proxy.common.utils import bytes_ -from proxy.http import HttpProtocolHandler -from proxy.common.flag import FlagParser from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_NUM_ACCEPTORS, DEFAULT_WORK_KLASS from proxy.common.constants import DEFAULT_TIMEOUT, DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HTTP_PROXY @@ -240,82 +238,6 @@ def test_enable_devtools( mock_acceptor_pool.assert_called_once() mock_acceptor_pool.return_value.setup.assert_called_once() - @mock.patch('time.sleep') - @mock.patch('proxy.proxy.EventManager') - @mock.patch('proxy.proxy.AcceptorPool') - def test_basic_auth_flag_is_base64_encoded( - self, - mock_acceptor_pool: mock.Mock, - mock_event_manager: mock.Mock, - mock_sleep: mock.Mock, - ) -> None: - mock_sleep.side_effect = KeyboardInterrupt() - - input_args = ['--basic-auth', 'user:pass'] - flgs = FlagParser.initialize(input_args) - - main(input_args=input_args) - mock_event_manager.assert_not_called() - mock_acceptor_pool.assert_called_once() - self.assertEqual( - flgs.auth_code, - b'dXNlcjpwYXNz', - ) - - @mock.patch('time.sleep') - @mock.patch('builtins.print') - @mock.patch('proxy.proxy.EventManager') - @mock.patch('proxy.proxy.AcceptorPool') - @mock.patch('proxy.common.flag.is_py2') - def test_main_py3_runs( - self, - 
mock_is_py2: mock.Mock, - mock_acceptor_pool: mock.Mock, - mock_event_manager: mock.Mock, - mock_print: mock.Mock, - mock_sleep: mock.Mock, - ) -> None: - mock_sleep.side_effect = KeyboardInterrupt() - - input_args = ['--basic-auth', 'user:pass'] - mock_is_py2.return_value = False - - main(input_args, num_workers=1) - - mock_is_py2.assert_called() - mock_print.assert_not_called() - - mock_event_manager.assert_not_called() - mock_acceptor_pool.assert_called_once() - mock_acceptor_pool.return_value.setup.assert_called() - - @mock.patch('builtins.print') - @mock.patch('proxy.common.flag.is_py2') - def test_main_py2_exit( - self, - mock_is_py2: mock.Mock, - mock_print: mock.Mock, - ) -> None: - mock_is_py2.return_value = True - with self.assertRaises(SystemExit) as e: - main(num_workers=1) - mock_print.assert_called_with(PY2_DEPRECATION_MESSAGE) - self.assertEqual(e.exception.code, 1) - mock_is_py2.assert_called() - - @mock.patch('builtins.print') - @mock.patch('sys.argv') - def test_main_version( - self, - mock_sys_argv: mock.Mock, - mock_print: mock.Mock, - ) -> None: - mock_sys_argv.return_value = ['proxy', '--version'] - with self.assertRaises(SystemExit) as e: - main() - mock_print.assert_called_with(__version__) - self.assertEqual(e.exception.code, 0) - # def test_pac_file(self) -> None: # pass From 3a7390167efb9715941ff504c125b1dd84ebfcba Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 04:01:53 +0530 Subject: [PATCH 22/26] mypy and flake8 --- examples/ssl_echo_server.py | 1 - examples/tcp_echo_server.py | 1 - examples/web_scraper.py | 1 - proxy/common/context_managers.py | 46 ------------------------------- proxy/common/flag.py | 2 +- proxy/core/acceptor/executors.py | 12 ++++++-- proxy/core/acceptor/pool.py | 17 ++++++++---- proxy/core/acceptor/threadless.py | 2 +- proxy/core/event/manager.py | 12 ++++++-- proxy/proxy.py | 10 +++++-- tests/core/test_acceptor.py | 1 - tests/test_main.py | 7 ++--- 12 files changed, 42 insertions(+), 70 
deletions(-) delete mode 100644 proxy/common/context_managers.py diff --git a/examples/ssl_echo_server.py b/examples/ssl_echo_server.py index 11d2331cc9..65432f3719 100644 --- a/examples/ssl_echo_server.py +++ b/examples/ssl_echo_server.py @@ -12,7 +12,6 @@ from typing import Optional from proxy import Proxy -from proxy.common.flag import FlagParser from proxy.common.utils import wrap_socket from proxy.core.connection import TcpClientConnection diff --git a/examples/tcp_echo_server.py b/examples/tcp_echo_server.py index 821128cd74..cd4924150f 100644 --- a/examples/tcp_echo_server.py +++ b/examples/tcp_echo_server.py @@ -12,7 +12,6 @@ from typing import Optional from proxy import Proxy -from proxy.common.flag import FlagParser from proxy.core.base import BaseTcpServerHandler diff --git a/examples/web_scraper.py b/examples/web_scraper.py index 8fbafcda96..0cd638ade7 100644 --- a/examples/web_scraper.py +++ b/examples/web_scraper.py @@ -14,7 +14,6 @@ from typing import Dict from proxy import Proxy -from proxy.common.flag import FlagParser from proxy.core.acceptor import Work from proxy.common.types import Readables, Writables diff --git a/proxy/common/context_managers.py b/proxy/common/context_managers.py deleted file mode 100644 index 2836b80c19..0000000000 --- a/proxy/common/context_managers.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: utf-8 -*- -""" - proxy.py - ~~~~~~~~ - ⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on - Network monitoring, controls & Application development, testing, debugging. - - :copyright: (c) 2013-present by Abhinav Singh and contributors. - :license: BSD, see LICENSE for more details. -""" -from abc import ABC, abstractmethod -from types import TracebackType -from typing import Optional, Type - - -class SetupShutdownContextManager(ABC): - """An abstract context manager which expects - implementations to provide a setup() and shutdown() - implementation instead of __enter__ and __exit__ methods. 
- - Note that, currently, SetupShutdownContextManager by - default return instance of the class and doesn't provide - implementations to override and return anything else. - - If you want to return anything else but the class instance, - do not use SetupShutdownContextManager. - """ - - def __enter__(self) -> 'SetupShutdownContextManager': - self.setup() - return self - - def __exit__( - self, - exc_type: Optional[Type[BaseException]], - exc_val: Optional[BaseException], - exc_tb: Optional[TracebackType], - ) -> None: - self.shutdown() - - @abstractmethod - def setup(self) -> None: - raise NotImplementedError() - - def shutdown(self) -> None: - raise NotImplementedError() diff --git a/proxy/common/flag.py b/proxy/common/flag.py index eee80b8222..88609d5f81 100644 --- a/proxy/common/flag.py +++ b/proxy/common/flag.py @@ -126,7 +126,7 @@ def initialize( # Load work_klass work_klass = opts.get('work_klass', args.work_klass) - work_klass = Plugins.importer(work_klass)[0] \ + work_klass = Plugins.importer(bytes_(work_klass))[0] \ if isinstance(work_klass, str) \ else work_klass diff --git a/proxy/core/acceptor/executors.py b/proxy/core/acceptor/executors.py index 5873f1debf..0070fade7e 100644 --- a/proxy/core/acceptor/executors.py +++ b/proxy/core/acceptor/executors.py @@ -17,7 +17,7 @@ from multiprocessing import connection from multiprocessing.reduction import send_handle -from typing import Optional, List, Tuple +from typing import Any, Optional, List, Tuple from .work import Work from .threadless import Threadless @@ -25,7 +25,6 @@ from ..connection import TcpClientConnection from ..event import EventQueue, eventNames -from ...common.context_managers import SetupShutdownContextManager from ...common.flag import flags from ...common.utils import is_threadless from ...common.constants import DEFAULT_NUM_WORKERS, DEFAULT_THREADLESS @@ -61,7 +60,7 @@ ) -class ThreadlessPool(SetupShutdownContextManager): +class ThreadlessPool: """Manages lifecycle of threadless pool and 
delegates work to them using a round-robin strategy. @@ -94,6 +93,13 @@ def __init__( # List of threadless workers self._workers: List[Threadless] = [] + def __enter__(self) -> 'ThreadlessPool': + self.setup() + return self + + def __exit__(self, *args: Any) -> None: + self.shutdown() + @staticmethod def delegate( worker_pid: int, diff --git a/proxy/core/acceptor/pool.py b/proxy/core/acceptor/pool.py index 38a5edf43f..e5fa4d9c79 100644 --- a/proxy/core/acceptor/pool.py +++ b/proxy/core/acceptor/pool.py @@ -9,22 +9,20 @@ :license: BSD, see LICENSE for more details. """ import os -import argparse +import socket import logging +import argparse import multiprocessing -import socket from multiprocessing import connection from multiprocessing.reduction import send_handle -from typing import List, Optional, Type +from typing import Any, List, Optional from .acceptor import Acceptor -from .work import Work from ..event import EventQueue -from ...common.context_managers import SetupShutdownContextManager from ...common.utils import bytes_ from ...common.flag import flags from ...common.constants import DEFAULT_BACKLOG, DEFAULT_IPV6_HOSTNAME @@ -80,7 +78,7 @@ ) -class AcceptorPool(SetupShutdownContextManager): +class AcceptorPool: """AcceptorPool is a helper class which pre-spawns `Acceptor` processes to utilize all available CPU cores for accepting new work. 
@@ -116,6 +114,13 @@ def __init__( self.executor_queues: List[connection.Connection] = executor_queues self.executor_pids: List[int] = executor_pids + def __enter__(self) -> 'AcceptorPool': + self.setup() + return self + + def __exit__(self, *args: Any) -> None: + self.shutdown() + def setup(self) -> None: """Setup socket and acceptors.""" self._write_pid_file() diff --git a/proxy/core/acceptor/threadless.py b/proxy/core/acceptor/threadless.py index d94f4db6b6..aa8c9c28dd 100644 --- a/proxy/core/acceptor/threadless.py +++ b/proxy/core/acceptor/threadless.py @@ -19,7 +19,7 @@ from multiprocessing import connection from multiprocessing.reduction import recv_handle -from typing import Dict, Optional, Tuple, List, Generator, Any, Type +from typing import Dict, Optional, Tuple, List, Generator, Any from .work import Work diff --git a/proxy/core/event/manager.py b/proxy/core/event/manager.py index 1d2fefb09e..20d5c6e7c8 100644 --- a/proxy/core/event/manager.py +++ b/proxy/core/event/manager.py @@ -12,12 +12,11 @@ import threading import multiprocessing -from typing import Optional +from typing import Any, Optional from .queue import EventQueue from .dispatcher import EventDispatcher -from ...common.context_managers import SetupShutdownContextManager from ...common.flag import flags from ...common.constants import DEFAULT_ENABLE_EVENTS @@ -33,7 +32,7 @@ ) -class EventManager(SetupShutdownContextManager): +class EventManager: """Event manager is a context manager which provides encapsulation around various setup and shutdown steps to start the eventing core. 
@@ -46,6 +45,13 @@ def __init__(self) -> None: self.dispatcher_shutdown: Optional[threading.Event] = None self.manager: Optional[multiprocessing.managers.SyncManager] = None + def __enter__(self) -> 'EventManager': + self.setup() + return self + + def __exit__(self, *args: Any) -> None: + self.shutdown() + def setup(self) -> None: self.manager = multiprocessing.Manager() self.queue = EventQueue(self.manager.Queue()) diff --git a/proxy/proxy.py b/proxy/proxy.py index 112eb2a0a7..e20b7ec2d6 100644 --- a/proxy/proxy.py +++ b/proxy/proxy.py @@ -20,7 +20,6 @@ from .common.constants import DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_LOG_LEVEL from .common.constants import DEFAULT_OPEN_FILE_LIMIT, DEFAULT_PLUGINS, DEFAULT_VERSION from .common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_WORK_KLASS -from .common.context_managers import SetupShutdownContextManager logger = logging.getLogger(__name__) @@ -95,7 +94,7 @@ ) -class Proxy(SetupShutdownContextManager): +class Proxy: """Context manager to control AcceptorPool, ExecutorPool & EventingCore lifecycle. By default, AcceptorPool is started with `HttpProtocolHandler` work class. 
@@ -112,6 +111,13 @@ def __init__(self, input_args: Optional[List[str]] = None, **opts: Any) -> None: self.executors: Optional[ThreadlessPool] = None self.event_manager: Optional[EventManager] = None + def __enter__(self) -> 'Proxy': + self.setup() + return self + + def __exit__(self, *args: Any) -> None: + self.shutdown() + def setup(self) -> None: # TODO: Introduce cron feature # https://github.com/abhinavsingh/proxy.py/issues/392 diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index dde5a50c78..d280430808 100644 --- a/tests/core/test_acceptor.py +++ b/tests/core/test_acceptor.py @@ -16,7 +16,6 @@ from proxy.core.acceptor import Acceptor from proxy.common.flag import FlagParser -from proxy.http import HttpProtocolHandler class TestAcceptor(unittest.TestCase): diff --git a/tests/test_main.py b/tests/test_main.py index 06322d8a93..21137b6d10 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -15,17 +15,16 @@ from proxy.proxy import main, entry_point from proxy.common.utils import bytes_ -from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT, DEFAULT_NUM_ACCEPTORS, DEFAULT_WORK_KLASS +from proxy.common.constants import DEFAULT_ENABLE_DASHBOARD, DEFAULT_LOG_LEVEL, DEFAULT_LOG_FILE, DEFAULT_LOG_FORMAT from proxy.common.constants import DEFAULT_TIMEOUT, DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HTTP_PROXY from proxy.common.constants import DEFAULT_ENABLE_STATIC_SERVER, DEFAULT_ENABLE_EVENTS, DEFAULT_ENABLE_DEVTOOLS from proxy.common.constants import DEFAULT_ENABLE_WEB_SERVER, DEFAULT_THREADLESS, DEFAULT_CERT_FILE, DEFAULT_KEY_FILE from proxy.common.constants import DEFAULT_CA_CERT_FILE, DEFAULT_CA_KEY_FILE, DEFAULT_CA_SIGNING_KEY_FILE from proxy.common.constants import DEFAULT_PAC_FILE, DEFAULT_PLUGINS, DEFAULT_PID_FILE, DEFAULT_PORT, DEFAULT_BASIC_AUTH from proxy.common.constants import DEFAULT_NUM_WORKERS, DEFAULT_OPEN_FILE_LIMIT, DEFAULT_IPV6_HOSTNAME -from 
proxy.common.constants import DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_CLIENT_RECVBUF_SIZE, PY2_DEPRECATION_MESSAGE +from proxy.common.constants import DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_CLIENT_RECVBUF_SIZE, DEFAULT_WORK_KLASS from proxy.common.constants import PLUGIN_INSPECT_TRAFFIC, PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, PLUGIN_WEB_SERVER -from proxy.common.constants import PLUGIN_HTTP_PROXY -from proxy.common.version import __version__ +from proxy.common.constants import PLUGIN_HTTP_PROXY, DEFAULT_NUM_ACCEPTORS class TestMain(unittest.TestCase): From 5e1e860d6acf5cd539ef99ea0dc2036096edd4a4 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 04:02:33 +0530 Subject: [PATCH 23/26] Trailing comma --- tests/core/test_acceptor.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/core/test_acceptor.py b/tests/core/test_acceptor.py index d280430808..274399c371 100644 --- a/tests/core/test_acceptor.py +++ b/tests/core/test_acceptor.py @@ -24,7 +24,8 @@ def setUp(self) -> None: self.acceptor_id = 1 self.pipe = multiprocessing.Pipe() self.flags = FlagParser.initialize( - threaded=True, work_klass=mock.MagicMock()) + threaded=True, work_klass=mock.MagicMock(), + ) self.acceptor = Acceptor( idd=self.acceptor_id, work_queue=self.pipe[1], From bf3b466d629b23fc1c66b1672fcf6f753df66241 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 04:09:53 +0530 Subject: [PATCH 24/26] Remove unused var --- proxy/core/acceptor/acceptor.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index b3c00d9558..9202e6f9dd 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -135,20 +135,20 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: self.flags.unix_socket_path, ) logger.debug( - 'Dispatched work#{0} from acceptor#{1} to worker#{2}'.format( - self._total, self.idd, index, + 
'Dispatched work#{0}.{0} to worker#{2}'.format( + self.idd, self._total, index, ), ) else: - work, thread = ThreadlessPool.start_threaded_work( + _, thread = ThreadlessPool.start_threaded_work( self.flags, conn, addr, event_queue=self.event_queue, publisher_id=self.__class__.__name__, ) logger.debug( - 'Started work#{0} in thread#{1}'.format( - self._total, thread.ident, + 'Started work#{0}.{0} in thread#{1}'.format( + self.idd, self._total, thread.ident, ), ) self._total += 1 From 6fb669a9d3f7a398b4d1870ef6aa64d6f8d34930 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 04:11:46 +0530 Subject: [PATCH 25/26] Unused arg --- proxy/core/acceptor/acceptor.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index 9202e6f9dd..f40d846784 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -135,7 +135,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: self.flags.unix_socket_path, ) logger.debug( - 'Dispatched work#{0}.{0} to worker#{2}'.format( + 'Dispatched work#{0}.{1} to worker#{2}'.format( self.idd, self._total, index, ), ) @@ -147,7 +147,7 @@ def _work(self, conn: socket.socket, addr: Optional[Tuple[str, int]]) -> None: publisher_id=self.__class__.__name__, ) logger.debug( - 'Started work#{0}.{0} in thread#{1}'.format( + 'Started work#{0}.{1} in thread#{1}'.format( self.idd, self._total, thread.ident, ), ) From d5aa4cec667dece6ebd79617e9d8c034dabbcbf4 Mon Sep 17 00:00:00 2001 From: Abhinav Singh Date: Wed, 10 Nov 2021 04:14:41 +0530 Subject: [PATCH 26/26] uff --- proxy/core/acceptor/acceptor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/core/acceptor/acceptor.py b/proxy/core/acceptor/acceptor.py index f40d846784..18758f53f7 100644 --- a/proxy/core/acceptor/acceptor.py +++ b/proxy/core/acceptor/acceptor.py @@ -147,7 +147,7 @@ def _work(self, conn: socket.socket, addr: 
Optional[Tuple[str, int]]) -> None: publisher_id=self.__class__.__name__, ) logger.debug( - 'Started work#{0}.{1} in thread#{1}'.format( + 'Started work#{0}.{1} in thread#{2}'.format( self.idd, self._total, thread.ident, ), )