Skip to content

Commit

Permalink
Add PRACK support (WiP).
Browse files Browse the repository at this point in the history
  • Loading branch information
sobomax committed Jul 29, 2024
1 parent dc4d7c6 commit 49369a3
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 24 deletions.
6 changes: 6 additions & 0 deletions sippy/SipRequest.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,9 @@ def genRequest(self, method, cseq = None):
via = self.getHFBCopy('via'), callid = self.getHFBCopy('call-id'), \
cseq = cseq, maxforwards = maxforward, \
user_agent = self.user_agent, expires = expires)

def getRTId(self):
headers_dict = dict([(x.name, x) for x in self.headers if x.name in ('rack', 'call-id', 'from')])
rseq, cseq, method = headers_dict['rack'].getBody().getRSeq()
rval = [str(headers_dict['call-id'].getBody()), headers_dict['from'].getBody().getTag(), rseq, cseq, method]
return tuple(rval)
7 changes: 7 additions & 0 deletions sippy/SipResponse.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,10 @@ def getCopy(self):
cself.reason = self.reason
cself.sipver = self.sipver
return cself

def getRTId(self):
headers_dict = dict([(x.name, x) for x in self.headers if x.name in ('rseq', 'cseq', 'call-id', 'from')])
rseq = headers_dict['rseq'].getBody().getNum()
cseq, method = headers_dict['cseq'].getBody().getCSeq()
rval = [str(headers_dict['call-id'].getBody()), headers_dict['from'].getBody().getTag(), rseq, cseq, method]
return tuple(rval)
138 changes: 117 additions & 21 deletions sippy/SipTransactionManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from sippy.SipAddress import SipAddress
from sippy.SipRoute import SipRoute
from sippy.SipHeader import SipHeader
from sippy.SipRSeq import SipRSeq
from sippy.Exceptions.SipParseError import SipParseError, SdpParseError
from sippy.Exceptions.RtpProxyError import RtpProxyError
from sippy.Udp_server import Udp_server, Udp_server_opts
Expand Down Expand Up @@ -98,6 +99,23 @@ def cleanup(self):
self.req_out_cb = None
self.res_out_cb = None

class SipUASTransaction(SipTransaction):
rseq = None
prov_inflight = None
pr_rel = False

def __init__(self):
self.rseq = SipRSeq()
self.prov_inflight = {}
SipTransaction.__init__(self)

class SipUACTransaction(SipTransaction):
seen_rseqs = None

def __init__(self):
self.seen_rseqs = []
SipTransaction.__init__(self)

# Symbolic states names
class SipTransactionState(object):
pass
Expand Down Expand Up @@ -217,19 +235,23 @@ def rotateCache(self):
self.cache_r2l = {}

class SipTMRetransmitO(object):
t = None
userv = None
data = None
address = None
call_id = None
lossemul = None
checksum = None

def __init__(self, userv = None, data = None, address = None,
call_id = None, lossemul = None):
call_id = None, lossemul = None, t = None, checksum = None):
self.userv = userv
self.data = data
self.address = address
self.call_id = call_id
self.lossemul = lossemul
self.t = t
self.checksum = checksum

class SipTransactionManager(object):
global_config = None
Expand All @@ -241,6 +263,7 @@ class SipTransactionManager(object):
l2rcache = None
nat_traversal = False
req_consumers = None
rtid2tid = None
provisional_retr = 0
ploss_out_rate = 0.0
pdelay_out_max = 0.0
Expand All @@ -257,6 +280,7 @@ def __init__(self, global_config, req_cb = None):
self.l1rcache = {}
self.l2rcache = {}
self.req_consumers = {}
self.rtid2tid = {}
Timeout(self.rCachePurge, 32, -1)

def handleIncoming(self, data_in, ra:Remote_address, server, rtime):
Expand Down Expand Up @@ -383,7 +407,7 @@ def get_contact():
def newTransaction(self, msg, resp_cb = None, laddress = None, userv = None, \
cb_ifver = 1, compact = False, t = None):
if t == None:
t = SipTransaction()
t = SipUACTransaction()
t.rtime = MonoTime()
t.compact = compact
t.method = msg.getMethod()
Expand Down Expand Up @@ -453,6 +477,14 @@ def incomingResponse(self, msg, t, checksum):
if t.state == TERMINATED:
return

