Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple transport framework from dashboard plugin #953

Merged
merged 5 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dashboard/src/core/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class WebsocketApi {
private hostname: string = window.location.hostname ? window.location.hostname : 'localhost';
private port: number = window.location.port ? Number(window.location.port) : 8899;
// TODO: Must map to route registered by dashboard.py, don't hardcode
private wsPrefix: string = '/dashboard';
private wsPrefix: string = '/transport/';
private wsScheme: string = window.location.protocol === 'http:' ? 'ws' : 'wss';
private ws: WebSocket;
private wsPath: string = this.wsScheme + '://' + this.hostname + ':' + this.port + this.wsPrefix;
Expand Down
11 changes: 6 additions & 5 deletions proxy/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,16 @@ def _env_threadless_compliant() -> bool:
'HttpProtocolHandlerPlugin',
'HttpProxyBasePlugin',
'HttpWebServerBasePlugin',
'ProxyDashboardWebsocketPlugin',
'WebSocketTransportBasePlugin',
]
PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin'
PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard'
PLUGIN_HTTP_PROXY = 'proxy.http.proxy.HttpProxyPlugin'
PLUGIN_WEB_SERVER = 'proxy.http.server.HttpWebServerPlugin'
PLUGIN_PAC_FILE = 'proxy.http.server.HttpWebServerPacFilePlugin'
PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.DevtoolsProtocolPlugin'
PLUGIN_DASHBOARD = 'proxy.dashboard.ProxyDashboard'
PLUGIN_INSPECT_TRAFFIC = 'proxy.dashboard.InspectTrafficPlugin'
PLUGIN_PROXY_AUTH = 'proxy.http.proxy.AuthPlugin'
PLUGIN_DEVTOOLS_PROTOCOL = 'proxy.http.inspector.devtools.DevtoolsProtocolPlugin'
PLUGIN_INSPECT_TRAFFIC = 'proxy.http.inspector.inspect_traffic.InspectTrafficPlugin'
PLUGIN_WEBSOCKET_TRANSPORT = 'proxy.http.websocket.transport.WebSocketTransport'

