From 320edd2ffdf5afee38d3908885c89331d6b497d7 Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Mon, 1 Jul 2024 13:12:14 -0700 Subject: [PATCH] Add initial revision of the WS/WSS socket server and appropriate support for sending calls beetween WSS and SIP clients. Requires RTPProxy 3.1 or newer with dtls_gw and ice_lite modules loaded. --- requirements.txt | 1 + sippy/MyConfigParser.py | 5 + sippy/Rtp_proxy/Session/webrtc.py | 9 ++ sippy/SipTransactionManager.py | 13 ++- sippy/Wss_server.py | 147 ++++++++++++++++++++++++++++++ sippy/b2bua_radius.py | 38 ++++++-- 6 files changed, 203 insertions(+), 10 deletions(-) create mode 100644 sippy/Rtp_proxy/Session/webrtc.py create mode 100644 sippy/Wss_server.py diff --git a/requirements.txt b/requirements.txt index ded180af..4f10ea79 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ ElPeriodic>=1.1 pycryptodome +websockets diff --git a/sippy/MyConfigParser.py b/sippy/MyConfigParser.py index 1ee2acdc..4666b7cc 100644 --- a/sippy/MyConfigParser.py +++ b/sippy/MyConfigParser.py @@ -98,6 +98,11 @@ 'and "SUBSCRIBE" messages. Address in the format ' \ '"host[:port]"'), 'nat_traversal': ('B', 'enable NAT traversal for signalling'), \ + 'wss_socket': ('S', 'WSS (SIP via websockets, RFC7118) socket ' \ + 'configuration. Configuration in the format ' \ + '"host:port:cert_file:key_file", where "cert_file" ' \ + '/ "key_file" are paths to the TLS certificate ' \ + 'and key file respectively in the X.509 PEM format'), 'xmpp_b2bua_id': ('I', 'ID passed to the XMPP socket server')} class MyConfigParser(RawConfigParser): diff --git a/sippy/Rtp_proxy/Session/webrtc.py b/sippy/Rtp_proxy/Session/webrtc.py new file mode 100644 index 00000000..e561a6ea --- /dev/null +++ b/sippy/Rtp_proxy/Session/webrtc.py @@ -0,0 +1,9 @@ +from sippy.Rtp_proxy.session import Rtp_proxy_session + +class Rtp_proxy_session_webrtc(Rtp_proxy_session): + insert_nortpp = False + def __init__(self, *a, **kwa): + super().__init__(*a, **kwa) + self.caller.deice = True + self.caller.gateway_dtls = 'dtls' + self.callee.gateway_dtls = 'rtp' diff --git a/sippy/SipTransactionManager.py b/sippy/SipTransactionManager.py index e3603f23..e34727c6 100644 --- a/sippy/SipTransactionManager.py +++ b/sippy/SipTransactionManager.py @@ -309,6 +309,11 @@ def handleIncoming(self, data_in, ra:Remote_address, server, rtime): curl = cbody.getUrl() if check1918(curl.host): curl.host, curl.port = ra.address + if ra.transport == 'ws' and resp.countHFs('contact') > 0: + cbody = resp.getHFBody('contact') + if not cbody.asterisk: + curl = cbody.getUrl() + curl.host = ra.received resp.setSource(ra.address) self.incomingResponse(resp, t, checksum) else: @@ -816,8 +821,12 @@ def transmitData(self, userv, data, address, cachesum = None, \ logop = 'SENDING' else: logop = 'DISCARDING' - self.global_config['_sip_logger'].write('%s message to %s:%d:\n' % \ - (logop, address[0], address[1]), data) + if userv.transport != 'ws': + paddr = '%s:%d' % (address[0], address[1]) + else: + paddr = address[0] + msg = f'{logop} message to {userv.transport}:{paddr}:\n' + self.global_config['_sip_logger'].write(msg, data) if cachesum != None: if lossemul > 0: lossemul -= 1 diff --git a/sippy/Wss_server.py b/sippy/Wss_server.py new file mode 100644 index 00000000..230d604a --- /dev/null +++ b/sippy/Wss_server.py @@ -0,0 +1,147 @@ +# Copyright (c) 2006-2024 Sippy Software, Inc. All rights reserved. +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation and/or +# other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +from typing import Optional, Dict, Tuple +from threading import Thread +from asyncio import get_event_loop, all_tasks, new_event_loop, set_event_loop, CancelledError, \ + Queue as AsyncQueue, create_task +from ssl import SSLContext, PROTOCOL_TLS_SERVER +from uuid import UUID +from websockets import WebSocketServerProtocol, ConnectionClosed, serve as ws_serve + +from sippy.Core.EventDispatcher import ED2 +from sippy.Network_server import Network_server, Network_server_opts, Remote_address +from sippy.Time.MonoTime import MonoTime + +class Wss_server_opts(Network_server_opts): + certfile: Optional[str] = None + keyfile: Optional[str] = None + + def __init__(self, *args, certfile = None, keyfile = None, o = None): + super().__init__(*args, o = o) + if o != None: + self.certfile, self.keyfile = o.certfile, o.keyfile + return + self.certfile = certfile + self.keyfile = keyfile + +class Wss_server(Thread, Network_server): + transport = 'ws' + daemon = True + ssl_context: Optional[SSLContext] = None + connections: Dict[UUID, Tuple[WebSocketServerProtocol, AsyncQueue]] + + def __init__(self, global_config, uopts:Wss_server_opts): + Thread.__init__(self) + Network_server.__init__(self, uopts) + if self.uopts.certfile is not None: + self.ssl_context = SSLContext(PROTOCOL_TLS_SERVER) + self.ssl_context.load_cert_chain(self.uopts.certfile, self.uopts.keyfile) + self.connections = {} + self.start() + + async def monitor_queue(self): + while True: + item = await get_event_loop().run_in_executor(None, self.sendqueue.get) + if item is None: + for task in all_tasks(): + task.cancel() + break + data, address = item + uaddress = address[0] + if uaddress not in self.connections: + print(f'ERROR: Invalid address {uaddress=}') + continue + await self.connections[uaddress][1].put(data) + + async def sip_to_ws(self, queue:AsyncQueue, websocket:WebSocketServerProtocol): + while True: + item = await queue.get() + await websocket.send(item) + + async def ws_to_sip(self, websocket, path): + print(f'New connection {websocket.id=}') + queue = AsyncQueue() + sender = create_task(self.sip_to_ws(queue, websocket)) + conn_id = f'{websocket.id}.invalid' + self.connections[conn_id] = (websocket, queue) + address = Remote_address(websocket.remote_address, self.transport) + address.received = conn_id + try: + while True: + data = await websocket.recv() + rtime = MonoTime() + ED2.callFromThread(self.handle_read, data, address, rtime) + except ConnectionClosed: + print(f'Connection {websocket.id} closed') + finally: + del self.connections[conn_id] + sender.cancel() + await sender + + async def async_run(self): + start_server = ws_serve( + self.ws_to_sip, self.uopts.laddress[0], self.uopts.laddress[1], ssl = self.ssl_context, + subprotocols = ['sip'] + ) + server = await start_server + await self.monitor_queue() + server.close() + await server.wait_closed() + + def runFailed(self, exception): + ED2.breakLoop(255) + raise exception + + def run(self): + loop = new_event_loop() + set_event_loop(loop) + try: + loop.run_until_complete(self.async_run()) + except CancelledError: + pass + except OSError as ex: + ED2.callFromThread(self.runFailed, ex) + finally: + loop.close() + +if __name__ == '__main__': + laddr = ('192.168.23.43', 9878) + certfile = '/home/sobomax/server.crt' + keyfile = '/home/sobomax/server.key' + from sippy.SipRequest import SipRequest + def data_callback(data, address, server, rtime): + sr = SipRequest(data) + print(f'Got {sr=} from {address=}') + for rr in (100, 'Trying'), (666, 'Busy Here'): + res = sr.genResponse(rr[0], rr[1]) + server.send_to(str(res), address) + ED2.breakLoop() + wopts = Wss_server_opts(laddr, data_callback, certfile = certfile, keyfile = keyfile) + wserv = Wss_server(None, wopts) + try: + ED2.loop() + finally: + wserv.shutdown() diff --git a/sippy/b2bua_radius.py b/sippy/b2bua_radius.py index 811eead9..0357de3c 100755 --- a/sippy/b2bua_radius.py +++ b/sippy/b2bua_radius.py @@ -49,6 +49,7 @@ from sippy.FakeAccounting import FakeAccounting from sippy.SipLogger import SipLogger from sippy.Rtp_proxy.session import Rtp_proxy_session +from sippy.Rtp_proxy.Session.webrtc import Rtp_proxy_session_webrtc from sippy.Rtp_proxy.client import Rtp_proxy_client from signal import SIGHUP, SIGPROF, SIGUSR1, SIGUSR2, SIGTERM from sippy.CLIManager import CLIConnectionManager @@ -57,6 +58,7 @@ from sippy.StatefulProxy import StatefulProxy from sippy.misc import daemonize from sippy.B2BRoute import B2BRoute +from sippy.Wss_server import Wss_server, Wss_server_opts import gc, getopt, os from re import sub @@ -100,6 +102,8 @@ class CCStateDead(object): class CCStateDisconnecting(object): sname = 'Disconnecting' +class Rtp_proxy_session_default(Rtp_proxy_session): insert_nortpp = True + class CallController(object): id = 1 uaA = None @@ -114,6 +118,7 @@ class CallController(object): acctA = None acctO = None global_config = None + rtpps_cls = Rtp_proxy_session_default rtp_proxy_session = None huntstop_scodes = None pass_headers = None @@ -121,9 +126,10 @@ class CallController(object): proxied = False challenge = None - def __init__(self, remote_ip, source, global_config, pass_headers): + def __init__(self, remote_ip, source, vtrans, global_config, pass_headers): self.id = CallController.id CallController.id += 1 + if vtrans == 'WSS': self.rtpps_cls = Rtp_proxy_session_webrtc self.global_config = global_config self.uaA = UA(self.global_config, event_cb = self.recvEvent, conn_cbs = (self.aConn,), disc_cbs = (self.aDisc,), \ fail_cbs = (self.aDisc,), dead_cbs = (self.aDead,)) @@ -158,7 +164,7 @@ def recvEvent(self, event, ua): allowed_pts = self.global_config['_allowed_pts'] for sect in body.content.sections: mbody = sect.m_header - if mbody.transport.lower() not in Rtp_proxy_session.AV_TRTYPES: + if mbody.transport.lower() not in self.rtpps_cls.AV_TRTYPES: continue old_len = len(mbody.formats) mbody.formats = [x for x in mbody.formats if x in allowed_pts] @@ -177,11 +183,10 @@ def recvEvent(self, event, ua): self.cld = re_replace(self.global_config['static_tr_in'], self.cld) event.data = (self.cId, self.cli, self.cld, body, auth, self.caller_name) if '_rtp_proxy_clients' in self.global_config: - self.rtp_proxy_session = Rtp_proxy_session(self.global_config, call_id = self.cId, \ + self.rtp_proxy_session = self.rtpps_cls(self.global_config, call_id = self.cId, \ notify_socket = self.global_config['b2bua_socket'], \ notify_tag = quote('r %s' % str(self.id))) self.rtp_proxy_session.callee.raddress = (self.remote_ip, 5060) - self.rtp_proxy_session.insert_nortpp = True self.eTry = event self.state = CCStateWaitRoute if not self.global_config['auth_enable']: @@ -428,8 +433,11 @@ def recvRequest(self, req, sip_t): via = req.getHFBody('via', 1) else: via = req.getHFBody('via', 0) - remote_ip = via.getTAddr()[0] source = req.getSource() + if not via.transport == 'WSS': + remote_ip = via.getTAddr()[0] + else: + remote_ip = source[0] # First check if request comes from IP that # we want to accept our traffic from @@ -459,7 +467,7 @@ def recvRequest(self, req, sip_t): hfs = req.getHFs(header) if len(hfs) > 0: pass_headers.extend(hfs) - cc = CallController(remote_ip, source, self.global_config, pass_headers) + cc = CallController(remote_ip, source, via.transport, self.global_config, pass_headers) cc.challenge = challenge rval = cc.uaA.recvRequest(req, sip_t) self.ccmap.append(cc) @@ -813,7 +821,17 @@ def main_func(): if global_config.getdefault('xmpp_b2bua_id', None) != None: global_config['_xmpp_mode'] = True - global_config['_sip_tm'] = SipTransactionManager(global_config, global_config['_cmap'].recvRequest) + stm = SipTransactionManager(global_config, global_config['_cmap'].recvRequest) + + if 'wss_socket' in global_config: + parts = global_config['wss_socket'].split(':', 3) + wss_laddr = (parts[0], int(parts[1])) + wss_certfile = parts[2] + wss_keyfile = parts[3] + wss_opts = Wss_server_opts(wss_laddr, stm.handleIncoming, certfile=wss_certfile, keyfile=wss_keyfile) + global_config['_wss_server'] = Wss_server(global_config, wss_opts) + + global_config['_sip_tm'] = stm global_config['_sip_tm'].nat_traversal = global_config.getdefault('nat_traversal', False) cmdfile = global_config['b2bua_socket'] @@ -825,7 +843,11 @@ def main_func(): open(global_config['pidfile'], 'w').write(str(os.getpid()) + '\n') Signal(SIGUSR1, reopen, SIGUSR1, global_config['logfile']) - ED2.loop() + try: + ED2.loop() + finally: + if '_wss_server' in global_config: + global_config['_wss_server'].shutdown() if __name__ == '__main__': main_func()