From 550ee32d166f625cff1d8cd22a628cc9f5c9f7a3 Mon Sep 17 00:00:00 2001 From: Denis Zabavchik Date: Wed, 6 Sep 2023 10:04:39 -0400 Subject: [PATCH] Multiple resource leaks fixed in proxy and dealer --- crossbar/router/dealer.py | 52 +++++--- crossbar/worker/proxy.py | 274 +++++++++++++++++++++++++------------- 2 files changed, 216 insertions(+), 110 deletions(-) diff --git a/crossbar/router/dealer.py b/crossbar/router/dealer.py index 083fab783..24fd99abe 100644 --- a/crossbar/router/dealer.py +++ b/crossbar/router/dealer.py @@ -40,6 +40,7 @@ class InvocationRequest(object): 'id', 'registration', 'caller', + 'caller_session_id', 'call', 'callee', 'forward_for', @@ -53,6 +54,7 @@ def __init__(self, id, registration, caller, call, callee, forward_for, authoriz self.id = id self.registration = registration self.caller = caller + self.caller_session_id = caller._session_id self.call = call self.callee = callee self.forward_for = forward_for @@ -185,6 +187,7 @@ def detach(self, session): is_rlink_session = (session._authrole == "rlink") if session in self._caller_to_invocations: + # this needs to update all four places where we track invocations similar to _remove_invoke_request outstanding = self._caller_to_invocations.get(session, []) for invoke in outstanding: # type: InvocationRequest if invoke.canceled: @@ -207,11 +210,22 @@ def detach(self, session): request=invoke.id, session=session._session_id, ) + + invokes = self._callee_to_invocations[callee] + invokes.remove(invoke) + if not invokes: + del self._callee_to_invocations[callee] + + del self._invocations[invoke.id] + del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] + self._router.send(invoke.callee, message.Interrupt( request=invoke.id, mode=message.Cancel.KILLNOWAIT, )) + del self._caller_to_invocations[session] + if session in self._session_to_registrations: # send out Errors for any in-flight calls we have @@ -235,6 +249,17 @@ def detach(self, session): if invoke.caller._transport: invoke.caller._transport.send(reply) + invokes = self._caller_to_invocations[invoke.caller] + invokes.remove(invoke) + if not invokes: + del self._caller_to_invocations[invoke.caller] + + del self._invocations[invoke.id] + del self._invocations_by_call[(invoke.caller_session_id, invoke.call.request)] + + if outstanding: + del self._callee_to_invocations[session] + for registration in self._session_to_registrations[session]: was_registered, was_last_callee = self._registration_map.drop_observer(session, registration) @@ -1120,23 +1145,20 @@ def _remove_invoke_request(self, invocation_request): invocation_request.timeout_call.cancel() invocation_request.timeout_call = None - invokes = self._callee_to_invocations[invocation_request.callee] - invokes.remove(invocation_request) - if not invokes: - del self._callee_to_invocations[invocation_request.callee] - - invokes = self._caller_to_invocations[invocation_request.caller] - invokes.remove(invocation_request) - if not invokes: - del self._caller_to_invocations[invocation_request.caller] + # all four places should always be updated together + if invocation_request.id in self._invocations: + del self._invocations[invocation_request.id] + invokes = self._callee_to_invocations[invocation_request.callee] + invokes.remove(invocation_request) + if not invokes: + del self._callee_to_invocations[invocation_request.callee] - del self._invocations[invocation_request.id] + invokes = self._caller_to_invocations[invocation_request.caller] + invokes.remove(invocation_request) + if not invokes: + del self._caller_to_invocations[invocation_request.caller] - # the session_id will be None if the caller session has - # already vanished - caller_id = invocation_request.caller._session_id - if caller_id is not None: - del self._invocations_by_call[caller_id, invocation_request.call.request] + del self._invocations_by_call[invocation_request.caller_session_id, invocation_request.call.request] # noinspection PyUnusedLocal def processCancel(self, session, cancel): diff --git a/crossbar/worker/proxy.py b/crossbar/worker/proxy.py index 1dd9c2e36..476ccd9f6 100644 --- a/crossbar/worker/proxy.py +++ b/crossbar/worker/proxy.py @@ -189,8 +189,9 @@ def onClose(self, wasClean): :param wasClean: Indicates if the transport has been closed regularly. :type wasClean: bool """ - self.log.info('{func} proxy frontend session closed (wasClean={wasClean})', + self.log.info('{func} proxy frontend session {sessionId} closed (wasClean={wasClean})', func=hltype(self.onClose), + sessionId=hlid(self._session_id), wasClean=wasClean) # actually, at this point, the backend session should already be gone, but better check! @@ -324,6 +325,11 @@ def backend_connected(backend: ProxyBackendSession): # first, wait for the WAMP-level transport to connect before starting to join yield backend._on_connect + # while we were yielding, frontend session might have been closed (transport disconnected) + if self.transport is None: + backend.disconnect() + raise TransportLost("Proxy frontend session disconnected while connecting to backend") + # node private key key = _read_node_key(self._controller._cbdir, private=False) @@ -428,6 +434,16 @@ def backend_failed(fail): return result + def _send_if_transport_is_alive(self, msg): + # this is a debugging helper + # We received a message on the backend connection (auth, welcome) and need to send it to the client + # this is semantically the same as _forward, and it solve the same issues + # We don't send here, but we also do not cause an exception if the transport is gone + if self.transport: + self.transport.send(msg) + else: + self.log.debug('Trying to send a message to the client, but no frontend transport! [{msg}]', msg=msg) + def _forward(self, msg): # we received a message on the backend connection: forward to client over frontend connection if self.transport: @@ -609,7 +625,7 @@ def _process_Hello(self, msg): hello_result = yield as_future(self._pending_auth.hello, realm, details) except Exception as e: self.log.failure() - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='Frontend connection accept failed ({})'.format(e))) return @@ -618,61 +634,94 @@ def _process_Hello(self, msg): authmethod=hlval(authmethod), hello_result=hello_result) - # if the frontend session is accepted right away (eg when doing "anonymous" authentication), process the - # frontend accept .. - if isinstance(hello_result, types.Accept): - try: - # get a backend session mapped to the incoming frontend session - session = yield self.frontend_accepted(hello_result) - except Exception as e: - self.log.failure() - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='Frontend connection accept failed ({})'.format(e))) - return + #check if client disconnected while we were yielding to authenticator + if not self.transport: + self.log.debug( + '{func} proxy frontend disconnected while processing hello in authenticator:' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + return + else: + # if the frontend session is accepted right away (eg when doing "anonymous" authentication), process the + # frontend accept .. + if isinstance(hello_result, types.Accept): + try: + # get a backend session mapped to the incoming frontend session + session = yield self.frontend_accepted(hello_result) + except Exception as e: + self.log.failure() + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='Frontend connection accept failed ({})'.format(e))) + return - def _on_backend_joined(session, details): - # we now got everything! the frontend is authenticated, and a backend session is associated. - msg = message.Welcome(self._session_id, - ProxyFrontendSession.ROLES, - realm=details.realm, - authid=details.authid, - authrole=details.authrole, - authmethod=hello_result.authmethod, - authprovider=hello_result.authprovider, - authextra=dict(details.authextra or {}, **self._custom_authextra)) - self._backend_session = session - self.transport.send(msg) - self.log.debug( - '{func} proxy frontend session WELCOME: session_id={session}, session={session}, ' - 'details="{details}"', - func=hltype(self._process_Hello), - session_id=hlid(self._session_id), - session=self, - details=details) + if not self.transport: + # we have not yet established a backend session, only authenticator session was used + self.log.debug('{func} proxy frontend disconnected while connecting backend session' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + self._controller.unmap_backend(self, session) + self._backend_session = None + else: + def _on_backend_joined(session, details): + # we now got everything! the frontend is authenticated, and a backend session is associated. + msg = message.Welcome(self._session_id, + ProxyFrontendSession.ROLES, + realm=details.realm, + authid=details.authid, + authrole=details.authrole, + authmethod=hello_result.authmethod, + authprovider=hello_result.authprovider, + authextra=dict(details.authextra or {}, **self._custom_authextra)) + if self.transport: + self._backend_session = session + self.transport.send(msg) + self.log.debug( + '{func} proxy frontend session WELCOME: session_id={session_id}, session={session}, ' + 'details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + else: + self.log.debug( + '{func} proxy frontend disconnected while joining backend session' + ': session_id={session_id}, session={session}, details="{details}"', + func=hltype(self._process_Hello), + session_id=hlid(self._session_id), + session=self, + details=details) + self._controller.unmap_backend(self, session) + self._backend_session = None - session.on('join', _on_backend_joined) + session.on('join', _on_backend_joined) - # if the client is required to do an authentication message exchange, answer sending a CHALLENGE message - elif isinstance(hello_result, types.Challenge): - self.transport.send(message.Challenge(hello_result.method, extra=hello_result.extra)) + # if the client is required to do an authentication message exchange, answer sending a CHALLENGE message + elif isinstance(hello_result, types.Challenge): + self._send_if_transport_is_alive(message.Challenge(hello_result.method, extra=hello_result.extra)) - # if the client is denied right away, answer by sending an ABORT message - elif isinstance(hello_result, types.Deny): - self.transport.send(message.Abort(hello_result.reason, message=hello_result.message)) + # if the client is denied right away, answer by sending an ABORT message + elif isinstance(hello_result, types.Deny): + self._send_if_transport_is_alive(message.Abort(hello_result.reason, message=hello_result.message)) - else: - # should not arrive here: internal (logic) error - self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', - rtype=hltype(hello_result), - func=hltype(self._process_Hello)) - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='internal error: unexpected authenticator return type {}'.format( - type(hello_result)))) - return + else: + # should not arrive here: internal (logic) error + self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', + rtype=hltype(hello_result), + func=hltype(self._process_Hello)) + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='internal error: unexpected authenticator return type {}'.format( + type(hello_result)))) + return - self.transport.send(message.Abort(ApplicationError.NO_AUTH_METHOD, message='no suitable authmethod found')) + self._send_if_transport_is_alive(message.Abort(ApplicationError.NO_AUTH_METHOD, message='no suitable authmethod found')) @inlineCallbacks def _process_Authenticate(self, msg): @@ -689,55 +738,84 @@ def _process_Authenticate(self, msg): func=hltype(self._process_Authenticate), pending_auth=self._pending_auth, authresult=auth_result) - if isinstance(auth_result, types.Accept): - try: - session = yield self.frontend_accepted(auth_result) - except Exception as e: - self.log.failure() - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='Frontend connection accept failed ({})'.format(e))) - else: - def _on_backend_joined(session, details): - msg = message.Welcome(self._session_id, - ProxyFrontendSession.ROLES, - realm=details.realm, - authid=details.authid, - authrole=details.authrole, - authmethod=auth_result.authmethod, - authprovider=auth_result.authprovider, - authextra=dict(details.authextra or {}, **self._custom_authextra)) - self._backend_session = session - self.transport.send(msg) - self.log.debug( - '{func} proxy frontend session WELCOME: session_id={session_id}, ' - 'session={session}, msg={msg}', - func=hltype(self._process_Authenticate), - session_id=hlid(self._session_id), - session=self, - msg=msg) - - session.on('join', _on_backend_joined) - elif isinstance(auth_result, types.Deny): - self.transport.send(message.Abort(auth_result.reason, message=auth_result.message)) + # check if client disconnected while we were yielding to authenticator + if not self.transport: + # we have not yet established a backend session, only authenticator session was used + self.log.info('{func} frontend disconnected while processing pending' + ' authentication {pending_auth}: {authresult}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth, + authresult=auth_result) else: - # should not arrive here: logic error - self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', - rtype=hltype(auth_result), - func=hltype(self._process_Authenticate)) - self.transport.send( - message.Abort(ApplicationError.AUTHENTICATION_FAILED, - message='internal error: unexpected authenticator return type {}'.format( - type(auth_result)))) + if isinstance(auth_result, types.Accept): + try: + session = yield self.frontend_accepted(auth_result) + except TransportLost: + self.log.info('{func} frontend disconnected while connecting backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + except Exception as e: + self.log.failure() + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='Frontend connection accept failed ({})'.format(e))) + else: + if self.transport is None: + # we have not yet established a backend session, only authenticator session was used + self.log.info('{func} frontend disconnected connecting backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + self._controller.unmap_backend(self, session) + self._backend_session = None + else: + def _on_backend_joined(session, details): + msg = message.Welcome(self._session_id, + ProxyFrontendSession.ROLES, + realm=details.realm, + authid=details.authid, + authrole=details.authrole, + authmethod=auth_result.authmethod, + authprovider=auth_result.authprovider, + authextra=dict(details.authextra or {}, **self._custom_authextra)) + if self.transport: + self._backend_session = session + self.transport.send(msg) + self.log.debug( + '{func} proxy frontend session WELCOME: session_id={session_id}, ' + 'session={session}, msg={msg}', + func=hltype(self._process_Authenticate), + session_id=hlid(self._session_id), + session=self, + msg=msg) + else: + self.log.info( + '{func} frontend disconnected while joining backend session {pending_auth}', + func=hltype(self._process_Authenticate), + pending_auth=self._pending_auth) + self._controller.unmap_backend(self, session) + self._backend_session = None + + session.on('join', _on_backend_joined) + elif isinstance(auth_result, types.Deny): + self._send_if_transport_is_alive(message.Abort(auth_result.reason, message=auth_result.message)) + else: + # should not arrive here: logic error + self.log.warn('{func} internal error: unexpected authenticator return type {rtype}', + rtype=hltype(auth_result), + func=hltype(self._process_Authenticate)) + self._send_if_transport_is_alive( + message.Abort(ApplicationError.AUTHENTICATION_FAILED, + message='internal error: unexpected authenticator return type {}'.format( + type(auth_result)))) else: # should not arrive here: logic error - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='internal error: unexpected pending authentication')) else: # should not arrive here: client misbehaving! - self.transport.send( + self._send_if_transport_is_alive( message.Abort(ApplicationError.AUTHENTICATION_FAILED, message='no pending authentication')) @@ -1428,7 +1506,6 @@ def stop(self): topic = '{}.on_proxy_connection_stopped'.format(self._controller._uri_prefix) yield self._controller.publish(topic, self.marshal(), options=types.PublishOptions(acknowledge=True)) - class ProxyController(TransportController): """ Controller for proxy workers. Manages: @@ -1498,6 +1575,7 @@ def __init__(self, config=None, reactor=None, personality=None): # map: (realm_name, role_name) -> ProxyRoute self._service_sessions = {} + def has_realm(self, realm: str) -> bool: """ Check if a route to a realm with the given name is currently running. @@ -1592,12 +1670,18 @@ def get_service_session(self, realm: str, authrole: str) -> ApplicationSession: backend_config = self.get_backend_config(realm, authrole) # create and store a new service session connected to the backend router worker - self._service_sessions[realm][authrole] = yield make_service_session( - self._reactor, self, backend_config, realm, authrole) + self._service_sessions[realm][authrole] = make_service_session(self._reactor, self, backend_config, realm, authrole) else: # mark as non-existing! self._service_sessions[realm][authrole] = None + if self._service_sessions[realm] and self._service_sessions[realm][authrole]: + service_session_or_deferred = self._service_sessions[realm][authrole] + if isinstance(service_session_or_deferred, Deferred): + service_session = yield service_session_or_deferred + if service_session is not None: + self._service_sessions[realm][authrole] = service_session + # return cached service session if self._service_sessions[realm] and self._service_sessions[realm][authrole]: service_session = self._service_sessions[realm][authrole]