From df624c5a061af895ef5e76c81edddc7e1e0c611c Mon Sep 17 00:00:00 2001 From: Maksym Sobolyev Date: Thu, 4 Jun 2015 16:35:25 -0700 Subject: [PATCH] Add PRACK support (WiP). --- sippy/SipRequest.py | 6 ++ sippy/SipResponse.py | 7 ++ sippy/SipTransactionManager.py | 138 ++++++++++++++++++++++++++++----- sippy/UA.py | 9 ++- sippy/UacStateRinging.py | 13 ++++ sippy/UacStateTrying.py | 13 ++++ 6 files changed, 162 insertions(+), 24 deletions(-) diff --git a/sippy/SipRequest.py b/sippy/SipRequest.py index 1cddb50..22e8a70 100644 --- a/sippy/SipRequest.py +++ b/sippy/SipRequest.py @@ -161,3 +161,9 @@ def genRequest(self, method, cseq = None): via = self.getHFBCopy('via'), callid = self.getHFBCopy('call-id'), \ cseq = cseq, maxforwards = maxforward, \ user_agent = self.user_agent, expires = expires) + + def getRTId(self): + headers_dict = dict([(x.name, x) for x in self.headers if x.name in ('rack', 'call-id', 'from')]) + rseq, cseq, method = headers_dict['rack'].getBody().getRSeq() + rval = [str(headers_dict['call-id'].getBody()), headers_dict['from'].getBody().getTag(), rseq, cseq, method] + return tuple(rval) diff --git a/sippy/SipResponse.py b/sippy/SipResponse.py index da0a9dd..2a30d6c 100644 --- a/sippy/SipResponse.py +++ b/sippy/SipResponse.py @@ -79,3 +79,10 @@ def getCopy(self): cself.reason = self.reason cself.sipver = self.sipver return cself + + def getRTId(self): + headers_dict = dict([(x.name, x) for x in self.headers if x.name in ('rseq', 'cseq', 'call-id', 'from')]) + rseq = headers_dict['rseq'].getBody().getNum() + cseq, method = headers_dict['cseq'].getBody().getCSeq() + rval = [str(headers_dict['call-id'].getBody()), headers_dict['from'].getBody().getTag(), rseq, cseq, method] + return tuple(rval) diff --git a/sippy/SipTransactionManager.py b/sippy/SipTransactionManager.py index 56e0082..fac0284 100644 --- a/sippy/SipTransactionManager.py +++ b/sippy/SipTransactionManager.py @@ -33,6 +33,7 @@ from sippy.SipAddress import SipAddress from sippy.SipRoute import SipRoute from sippy.SipHeader import SipHeader +from sippy.SipRSeq import SipRSeq from sippy.Exceptions.SipParseError import SipParseError, SdpParseError from sippy.Exceptions.RtpProxyError import RtpProxyError from sippy.Udp_server import Udp_server, Udp_server_opts @@ -98,6 +99,23 @@ def cleanup(self): self.req_out_cb = None self.res_out_cb = None +class SipUASTransaction(SipTransaction): + rseq = None + prov_inflight = None + pr_rel = False + + def __init__(self): + self.rseq = SipRSeq() + self.prov_inflight = {} + SipTransaction.__init__(self) + +class SipUACTransaction(SipTransaction): + seen_rseqs = None + + def __init__(self): + self.seen_rseqs = [] + SipTransaction.__init__(self) + # Symbolic states names class SipTransactionState(object): pass @@ -217,19 +235,23 @@ def rotateCache(self): self.cache_r2l = {} class SipTMRetransmitO(object): + t = None userv = None data = None address = None call_id = None lossemul = None + checksum = None def __init__(self, userv = None, data = None, address = None, - call_id = None, lossemul = None): + call_id = None, lossemul = None, t = None, checksum = None): self.userv = userv self.data = data self.address = address self.call_id = call_id self.lossemul = lossemul + self.t = t + self.checksum = checksum class SipTransactionManager(object): global_config = None @@ -241,6 +263,7 @@ class SipTransactionManager(object): l2rcache = None nat_traversal = False req_consumers = None + rtid2tid = None provisional_retr = 0 ploss_out_rate = 0.0 pdelay_out_max = 0.0 @@ -257,6 +280,7 @@ def __init__(self, global_config, req_cb = None): self.l1rcache = {} self.l2rcache = {} self.req_consumers = {} + self.rtid2tid = {} Timeout(self.rCachePurge, 32, -1) def handleIncoming(self, data_in, ra:Remote_address, server, rtime): @@ -383,7 +407,7 @@ def get_contact(): def newTransaction(self, msg, resp_cb = None, laddress = None, userv = None, \ cb_ifver = 1, compact = False, t = None): if t == None: - t = SipTransaction() + t = SipUACTransaction() t.rtime = MonoTime() t.compact = compact t.method = msg.getMethod() @@ -453,6 +477,14 @@ def incomingResponse(self, msg, t, checksum): if t.state == TERMINATED: return + code = msg.getSCode()[0] + if code > 100 and code < 200 and msg.countHFs('rseq') > 0: + rskey = msg.getRTId() + if rskey in t.seen_rseqs: + self.l1rcache[checksum] = (None, None, None) + return None + t.seen_rseqs.append(rskey) + if t.state == TRYING: # Stop timers if t.teA != None: @@ -464,7 +496,7 @@ def incomingResponse(self, msg, t, checksum): t.teB.cancel() t.teB = None - if msg.getSCode()[0] < 200: + if code < 200: # Privisional response - leave everything as is, except that # change state and reload timeout timer if t.state == TRYING: @@ -483,12 +515,11 @@ def incomingResponse(self, msg, t, checksum): # Final response - notify upper layer and remove transaction if t.needack: # Prepare and send ACK if necessary - fcode = msg.getSCode()[0] tag = msg.getHFBody('to').getTag() if tag != None: t.ack.getHFBody('to').setTag(tag) rAddr = None - if msg.getSCode()[0] >= 200 and msg.getSCode()[0] < 300: + if code >= 200 and code < 300: # Some hairy code ahead if msg.countHFs('contact') > 0: rTarget = msg.getHFBody('contact').getUrl().getCopy() @@ -512,7 +543,7 @@ def incomingResponse(self, msg, t, checksum): t.ack.setTarget(rAddr) t.ack.delHFs('route') t.ack.appendHeaders([SipHeader(name = 'route', body = x) for x in routes]) - if fcode >= 200 and fcode < 300: + if code >= 200 and code < 300: t.ack.getHFBody('via').genBranch() if rAddr == None: rAddr = (t.address, t.userv.transport) @@ -581,20 +612,24 @@ def incomingRequest(self, msg, checksum, tids, server): self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum, \ t.compact) return - if msg.getMethod() != 'ACK': - tid = msg.getTId(wBRN = True) - else: + mmethod = msg.getMethod() + if mmethod == 'ACK': tid = msg.getTId(wTTG = True) + elif mmethod == 'PRACK': + rtid = msg.getRTId() + tid = self.rtid2tid.get(rtid, None) + else: + tid = msg.getTId(wBRN = True) t = self.tserver.get(tid, None) if t != None: #print 'existing transaction' - if msg.getMethod() == t.method: + if mmethod == t.method: # Duplicate received, check that we have sent any response on this # request already if t.data != None: self.transmitData(t.userv, t.data, t.address, checksum) return - elif msg.getMethod() == 'CANCEL': + elif mmethod == 'CANCEL': # RFC3261 says that we have to reply 200 OK in all cases if # there is such transaction resp = msg.genResponse(200, 'OK') @@ -602,7 +637,7 @@ def incomingRequest(self, msg, checksum, tids, server): t.compact) if t.state in (TRYING, RINGING): self.doCancel(t, msg.rtime, msg) - elif msg.getMethod() == 'ACK' and t.state == COMPLETED: + elif mmethod == 'ACK' and t.state == COMPLETED: t.state = CONFIRMED if t.teA != None: t.teA.cancel() @@ -614,19 +649,44 @@ def incomingRequest(self, msg, checksum, tids, server): t.ack_cb(msg) t.cleanup() self.l1rcache[checksum] = SipTMRetransmitO() - elif msg.getMethod() == 'ACK': + elif mmethod == 'PRACK': + rskey = msg.getRTId() + if rskey in t.prov_inflight: + rert_t = t.prov_inflight[rskey] + rert_t.cancel() + del t.prov_inflight[rskey] + del self.rtid2tid[rskey] + resp = msg.genResponse(200, 'OK') + else: + print('rskey: %s, prov_inflight: %s' % (str(rskey), str(t.prov_inflight))) + resp = msg.genResponse(481, 'Huh?') + self.transmitMsg(t.userv, resp, resp.getHFBody('via').getTAddr(), checksum, \ + t.compact) + elif mmethod == 'ACK': # Some ACK that doesn't match any existing transaction. # Drop and forget it - upper layer is unlikely to be interested # to seeing this anyway. print(datetime.now(), 'unmatched ACK transaction - ignoring') + print(datetime.now(), 'tid: %s, self.tserver: %s' % (str(tid), \ + str(self.tserver))) sys.stdout.flush() self.l1rcache[checksum] = SipTMRetransmitO() - elif msg.getMethod() == 'CANCEL': + elif mmethod == 'PRACK': + # Some ACK that doesn't match any existing transaction. + # Drop and forget it - upper layer is unlikely to be interested + # to seeing this anyway. + print(datetime.now(), 'unmatched PRACK transaction - 481\'ing') + print(datetime.now(), 'rtid: %s, tid: %s, self.tserver: %s' % (str(rtid), str(tid), \ + str(self.tserver))) + sys.stdout.flush() + resp = msg.genResponse(481, 'Huh?') + self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum) + elif mmethod == 'CANCEL': resp = msg.genResponse(481, 'Call Leg/Transaction Does Not Exist') self.transmitMsg(server, resp, resp.getHFBody('via').getTAddr(), checksum) else: - #print 'new transaction', msg.getMethod() - t = SipTransaction() + #print 'new transaction', mmethod + t = SipUASTransaction() t.tid = tid t.state = TRYING t.teA = None @@ -634,7 +694,7 @@ def incomingRequest(self, msg, checksum, tids, server): t.teE = None t.teF = None t.teG = None - t.method = msg.getMethod() + t.method = mmethod t.rtime = msg.rtime t.data = None t.address = None @@ -648,7 +708,7 @@ def incomingRequest(self, msg, checksum, tids, server): # For messages received on the wildcard interface find # or create more specific server. t.userv = self.l4r.getServer(msg.getSource()) - if msg.getMethod() == 'INVITE': + if mmethod == 'INVITE': t.r487 = msg.genResponse(487, 'Request Terminated') t.needack = True t.branch = msg.getHFBody('via').getBranch() @@ -719,11 +779,29 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None, toHF = resp.getHFBody('to') if scode > 100 and toHF.getTag() == None: toHF.genTag() + if t.pr_rel and scode > 100 and scode < 200: + rseq = t.rseq.getCopy() + t.rseq.incNum() + rseq_h = SipHeader(name = 'rseq', body = rseq) + resp.appendHeader(rseq_h) + tid = resp.getTId(wBRN = True) + rtid = resp.getRTId() + self.rtid2tid[rtid] = tid + else: + rseq_h = None t.data = resp.localStr(t.userv.getSIPaddr(), compact = t.compact) t.address = resp.getHFBody('via').getTAddr() self.transmitData(t.userv, t.data, t.address, t.checksum, lossemul) if t.res_out_cb != None: t.res_out_cb(resp) + if rseq_h != None: + rskey = resp.getRTId() + if lossemul > 0: + lossemul -= 1 + rdata = SipTMRetransmitO(t = t, userv = t.userv, data = t.data, \ + address = t.address, lossemul = lossemul) + rert_t = Timeout(self.retrUasResponse, 0.5, 1, rdata, 0.5, rskey) + t.prov_inflight[rskey] = rert_t if scode < 200: t.state = RINGING if self.provisional_retr > 0 and scode > 100: @@ -748,9 +826,14 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None, # ACK transaction after this point. Branch tag in ACK # could differ as well. del self.tserver[t.tid] - t.tid = list(t.tid[:-1]) - t.tid.append(resp.getHFBody('to').getTag()) - t.tid = tuple(t.tid) + new_tid = list(t.tid[:-1]) + new_tid.append(resp.getHFBody('to').getTag()) + new_tid = tuple(new_tid) + for ik in t.prov_inflight.keys(): + if not ik in self.rtid2tid or self.rtid2tid[ik] != t.tid: + continue + self.rtid2tid[ik] = new_tid + t.tid = new_tid self.tserver[t.tid] = t # Install retransmit timer if necessary t.tout = 0.5 @@ -760,6 +843,19 @@ def sendResponse(self, resp, t = None, retrans = False, ack_cb = None, del self.tserver[t.tid] t.cleanup() + def retrUasResponse(self, rdata, last_timeout, rskey): + if last_timeout > 16: + del rdata.t.prov_inflight[rskey] + del self.rtid2tid[rskey] + return + if rdata.lossemul == 0: + self.transmitData(rdata.userv, rdata.data, rdata.address) + else: + rdata.lossemul -= 1 + last_timeout *= 2 + rert_t = Timeout(self.retrUasResponse, last_timeout, 1, rdata, last_timeout, rskey) + rdata.t.prov_inflight[rskey] = rert_t + def doCancel(self, t, rtime = None, req = None): if rtime == None: rtime = MonoTime() diff --git a/sippy/UA.py b/sippy/UA.py index 317a2e9..aa669f3 100644 --- a/sippy/UA.py +++ b/sippy/UA.py @@ -156,11 +156,13 @@ def recvRequest(self, req, sip_t): sip_t.compact = self.compact_sip if self.remote_ua == None: self.update_ua(req) + rmethod = req.getMethod() if self.rCSeq != None and self.rCSeq >= req.getHFBody('cseq').getCSeqNum(): return (req.genResponse(500, 'Server Internal Error', server = self.local_ua), None, None) self.rCSeq = req.getHFBody('cseq').getCSeqNum() if self.state == None: - if req.getMethod() == 'INVITE': + if rmethod == 'INVITE': + sip_t.pr_rel = True self.changeState((UasStateIdle,)) else: return None @@ -168,7 +170,7 @@ def recvRequest(self, req, sip_t): if newstate != None: self.changeState(newstate) self.emitPendingEvents() - if newstate != None and req.getMethod() == 'INVITE': + if newstate != None and rmethod == 'INVITE': return (None, self.state.cancel, self.disconnect) else: return None @@ -306,7 +308,8 @@ def genRequest(self, method, body = None, cqop = None, \ return req def sendUasResponse(self, scode, reason, body = None, contacts = None, \ - reason_rfc3326 = None, extra_headers = None, ack_wait = False): + reason_rfc3326 = None, extra_headers = None, ack_wait = False, \ + retrans = False): uasResp = self.uasResp.getCopy() uasResp.setSCode(scode, reason) uasResp.setBody(body) diff --git a/sippy/UacStateRinging.py b/sippy/UacStateRinging.py index 18d1305..cad325c 100644 --- a/sippy/UacStateRinging.py +++ b/sippy/UacStateRinging.py @@ -31,6 +31,8 @@ from sippy.UaStateGeneric import UaStateGeneric from sippy.CCEvents import CCEventRing, CCEventConnect, CCEventFail, CCEventRedirect, \ CCEventDisconnect, CCEventPreConnect +from sippy.SipHeader import SipHeader +from sippy.SipRAck import SipRAck class UacStateRinging(UaStateGeneric): sname = 'Ringing(UAC)' @@ -41,6 +43,17 @@ def recvResponse(self, resp, tr): code, reason = resp.getSCode() scode = (code, reason, body) if code < 200: + if resp.countHFs('rseq') > 0: + tag = resp.getHFBody('to').getTag() + self.ua.rUri.setTag(tag) + rseq = resp.getHFBody('rseq') + cseq = resp.getHFBody('cseq') + req = self.ua.genRequest('PRACK') + self.ua.lCSeq += 1 + rack = SipRAck(rseq = rseq.number, cseq = cseq.cseq, method = cseq.method) + req.appendHeader(SipHeader(name = 'rack', body = rack)) + self.ua.global_config['_sip_tm'].newTransaction(req, \ + laddress = self.ua.source_address, compact = self.ua.compact_sip) if self.ua.p1xx_ts == None: self.ua.p1xx_ts = resp.rtime self.ua.last_scode = code diff --git a/sippy/UacStateTrying.py b/sippy/UacStateTrying.py index c6b9c2e..a918e2f 100644 --- a/sippy/UacStateTrying.py +++ b/sippy/UacStateTrying.py @@ -32,6 +32,8 @@ from sippy.Time.Timeout import TimeoutAbsMono from sippy.CCEvents import CCEventRing, CCEventConnect, CCEventFail, CCEventRedirect, \ CCEventDisconnect, CCEventPreConnect +from sippy.SipHeader import SipHeader +from sippy.SipRAck import SipRAck class UacStateTrying(UaStateGeneric): sname = 'Trying(UAC)' @@ -61,6 +63,17 @@ def recvResponse(self, resp, tr): if code < 200 and self.ua.expire_time != None: self.ua.expire_timer = TimeoutAbsMono(self.ua.expires, self.ua.expire_mtime) if code < 200: + if resp.countHFs('rseq') > 0: + tag = resp.getHFBody('to').getTag() + self.ua.rUri.setTag(tag) + rseq = resp.getHFBody('rseq') + cseq = resp.getHFBody('cseq') + req = self.ua.genRequest('PRACK') + self.ua.lCSeq += 1 + rack = SipRAck(rseq = rseq.number, cseq = cseq.cseq, method = cseq.method) + req.appendHeader(SipHeader(name = 'rack', body = rack)) + self.ua.global_config['_sip_tm'].newTransaction(req, \ + laddress = self.ua.source_address, compact = self.ua.compact_sip) event = CCEventRing(scode, rtime = resp.rtime, origin = self.ua.origin) if body != None: if self.ua.on_remote_sdp_change != None: