Skip to content

Commit

Permalink
Add initial revision of the WS/WSS socket server and
Browse files Browse the repository at this point in the history
appropriate support for sending calls beetween WSS
and SIP clients. Requires RTPProxy 3.1 or newer with
dtls_gw and ice_lite modules loaded.
  • Loading branch information
sobomax committed Jul 11, 2024
1 parent 98f9e25 commit 33a7175
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 10 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
ElPeriodic>=1.1
pycryptodome
websockets
5 changes: 5 additions & 0 deletions sippy/MyConfigParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@
'and "SUBSCRIBE" messages. Address in the format ' \
'"host[:port]"'),
'nat_traversal': ('B', 'enable NAT traversal for signalling'), \
'wss_socket': ('S', 'WSS (SIP via websockets, RFC7118) socket ' \
'configuration. Configuration in the format ' \
'"host:port:cert_file:key_file", where "cert_file" ' \
'/ "key_file" are paths to the TLS certificate ' \
'and key file respectively in the X.509 PEM format'),
'xmpp_b2bua_id': ('I', 'ID passed to the XMPP socket server')}

class MyConfigParser(RawConfigParser):
Expand Down
9 changes: 9 additions & 0 deletions sippy/Rtp_proxy/Session/webrtc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from sippy.Rtp_proxy.session import Rtp_proxy_session

