Skip to content

Commit

Permalink
Multiple resource leaks fixed in proxy and dealer
Browse files Browse the repository at this point in the history
  • Loading branch information
DZabavchik committed Sep 6, 2023
1 parent 0089c1e commit 550ee32
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 110 deletions.
52 changes: 37 additions & 15 deletions crossbar/router/dealer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class InvocationRequest(object):
'id',
'registration',
'caller',
'caller_session_id',
'call',
'callee',
'forward_for',
Expand All @@ -53,6 +54,7 @@ def __init__(self, id, registration, caller, call, callee, forward_for, authoriz
self.id = id
self.registration = registration
self.caller = caller
self.caller_session_id = caller._session_id
self.call = call
self.callee = callee
self.forward_for = forward_for
Expand Down Expand Up @@ -185,6 +187,7 @@ def detach(self, session):
is_rlink_session = (session._authrole == "rlink")
if session in self._caller_to_invocations:

# this needs to update all four places where we track invocations similar to _remove_invoke_request
outstanding = self._caller_to_invocations.get(session, [])
for invoke in outstanding: # type: InvocationRequest
if invoke.canceled:
Expand All @@ -207,11 +210,22 @@ def detach(self, session):
request=invoke.id,
session=session._session_id,
)

invokes = self._callee_to_invocations[callee]
invokes.remove(invoke)
if not invokes:
del self._callee_to_invocations[callee]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

self._router.send(invoke.callee, message.Interrupt(
request=invoke.id,
mode=message.Cancel.KILLNOWAIT,
))

del self._caller_to_invocations[session]

if session in self._session_to_registrations:

# send out Errors for any in-flight calls we have
Expand All @@ -235,6 +249,17 @@ def detach(self, session):
if invoke.caller._transport:
invoke.caller._transport.send(reply)

invokes = self._caller_to_invocations[invoke.caller]
invokes.remove(invoke)
if not invokes:
del self._caller_to_invocations[invoke.caller]

del self._invocations[invoke.id]
del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)]

if outstanding:
del self._callee_to_invocations[session]

for registration in self._session_to_registrations[session]:
was_registered, was_last_callee = self._registration_map.drop_observer(session, registration)

Expand Down Expand Up @@ -1120,23 +1145,20 @@ def _remove_invoke_request(self, invocation_request):
invocation_request.timeout_call.cancel()
invocation_request.timeout_call = None

invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]
# all four places should always be updated together
if invocation_request.id in self._invocations:
del self._invocations[invocation_request.id]
invokes = self._callee_to_invocations[invocation_request.callee]
invokes.remove(invocation_request)
if not invokes:
del self._callee_to_invocations[invocation_request.callee]

del self._invocations[invocation_request.id]
invokes = self._caller_to_invocations[invocation_request.caller]
invokes.remove(invocation_request)
if not invokes:
del self._caller_to_invocations[invocation_request.caller]

# the session_id will be None if the caller session has
# already vanished
caller_id = invocation_request.caller._session_id
if caller_id is not None:
del self._invocations_by_call[caller_id, invocation_request.call.request]
del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request]

# noinspection PyUnusedLocal
def processCancel(self, session, cancel):
Expand Down
Loading

0 comments on commit 550ee32

Please sign in to comment.