PY2_DEPRECATION_MESSAGE = '''DEPRECATION: proxy.py no longer supports Python 2.7. Kindly upgrade to Python 3+. '
'If for some reasons you cannot upgrade, use'
Expand Down
3 changes: 2 additions & 1 deletion proxy/common/flag.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from .constants import DEFAULT_DEVTOOLS_WS_PATH, DEFAULT_DISABLE_HEADERS, PY2_DEPRECATION_MESSAGE
from .constants import PLUGIN_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL, DEFAULT_MIN_COMPRESSION_LIMIT
from .constants import PLUGIN_HTTP_PROXY, PLUGIN_INSPECT_TRAFFIC, PLUGIN_PAC_FILE
from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS
from .constants import PLUGIN_WEB_SERVER, PLUGIN_PROXY_AUTH, IS_WINDOWS, PLUGIN_WEBSOCKET_TRANSPORT
from .logger import Logger

from .version import __version__
Expand Down Expand Up @@ -395,6 +395,7 @@ def get_default_plugins(
default_plugins.append(PLUGIN_WEB_SERVER)
args.enable_static_server = True
default_plugins.append(PLUGIN_DASHBOARD)
default_plugins.append(PLUGIN_WEBSOCKET_TRANSPORT)
default_plugins.append(PLUGIN_INSPECT_TRAFFIC)
args.enable_events = True
args.enable_devtools = True
Expand Down
7 changes: 4 additions & 3 deletions proxy/core/event/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@


class EventQueue:
"""Global event queue. Must be a multiprocessing.Manager queue because
subscribers need to dispatch their subscription queue over this global
queue.
"""Global event queue. Must be a multiprocess safe queue capable of
transporting other queues. This is necessary because currently
subscribers use a separate subscription queue to consume events.
Subscription queue is exchanged over the global event queue.

Each published event contains following schema::

Expand Down
22 changes: 10 additions & 12 deletions proxy/core/event/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ class EventSubscriber:
can be different from publishers. Publishers can even be processes
outside of the proxy.py core.

Note that, EventSubscriber cannot share the `multiprocessing.Manager`
with the EventManager. Because EventSubscriber can be started
in a different process than EventManager.

`multiprocessing.Manager` is used to initialize
a new Queue which is used for subscriptions. EventDispatcher
might be running in a separate process and hence
subscription queue must be multiprocess safe.

When `subscribe` method is called, EventManager will
start a relay thread which consumes using the multiprocess
safe queue passed to the relay thread.
`multiprocessing.Pipe` is used to initialize a new Queue for
receiving subscribed events from eventing core. Note that,
core EventDispatcher might be running in a separate process
and hence subscription queue must be multiprocess safe.

When `subscribe` method is called, EventManager stars
a relay thread which consumes event out of the subscription queue
and invoke callback.

NOTE: Callback is executed in the context of relay thread.
"""

def __init__(self, event_queue: EventQueue, callback: Callable[[Dict[str, Any]], None]) -> None:
Expand Down
4 changes: 0 additions & 4 deletions proxy/dashboard/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@
:license: BSD, see LICENSE for more details.
"""
from .dashboard import ProxyDashboard
from .inspect_traffic import InspectTrafficPlugin
from .plugin import ProxyDashboardWebsocketPlugin

__all__ = [
'ProxyDashboard',
'InspectTrafficPlugin',
'ProxyDashboardWebsocketPlugin',
]
62 changes: 2 additions & 60 deletions proxy/dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,11 @@
:license: BSD, see LICENSE for more details.
"""
import os
import json
import logging
from typing import List, Tuple, Any, Dict

from .plugin import ProxyDashboardWebsocketPlugin

from ..common.utils import bytes_
from typing import List, Tuple

from ..http.responses import permanentRedirectResponse
from ..http.parser import HttpParser
from ..http.websocket import WebsocketFrame
from ..http.server import HttpWebServerPlugin, HttpWebServerBasePlugin, httpProtocolTypes

logger = logging.getLogger(__name__)
Expand All @@ -42,24 +36,9 @@ class ProxyDashboard(HttpWebServerBasePlugin):
(httpProtocolTypes.HTTPS, r'/dashboard/$'),
]

# Handles WebsocketAPI requests for dashboard
WS_ROUTES = [
(httpProtocolTypes.WEBSOCKET, r'/dashboard$'),
]

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.plugins: Dict[str, ProxyDashboardWebsocketPlugin] = {}
if b'ProxyDashboardWebsocketPlugin' in self.flags.plugins:
for klass in self.flags.plugins[b'ProxyDashboardWebsocketPlugin']:
p = klass(self.flags, self.client, self.event_queue)
for method in p.methods():
self.plugins[method] = p

def routes(self) -> List[Tuple[int, str]]:
return ProxyDashboard.REDIRECT_ROUTES + \
ProxyDashboard.INDEX_ROUTES + \
ProxyDashboard.WS_ROUTES
ProxyDashboard.INDEX_ROUTES

def handle_request(self, request: HttpParser) -> None:
if request.path == b'/dashboard/':
Expand All @@ -76,40 +55,3 @@ def handle_request(self, request: HttpParser) -> None:
b'/dashboard/proxy.html',
):
self.client.queue(permanentRedirectResponse(b'/dashboard/'))

def on_websocket_open(self) -> None:
logger.info('app ws opened')

def on_websocket_message(self, frame: WebsocketFrame) -> None:
try:
assert frame.data
message = json.loads(frame.data)
except UnicodeDecodeError:
logger.error(frame.data)
logger.info(frame.opcode)
return

method = message['method']
if method == 'ping':
self.reply({'id': message['id'], 'response': 'pong'})
elif method in self.plugins:
self.plugins[method].handle_message(message)
else:
logger.info(frame.data)
logger.info(frame.opcode)
self.reply({'id': message['id'], 'response': 'not_implemented'})

def on_client_connection_close(self) -> None:
logger.info('app ws closed')
# TODO(abhinavsingh): unsubscribe

def reply(self, data: Dict[str, Any]) -> None:
self.client.queue(
memoryview(
WebsocketFrame.text(
bytes_(
json.dumps(data),
),
),
),
)
10 changes: 0 additions & 10 deletions proxy/http/inspector/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,4 @@

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.

.. spelling::

http
Submodules
"""
from .devtools import DevtoolsProtocolPlugin

__all__ = [
'DevtoolsProtocolPlugin',
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,13 @@
import json
from typing import List, Dict, Any

from .plugin import ProxyDashboardWebsocketPlugin
from ...common.utils import bytes_
from ...core.event import EventSubscriber
from ...core.connection import TcpClientConnection
from ..websocket import WebsocketFrame, WebSocketTransportBasePlugin

from ..common.utils import bytes_
from ..core.event import EventSubscriber
from ..core.connection import TcpClientConnection
from ..http.websocket import WebsocketFrame


class InspectTrafficPlugin(ProxyDashboardWebsocketPlugin):
class InspectTrafficPlugin(WebSocketTransportBasePlugin):
"""Websocket API for inspect_traffic.ts frontend plugin."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
Expand Down
2 changes: 2 additions & 0 deletions proxy/http/websocket/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
"""
from .frame import WebsocketFrame, websocketOpcodes
from .client import WebsocketClient
from .plugin import WebSocketTransportBasePlugin

__all__ = [
'websocketOpcodes',
'WebsocketFrame',
'WebsocketClient',
'WebSocketTransportBasePlugin',
]
10 changes: 5 additions & 5 deletions proxy/dashboard/plugin.py → proxy/http/websocket/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@
from abc import ABC, abstractmethod
from typing import List, Dict, Any

from ..common.utils import bytes_
from ..http.websocket import WebsocketFrame
from ..core.connection import TcpClientConnection
from ..core.event import EventQueue
from ...common.utils import bytes_
from . import WebsocketFrame
from ...core.connection import TcpClientConnection
from ...core.event import EventQueue


class ProxyDashboardWebsocketPlugin(ABC):
class WebSocketTransportBasePlugin(ABC):
"""Abstract class for plugins extending dashboard websocket API."""

def __init__(
Expand Down
82 changes: 82 additions & 0 deletions proxy/http/websocket/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""
proxy.py
~~~~~~~~
⚡⚡⚡ Fast, Lightweight, Pluggable, TLS interception capable proxy server focused on
Network monitoring, controls & Application development, testing, debugging.

:copyright: (c) 2013-present by Abhinav Singh and contributors.
:license: BSD, see LICENSE for more details.
"""
import json
import logging
from typing import List, Tuple, Any, Dict

from ...common.utils import bytes_

from ..server import httpProtocolTypes, HttpWebServerBasePlugin
from ..parser import HttpParser

from .frame import WebsocketFrame
from .plugin import WebSocketTransportBasePlugin

logger = logging.getLogger(__name__)


class WebSocketTransport(HttpWebServerBasePlugin):
"""WebSocket transport framework."""

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.plugins: Dict[str, WebSocketTransportBasePlugin] = {}
if b'WebSocketTransportBasePlugin' in self.flags.plugins:
for klass in self.flags.plugins[b'WebSocketTransportBasePlugin']:
p = klass(self.flags, self.client, self.event_queue)
for method in p.methods():
self.plugins[method] = p

def routes(self) -> List[Tuple[int, str]]:
return [
(httpProtocolTypes.WEBSOCKET, r'/transport/$'),
]

def handle_request(self, request: HttpParser) -> None:
raise NotImplementedError()

def on_websocket_open(self) -> None:
# TODO(abhinavsingh): Add connected callback invocation
logger.info('app ws opened')

def on_websocket_message(self, frame: WebsocketFrame) -> None:
try:
assert frame.data
message = json.loads(frame.data)
except UnicodeDecodeError:
logger.error(frame.data)
logger.info(frame.opcode)
return

method = message['method']
if method == 'ping':
self.reply({'id': message['id'], 'response': 'pong'})
elif method in self.plugins:
self.plugins[method].handle_message(message)
else:
logger.info(frame.data)
logger.info(frame.opcode)
self.reply({'id': message['id'], 'response': 'not_implemented'})

def on_client_connection_close(self) -> None:
# TODO(abhinavsingh): Add disconnected callback invocation
logger.info('app ws closed')

def reply(self, data: Dict[str, Any]) -> None:
self.client.queue(
memoryview(
WebsocketFrame.text(
bytes_(
json.dumps(data),
),
),
),
)
3 changes: 2 additions & 1 deletion tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
DEFAULT_ENABLE_DASHBOARD, PLUGIN_DEVTOOLS_PROTOCOL,
DEFAULT_ENABLE_WEB_SERVER, DEFAULT_DISABLE_HTTP_PROXY,
DEFAULT_CA_SIGNING_KEY_FILE, DEFAULT_CLIENT_RECVBUF_SIZE,
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER,
DEFAULT_SERVER_RECVBUF_SIZE, DEFAULT_ENABLE_STATIC_SERVER, PLUGIN_WEBSOCKET_TRANSPORT,
_env_threadless_compliant,
)

Expand Down Expand Up @@ -243,6 +243,7 @@ def test_enable_dashboard(
mock_load_plugins.call_args_list[0][0][0], [
bytes_(PLUGIN_WEB_SERVER),
bytes_(PLUGIN_DASHBOARD),
bytes_(PLUGIN_WEBSOCKET_TRANSPORT),
bytes_(PLUGIN_INSPECT_TRAFFIC),
bytes_(PLUGIN_DEVTOOLS_PROTOCOL),
bytes_(PLUGIN_HTTP_PROXY),
Expand Down