code = msg.getSCode()[0]
if code > 100 and code < 200 and msg.countHFs('rseq') > 0:
rskey = msg.getRTId()
if rskey in t.seen_rseqs:
self.l1rcache[checksum] = (None, None, None)
return None
t.seen_rseqs.append(rskey)

if t.state == TRYING:
# Stop timers
if t.teA != None:
Expand All @@ -464,7 +496,7 @@ def incomingResponse(self, msg, t, checksum):
t.teB.cancel()
t.teB = None

if msg.getSCode()[0] < 200:
if code < 200:
# Privisional response - leave everything as is, except that
# change state and reload timeout timer
if t.state == TRYING:
Expand All @@ -483,12 +515,11 @@ def incomingResponse(self, msg, t, checksum):
# Final response - notify upper layer and remove transaction
if t.needack:
# Prepare and send ACK if necessary
fcode = msg.getSCode()[0]
tag = msg.getHFBody('to').getTag()
if tag != None:
t.ack.getHFBody('to').setTag(tag)
rAddr = None
if msg.getSCode()[0] >= 200 and msg.getSCode()[0] < 300:
if code >= 200 and code < 300:
# Some hairy code ahead
if msg.countHFs('contact') > 0:
rTarget = msg.getHFBody('contact').getUrl().getCopy()
Expand All @@ -512,7 +543,7 @@ def incomingResponse(self, msg, t, checksum):
t.ack.setTarget(rAddr)
t.ack.delHFs('route')
t.ack.appendHeaders([SipHeader(name = 'route', body = x) for x in routes])
if fcode >= 200 and fcode < 300:
if code >= 200 and code < 300:
t.ack.getHFBody('via').genBranch()
if rAddr == None:
rAddr = (t.address, t.userv.transport)
Expand Down Expand Up @@ -581,28 +612,32 @@ def incomingRequest(self, msg, checksum, tids, server):
self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum, \
t.compact)
return
if msg.getMethod() != 'ACK':
tid = msg.getTId(wBRN = True)
else:
mmethod = msg.getMethod()
if mmethod == 'ACK':
tid = msg.getTId(wTTG = True)
elif mmethod == 'PRACK':
rtid = msg.getRTId()
tid = self.rtid2tid.get(rtid, None)
else:
tid = msg.getTId(wBRN = True)
t = self.tserver.get(tid, None)
if t != None:
#print 'existing transaction'
if msg.getMethod() == t.method:
if mmethod == t.method:
# Duplicate received, check that we have sent any response on this
# request already
if t.data != None:
self.transmitData(t.userv, t.data, t.address, checksum)
return
elif msg.getMethod() == 'CANCEL':
elif mmethod == 'CANCEL':
# RFC3261 says that we have to reply 200 OK in all cases if
# there is such transaction
resp = msg.genResponse(200, 'OK')
self.transmitMsg(t.userv, resp, resp.getHFBody('via').getTAddr(), checksum, \
t.compact)
if t.state in (TRYING, RINGING):
self.doCancel(t, msg.rtime, msg)
elif msg.getMethod() == 'ACK' and t.state == COMPLETED:
elif mmethod == 'ACK' and t.state == COMPLETED:
t.state = CONFIRMED
if t.teA != None:
t.teA.cancel()
Expand All @@ -614,27 +649,52 @@ def incomingRequest(self, msg, checksum, tids, server):
t.ack_cb(msg)
t.cleanup()
self.l1rcache[checksum] = SipTMRetransmitO()
elif msg.getMethod() == 'ACK':
elif mmethod == 'PRACK':
rskey = msg.getRTId()
if rskey in t.prov_inflight:
rert_t = t.prov_inflight[rskey]
rert_t.cancel()
del t.prov_inflight[rskey]
del self.rtid2tid[rskey]
resp = msg.genResponse(200, 'OK')
else:
print('rskey: %s, prov_inflight: %s' % (str(rskey), str(t.prov_inflight)))
resp = msg.genResponse(481, 'Huh?')
self.transmitMsg(t.userv, resp, resp.getHFBody('via').getTAddr(), checksum, \
t.compact)
elif mmethod == 'ACK':
# Some ACK that doesn't match any existing transaction.
# Drop and forget it - upper layer is unlikely to be interested
# to seeing this anyway.
print(datetime.now(), 'unmatched ACK transaction - ignoring')
print(datetime.now(), 'tid: %s, self.tserver: %s' % (str(tid), \
str(self.tserver)))
sys.stdout.flush()
self.l1rcache[checksum] = SipTMRetransmitO()
elif msg.getMethod() == 'CANCEL':
elif mmethod == 'PRACK':
# Some ACK that doesn't match any existing transaction.
# Drop and forget it - upper layer is unlikely to be interested
# to seeing this anyway.
print(datetime.now(), 'unmatched PRACK transaction - 481\'ing')
print(datetime.now(), 'rtid: %s, tid: %s, self.tserver: %s' % (str(rtid), str(tid), \
str(self.tserver)))
sys.stdout.flush()
resp = msg.genResponse(481, 'Huh?')
self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum)
elif mmethod == 'CANCEL':
resp = msg.genResponse(481, 'Call Leg/Transaction Does Not Exist')
self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum)
else:
#print 'new transaction', msg.getMethod()
t = SipTransaction()
#print 'new transaction', mmethod
t = SipUASTransaction()
t.tid = tid
t.state = TRYING
t.teA = None
t.teD = None
t.teE = None
t.teF = None
t.teG = None
t.method = msg.getMethod()
t.method = mmethod
t.rtime = msg.rtime
t.data = None
t.address = None
Expand All @@ -648,7 +708,7 @@ def incomingRequest(self, msg, checksum, tids, server):
# For messages received on the wildcard interface find
# or create more specific server.
t.userv = self.l4r.getServer(msg.getSource())
if msg.getMethod() == 'INVITE':
if mmethod == 'INVITE':
t.r487 = msg.genResponse(487, 'Request Terminated')
t.needack = True
t.branch = msg.getHFBody('via').getBranch()
Expand Down Expand Up @@ -719,11 +779,29 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None,
toHF = resp.getHFBody('to')
if scode > 100 and toHF.getTag() == None:
toHF.genTag()
if t.pr_rel and scode > 100 and scode < 200:
rseq = t.rseq.getCopy()
t.rseq.incNum()
rseq_h = SipHeader(name = 'rseq', body = rseq)
resp.appendHeader(rseq_h)
tid = resp.getTId(wBRN = True)
rtid = resp.getRTId()
self.rtid2tid[rtid] = tid
else:
rseq_h = None
t.data = resp.localStr(t.userv.getSIPaddr(), compact = t.compact)
t.address = resp.getHFBody('via').getTAddr()
self.transmitData(t.userv, t.data, t.address, t.checksum, lossemul)
if t.res_out_cb != None:
t.res_out_cb(resp)
if rseq_h != None:
rskey = resp.getRTId()
if lossemul > 0:
lossemul -= 1
rdata = SipTMRetransmitO(t = t, userv = t.userv, data = t.data, \
address = t.address, lossemul = lossemul)
rert_t = Timeout(self.retrUasResponse, 0.5, 1, rdata, 0.5, rskey)
t.prov_inflight[rskey] = rert_t
if scode < 200:
t.state = RINGING
if self.provisional_retr > 0 and scode > 100:
Expand All @@ -748,9 +826,14 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None,
# ACK transaction after this point. Branch tag in ACK
# could differ as well.
del self.tserver[t.tid]
t.tid = list(t.tid[:-1])
t.tid.append(resp.getHFBody('to').getTag())
t.tid = tuple(t.tid)
new_tid = list(t.tid[:-1])
new_tid.append(resp.getHFBody('to').getTag())
new_tid = tuple(new_tid)
for ik in t.prov_inflight.keys():
if not ik in self.rtid2tid or self.rtid2tid[ik] != t.tid:
continue
self.rtid2tid[ik] = new_tid
t.tid = new_tid
self.tserver[t.tid] = t
# Install retransmit timer if necessary
t.tout = 0.5
Expand All @@ -760,6 +843,19 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None,
del self.tserver[t.tid]
t.cleanup()