class Rtp_proxy_session_webrtc(Rtp_proxy_session):
insert_nortpp = False
def __init__(self, *a, **kwa):
super().__init__(*a, **kwa)
self.caller.deice = True
self.caller.gateway_dtls = 'dtls'
self.callee.gateway_dtls = 'rtp'
13 changes: 11 additions & 2 deletions sippy/SipTransactionManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,11 @@ def handleIncoming(self, data_in, ra:Remote_address, server, rtime):
curl = cbody.getUrl()
if check1918(curl.host):
curl.host, curl.port = ra.address
if ra.transport == 'ws' and resp.countHFs('contact') > 0:
cbody = resp.getHFBody('contact')
if not cbody.asterisk:
curl = cbody.getUrl()
curl.host = ra.received
resp.setSource(ra.address)
self.incomingResponse(resp, t, checksum)
else:
Expand Down Expand Up @@ -816,8 +821,12 @@ def transmitData(self, userv, data, address, cachesum = None, \
logop = 'SENDING'
else:
logop = 'DISCARDING'
self.global_config['_sip_logger'].write('%s message to %s:%d:\n' % \
(logop, address[0], address[1]), data)
if userv.transport != 'ws':
paddr = '%s:%d' % (address[0], address[1])
else:
paddr = address[0]
msg = f'{logop} message to {userv.transport}:{paddr}:\n'
self.global_config['_sip_logger'].write(msg, data)
if cachesum != None:
if lossemul > 0:
lossemul -= 1
Expand Down
147 changes: 147 additions & 0 deletions sippy/Wss_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Copyright (c) 2006-2024 Sippy Software, Inc. All rights reserved.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from typing import Optional, Dict, Tuple
from threading import Thread
from asyncio import get_event_loop, all_tasks, new_event_loop, set_event_loop, CancelledError, \
Queue as AsyncQueue, create_task
from ssl import SSLContext, PROTOCOL_TLS_SERVER
from uuid import UUID
from websockets import WebSocketServerProtocol, ConnectionClosed, serve as ws_serve

from sippy.Core.EventDispatcher import ED2
from sippy.Network_server import Network_server, Network_server_opts, Remote_address
from sippy.Time.MonoTime import MonoTime

class Wss_server_opts(Network_server_opts):
certfile: Optional[str] = None
keyfile: Optional[str] = None

def __init__(self, *args, certfile = None, keyfile = None, o = None):
super().__init__(*args, o = o)
if o != None:
self.certfile, self.keyfile = o.certfile, o.keyfile
return
self.certfile = certfile
self.keyfile = keyfile

class Wss_server(Thread, Network_server):
transport = 'ws'
daemon = True
ssl_context: Optional[SSLContext] = None
connections: Dict[UUID, Tuple[WebSocketServerProtocol, AsyncQueue]]

def __init__(self, global_config, uopts:Wss_server_opts):
Thread.__init__(self)
Network_server.__init__(self, uopts)
if self.uopts.certfile is not None:
self.ssl_context = SSLContext(PROTOCOL_TLS_SERVER)
self.ssl_context.load_cert_chain(self.uopts.certfile, self.uopts.keyfile)
self.connections = {}
self.start()

async def monitor_queue(self):
while True:
item = await get_event_loop().run_in_executor(None, self.sendqueue.get)
if item is None:
for task in all_tasks():
task.cancel()
break
data, address = item
uaddress = address[0]
if uaddress not in self.connections:
print(f'ERROR: Invalid address {uaddress=}')
continue
await self.connections[uaddress][1].put(data)

async def sip_to_ws(self, queue:AsyncQueue, websocket:WebSocketServerProtocol):
while True:
item = await queue.get()
await websocket.send(item)

async def ws_to_sip(self, websocket, path):
print(f'New connection {websocket.id=}')
queue = AsyncQueue()
sender = create_task(self.sip_to_ws(queue, websocket))
conn_id = f'{websocket.id}.invalid'
self.connections[conn_id] = (websocket, queue)
address = Remote_address(websocket.remote_address, self.transport)
address.received = conn_id
try:
while True:
data = await websocket.recv()
rtime = MonoTime()
ED2.callFromThread(self.handle_read, data, address, rtime)
except ConnectionClosed:
print(f'Connection {websocket.id} closed')
finally:
del self.connections[conn_id]
sender.cancel()
await sender

async def async_run(self):
start_server = ws_serve(
self.ws_to_sip, self.uopts.laddress[0], self.uopts.laddress[1], ssl = self.ssl_context,
subprotocols = ['sip']
)
server = await start_server
await self.monitor_queue()
server.close()
await server.wait_closed()

def runFailed(self, exception):
ED2.breakLoop(255)
raise exception

def run(self):
loop = new_event_loop()
set_event_loop(loop)
try:
loop.run_until_complete(self.async_run())
except CancelledError:
pass
except OSError as ex:
ED2.callFromThread(self.runFailed, ex)
finally:
loop.close()

if __name__ == '__main__':
laddr = ('192.168.23.43', 9878)
certfile = '/home/sobomax/server.crt'
keyfile = '/home/sobomax/server.key'
from sippy.SipRequest import SipRequest
def data_callback(data, address, server, rtime):
sr = SipRequest(data)
print(f'Got {sr=} from {address=}')
for rr in (100, 'Trying'), (666, 'Busy Here'):
res = sr.genResponse(rr[0], rr[1])
server.send_to(str(res), address)
ED2.breakLoop()
wopts = Wss_server_opts(laddr, data_callback, certfile = certfile, keyfile = keyfile)
wserv = Wss_server(None, wopts)
try:
ED2.loop()
finally:
wserv.shutdown()
38 changes: 30 additions & 8 deletions sippy/b2bua_radius.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from sippy.FakeAccounting import FakeAccounting
from sippy.SipLogger import SipLogger
from sippy.Rtp_proxy.session import Rtp_proxy_session
from sippy.Rtp_proxy.Session.webrtc import Rtp_proxy_session_webrtc
from sippy.Rtp_proxy.client import Rtp_proxy_client
from signal import SIGHUP, SIGPROF, SIGUSR1, SIGUSR2, SIGTERM
from sippy.CLIManager import CLIConnectionManager
Expand All @@ -57,6 +58,7 @@
from sippy.StatefulProxy import StatefulProxy
from sippy.misc import daemonize
from sippy.B2BRoute import B2BRoute
from sippy.Wss_server import Wss_server, Wss_server_opts

import gc, getopt, os
from re import sub
Expand Down Expand Up @@ -100,6 +102,8 @@ class CCStateDead(object):
class CCStateDisconnecting(object):
sname = 'Disconnecting'

class Rtp_proxy_session_default(Rtp_proxy_session): insert_nortpp = True

class CallController(object):
id = 1
uaA = None
Expand All @@ -114,16 +118,18 @@ class CallController(object):
acctA = None
acctO = None
global_config = None
rtpps_cls = Rtp_proxy_session_default
rtp_proxy_session = None
huntstop_scodes = None
pass_headers = None
auth_proc = None
proxied = False
challenge = None

def __init__(self, remote_ip, source, global_config, pass_headers):
def __init__(self, remote_ip, source, vtrans, global_config, pass_headers):
self.id = CallController.id
CallController.id += 1
if vtrans == 'WSS': self.rtpps_cls = Rtp_proxy_session_webrtc
self.global_config = global_config
self.uaA = UA(self.global_config, event_cb = self.recvEvent, conn_cbs = (self.aConn,), disc_cbs = (self.aDisc,), \
fail_cbs = (self.aDisc,), dead_cbs = (self.aDead,))
Expand Down Expand Up @@ -158,7 +164,7 @@ def recvEvent(self, event, ua):
allowed_pts = self.global_config['_allowed_pts']
for sect in body.content.sections:
mbody = sect.m_header
if mbody.transport.lower() not in Rtp_proxy_session.AV_TRTYPES:
if mbody.transport.lower() not in self.rtpps_cls.AV_TRTYPES:
continue
old_len = len(mbody.formats)
mbody.formats = [x for x in mbody.formats if x in allowed_pts]
Expand All @@ -177,11 +183,10 @@ def recvEvent(self, event, ua):
self.cld = re_replace(self.global_config['static_tr_in'], self.cld)
event.data = (self.cId, self.cli, self.cld, body, auth, self.caller_name)
if '_rtp_proxy_clients' in self.global_config:
self.rtp_proxy_session = Rtp_proxy_session(self.global_config, call_id = self.cId, \
self.rtp_proxy_session = self.rtpps_cls(self.global_config, call_id = self.cId, \
notify_socket = self.global_config['b2bua_socket'], \
notify_tag = quote('r %s' % str(self.id)))
self.rtp_proxy_session.callee.raddress = (self.remote_ip, 5060)
self.rtp_proxy_session.insert_nortpp = True
self.eTry = event
self.state = CCStateWaitRoute
if not self.global_config['auth_enable']:
Expand Down Expand Up @@ -428,8 +433,11 @@ def recvRequest(self, req, sip_t):
via = req.getHFBody('via', 1)
else:
via = req.getHFBody('via', 0)
remote_ip = via.getTAddr()[0]
source = req.getSource()
if not via.transport == 'WSS':
remote_ip = via.getTAddr()[0]
else:
remote_ip = source[0]

# First check if request comes from IP that
# we want to accept our traffic from
Expand Down Expand Up @@ -459,7 +467,7 @@ def recvRequest(self, req, sip_t):
hfs = req.getHFs(header)
if len(hfs) > 0:
pass_headers.extend(hfs)
cc = CallController(remote_ip, source, self.global_config, pass_headers)
cc = CallController(remote_ip, source, via.transport, self.global_config, pass_headers)
cc.challenge = challenge
rval = cc.uaA.recvRequest(req, sip_t)
self.ccmap.append(cc)
Expand Down Expand Up @@ -813,7 +821,17 @@ def main_func():

if global_config.getdefault('xmpp_b2bua_id', None) != None:
global_config['_xmpp_mode'] = True
global_config['_sip_tm'] = SipTransactionManager(global_config, global_config['_cmap'].recvRequest)
stm = SipTransactionManager(global_config, global_config['_cmap'].recvRequest)

if 'wss_socket' in global_config:
parts = global_config['wss_socket'].split(':', 3)
wss_laddr = (parts[0], int(parts[1]))
wss_certfile = parts[2]
wss_keyfile = parts[3]
wss_opts = Wss_server_opts(wss_laddr, stm.handleIncoming, certfile=wss_certfile, keyfile=wss_keyfile)
global_config['_wss_server'] = Wss_server(global_config, wss_opts)

global_config['_sip_tm'] = stm
global_config['_sip_tm'].nat_traversal = global_config.getdefault('nat_traversal', False)

cmdfile = global_config['b2bua_socket']
Expand All @@ -825,7 +843,11 @@ def main_func():
open(global_config['pidfile'], 'w').write(str(os.getpid()) + '\n')
Signal(SIGUSR1, reopen, SIGUSR1, global_config['logfile'])

ED2.loop()
try:
ED2.loop()
finally:
if '_wss_server' in global_config:
global_config['_wss_server'].shutdown()

if __name__ == '__main__':
main_func()

0 comments on commit 33a7175

Please sign in to comment.