def retrUasResponse(self, rdata, last_timeout, rskey):
if last_timeout > 16:
del rdata.t.prov_inflight[rskey]
del self.rtid2tid[rskey]
return
if rdata.lossemul == 0:
self.transmitData(rdata.userv, rdata.data, rdata.address)
else:
rdata.lossemul -= 1
last_timeout *= 2
rert_t = Timeout(self.retrUasResponse, last_timeout, 1, rdata, last_timeout, rskey)
rdata.t.prov_inflight[rskey] = rert_t

def doCancel(self, t, rtime = None, req = None):
if rtime == None:
rtime = MonoTime()
Expand Down
9 changes: 6 additions & 3 deletions sippy/UA.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,19 +156,21 @@ def recvRequest(self, req, sip_t):
sip_t.compact = self.compact_sip
if self.remote_ua == None:
self.update_ua(req)
rmethod = req.getMethod()
if self.rCSeq != None and self.rCSeq >= req.getHFBody('cseq').getCSeqNum():
return (req.genResponse(500, 'Server Internal Error', server = self.local_ua), None, None)
self.rCSeq = req.getHFBody('cseq').getCSeqNum()
if self.state == None:
if req.getMethod() == 'INVITE':
if rmethod == 'INVITE':
sip_t.pr_rel = True
self.changeState((UasStateIdle,))
else:
return None
newstate = self.state.recvRequest(req)
if newstate != None:
self.changeState(newstate)
self.emitPendingEvents()
if newstate != None and req.getMethod() == 'INVITE':
if newstate != None and rmethod == 'INVITE':
return (None, self.state.cancel, self.disconnect)
else:
return None
Expand Down Expand Up @@ -306,7 +308,8 @@ def genRequest(self, method, body = None, cqop = None, \
return req

def sendUasResponse(self, scode, reason, body = None, contacts = None, \
reason_rfc3326 = None, extra_headers = None, ack_wait = False):
reason_rfc3326 = None, extra_headers = None, ack_wait = False, \
retrans = False):
uasResp = self.uasResp.getCopy()
uasResp.setSCode(scode, reason)
uasResp.setBody(body)
Expand Down
13 changes: 13 additions & 0 deletions sippy/UacStateRinging.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
from sippy.UaStateGeneric import UaStateGeneric
from sippy.CCEvents import CCEventRing, CCEventConnect, CCEventFail, CCEventRedirect, \
CCEventDisconnect, CCEventPreConnect
from sippy.SipHeader import SipHeader
from sippy.SipRAck import SipRAck

class UacStateRinging(UaStateGeneric):
sname = 'Ringing(UAC)'
Expand All @@ -41,6 +43,17 @@ def recvResponse(self, resp, tr):
code, reason = resp.getSCode()
scode = (code, reason, body)
if code < 200:
if resp.countHFs('rseq') > 0:
tag = resp.getHFBody('to').getTag()
self.ua.rUri.setTag(tag)
rseq = resp.getHFBody('rseq')
cseq = resp.getHFBody('cseq')
req = self.ua.genRequest('PRACK')
self.ua.lCSeq += 1
rack = SipRAck(rseq = rseq.number, cseq = cseq.cseq, method = cseq.method)
req.appendHeader(SipHeader(name = 'rack', body = rack))
self.ua.global_config['_sip_tm'].newTransaction(req, \
laddress = self.ua.source_address, compact = self.ua.compact_sip)
if self.ua.p1xx_ts == None:
self.ua.p1xx_ts = resp.rtime
self.ua.last_scode = code
Expand Down
Loading

0 comments on commit 49369a3

Please sign in to comment.