From d6447ddfdc327e20eecf6ff8fa7e593241da388c Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 23 Sep 2020 15:14:30 +0200 Subject: [PATCH 01/30] Pika event broker with multithreading --- rasa/core/brokers/pika.py | 184 ++++++++++++++++++++++---------------- 1 file changed, 105 insertions(+), 79 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 229baeb2e753..73d299fa3054 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -1,11 +1,12 @@ import json import logging import os +import sys import time import typing +import multiprocessing from collections import deque from contextlib import contextmanager -from threading import Thread from typing import ( Callable, Deque, @@ -19,6 +20,8 @@ Generator, ) +from pika.channel import Channel + from rasa.constants import DEFAULT_LOG_LEVEL_LIBRARIES, ENV_LOG_LEVEL_LIBRARIES from rasa.shared.constants import DOCS_URL_PIKA_EVENT_BROKER from rasa.core.brokers.broker import EventBroker @@ -29,7 +32,6 @@ if typing.TYPE_CHECKING: from pika.adapters.blocking_connection import BlockingChannel from pika import SelectConnection, BlockingConnection, BasicProperties - from pika.channel import Channel import pika from pika.connection import Parameters, Connection @@ -252,6 +254,18 @@ def close_pika_connection(connection: "Connection") -> None: class PikaEventBroker(EventBroker): """Pika-based event broker for publishing messages to RabbitMQ.""" + NUMBER_OF_MP_WORKERS = 1 + MP_CONTEXT = None + if sys.platform == "darwin" and sys.version_info < (3, 8): + # On macOS, Python 3.8 has switched the default start method to "spawn". To + # quote the documentation: "The fork start method should be considered + # unsafe as it can lead to crashes of the subprocess". Apply this fix when + # running on macOS on Python <= 3.7.x as well. + + # See: + # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + MP_CONTEXT = "spawn" + def __init__( self, host: Text, @@ -288,6 +302,7 @@ def __init__( self.password = password self.port = port self.channel: Optional["Channel"] = None + self.process: Optional[multiprocessing.Process] = None self.queues = self._get_queues_from_args(queues) self.should_keep_unpublished_messages = should_keep_unpublished_messages self.raise_on_failure = raise_on_failure @@ -297,6 +312,9 @@ def __init__( self._run_pika() def __del__(self) -> None: + if self.process.is_alive(): + self.process.kill() + if self.channel: close_pika_channel(self.channel) close_pika_connection(self.channel.connection) @@ -305,11 +323,6 @@ def close(self) -> None: """Close the pika channel and connection.""" self.__del__() - @property - def rasa_environment(self) -> Optional[Text]: - """Get value of the `RASA_ENVIRONMENT` environment variable.""" - return os.environ.get("RASA_ENVIRONMENT") - @staticmethod def _get_queues_from_args( queues_arg: Union[List[Text], Tuple[Text], Text, None] @@ -374,8 +387,8 @@ def _run_pika(self) -> None: self._pika_connection = initialise_pika_select_connection( parameters, self._on_open_connection, self._on_open_connection_error ) - # Run Pika io loop in extra thread so it's not blocking - self._run_pika_io_loop_in_thread() + self._process_queue = self._get_mp_context().Queue() + self.process = self._start_pika_process() def _on_open_connection(self, connection: "SelectConnection") -> None: logger.debug(f"RabbitMQ connection to '{self.host}' was established.") @@ -408,13 +421,90 @@ def _on_channel_open(self, channel: "Channel") -> None: f"Remaining unpublished messages: {len(self._unpublished_messages)}." ) - def _run_pika_io_loop_in_thread(self) -> None: - thread = Thread(target=self._run_pika_io_loop, daemon=True) - thread.start() + def _get_mp_context(self) -> multiprocessing.context.BaseContext: + return multiprocessing.get_context(self.MP_CONTEXT) + + def _start_pika_process(self) -> multiprocessing.Process: + process = multiprocessing.Process( + target=self.process_pika_messages, + args=(self._process_queue, self.channel, self.host), + daemon=True + ) + process.start() + return process + + @staticmethod + def _get_message_properties( + headers: Optional[Dict[Text, Text]] = None + ) -> "BasicProperties": + """Create RabbitMQ message `BasicProperties`. + + The `app_id` property is set to the value of `self.rasa_environment` if + present, and the message delivery mode is set to 2 (persistent). In + addition, the `headers` property is set if supplied. + + Args: + headers: Message headers to add to the message properties of the + published message (key-value dictionary). The headers can be retrieved in + the consumer from the `headers` attribute of the message's + `BasicProperties`. + + Returns: + `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable + as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the + properties' headers. + """ + from pika.spec import BasicProperties + + # make message persistent + kwargs = {"delivery_mode": 2} + + if os.environ.get("RASA_ENVIRONMENT"): + kwargs["app_id"] = os.environ.get("RASA_ENVIRONMENT") + + if headers: + kwargs["headers"] = headers - def _run_pika_io_loop(self) -> None: - # noinspection PyUnresolvedReferences - self._pika_connection.ioloop.start() + return BasicProperties(**kwargs) + + @staticmethod + def process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: Text) -> None: + try: + while True: + (body, headers) = queue.get() + channel.basic_publish( + exchange=RABBITMQ_EXCHANGE, + routing_key="", + body=body.encode(DEFAULT_ENCODING), + properties=PikaEventBroker._get_message_properties(headers), + ) + + logger.debug( + f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host " + f"'{host}':\n{body}" + ) + except EOFError: + # Will most likely happen when shutting down Rasa X. + logger.debug( + "Pika message queue of worker was closed. Stopping to listen for more " + "messages on this worker." + ) + + def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None: + if self._pika_connection.is_closed: + # Try to reset connection + self._run_pika() + self._process_queue.put((body, headers)) + elif not self.channel and self.should_keep_unpublished_messages: + logger.warning( + f"RabbitMQ channel has not been assigned. Adding message to " + f"list of unpublished messages and trying to publish them " + f"later. Current number of unpublished messages is " + f"{len(self._unpublished_messages)}." + ) + self._unpublished_messages.append(body) + else: + self._process_queue.put((body, headers)) def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 @@ -476,70 +566,6 @@ def publish( logger.error(f"Failed to publish Pika event on host '{self.host}':\n{body}") - def _get_message_properties( - self, headers: Optional[Dict[Text, Text]] = None - ) -> "BasicProperties": - """Create RabbitMQ message `BasicProperties`. - - The `app_id` property is set to the value of `self.rasa_environment` if - present, and the message delivery mode is set to 2 (persistent). In - addition, the `headers` property is set if supplied. - - Args: - headers: Message headers to add to the message properties of the - published message (key-value dictionary). The headers can be retrieved in - the consumer from the `headers` attribute of the message's - `BasicProperties`. - - Returns: - `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable - as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the - properties' headers. - """ - from pika.spec import BasicProperties - - # make message persistent - kwargs = {"delivery_mode": 2} - - if self.rasa_environment: - kwargs["app_id"] = self.rasa_environment - - if headers: - kwargs["headers"] = headers - - return BasicProperties(**kwargs) - - def _basic_publish( - self, body: Text, headers: Optional[Dict[Text, Text]] = None - ) -> None: - self.channel.basic_publish( - exchange=RABBITMQ_EXCHANGE, - routing_key="", - body=body.encode(DEFAULT_ENCODING), - properties=self._get_message_properties(headers), - ) - - logger.debug( - f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host " - f"'{self.host}':\n{body}" - ) - - def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None: - if self._pika_connection.is_closed: - # Try to reset connection - self._run_pika() - self._basic_publish(body, headers) - elif not self.channel and self.should_keep_unpublished_messages: - logger.warning( - f"RabbitMQ channel has not been assigned. Adding message to " - f"list of unpublished messages and trying to publish them " - f"later. Current number of unpublished messages is " - f"{len(self._unpublished_messages)}." - ) - self._unpublished_messages.append(body) - else: - self._basic_publish(body, headers) - def create_rabbitmq_ssl_options( rabbitmq_host: Optional[Text] = None, From 78158eeb776c12f6b9498db65a015c3c33372cc7 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 23 Sep 2020 15:35:17 +0200 Subject: [PATCH 02/30] Fix to multithreading --- rasa/core/brokers/pika.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 73d299fa3054..d69149da8b06 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -303,6 +303,7 @@ def __init__( self.port = port self.channel: Optional["Channel"] = None self.process: Optional[multiprocessing.Process] = None + self.process_queue: Optional[multiprocessing.Queue] = None self.queues = self._get_queues_from_args(queues) self.should_keep_unpublished_messages = should_keep_unpublished_messages self.raise_on_failure = raise_on_failure @@ -312,8 +313,8 @@ def __init__( self._run_pika() def __del__(self) -> None: - if self.process.is_alive(): - self.process.kill() + if self.process and self.process.is_alive(): + self.process.terminate() if self.channel: close_pika_channel(self.channel) @@ -387,7 +388,7 @@ def _run_pika(self) -> None: self._pika_connection = initialise_pika_select_connection( parameters, self._on_open_connection, self._on_open_connection_error ) - self._process_queue = self._get_mp_context().Queue() + self.process_queue = self._get_mp_context().Queue() self.process = self._start_pika_process() def _on_open_connection(self, connection: "SelectConnection") -> None: @@ -426,8 +427,8 @@ def _get_mp_context(self) -> multiprocessing.context.BaseContext: def _start_pika_process(self) -> multiprocessing.Process: process = multiprocessing.Process( - target=self.process_pika_messages, - args=(self._process_queue, self.channel, self.host), + target=self._process_pika_messages, + args=(self.process_queue, self.channel, self.host), daemon=True ) process.start() @@ -468,7 +469,10 @@ def _get_message_properties( return BasicProperties(**kwargs) @staticmethod - def process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: Text) -> None: + def _process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: Text) -> None: + # noinspection PyUnresolvedReferences + self._pika_connection.ioloop.start() + try: while True: (body, headers) = queue.get() @@ -478,7 +482,6 @@ def process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: body=body.encode(DEFAULT_ENCODING), properties=PikaEventBroker._get_message_properties(headers), ) - logger.debug( f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host " f"'{host}':\n{body}" @@ -494,7 +497,7 @@ def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> No if self._pika_connection.is_closed: # Try to reset connection self._run_pika() - self._process_queue.put((body, headers)) + self.process_queue.put((body, headers)) elif not self.channel and self.should_keep_unpublished_messages: logger.warning( f"RabbitMQ channel has not been assigned. Adding message to " @@ -504,7 +507,7 @@ def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> No ) self._unpublished_messages.append(body) else: - self._process_queue.put((body, headers)) + self.process_queue.put((body, headers)) def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 From 173bae928b61890ca66ed808180f137cd00da0b9 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 23 Sep 2020 17:46:15 +0200 Subject: [PATCH 03/30] PikaConnector --- rasa/core/brokers/pika.py | 337 +++++++++++++++++++++----------------- 1 file changed, 188 insertions(+), 149 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index d69149da8b06..81486f684bb9 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -251,78 +251,22 @@ def close_pika_connection(connection: "Connection") -> None: logger.exception("Failed to close Pika connection with host.") -class PikaEventBroker(EventBroker): - """Pika-based event broker for publishing messages to RabbitMQ.""" - - NUMBER_OF_MP_WORKERS = 1 - MP_CONTEXT = None - if sys.platform == "darwin" and sys.version_info < (3, 8): - # On macOS, Python 3.8 has switched the default start method to "spawn". To - # quote the documentation: "The fork start method should be considered - # unsafe as it can lead to crashes of the subprocess". Apply this fix when - # running on macOS on Python <= 3.7.x as well. - - # See: - # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - MP_CONTEXT = "spawn" - +class PikaConnector: def __init__( self, - host: Text, - username: Text, - password: Text, - port: Union[int, Text] = 5672, + parameters: "Parameters", + process_queue: multiprocessing.Queue, queues: Union[List[Text], Tuple[Text], Text, None] = None, - should_keep_unpublished_messages: bool = True, - raise_on_failure: bool = False, - log_level: Union[Text, int] = os.environ.get( - ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES - ), - **kwargs: Any, - ): - """Initialise RabbitMQ event broker. - - Args: - host: Pika host. - username: Username for authentication with Pika host. - password: Password for authentication with Pika host. - port: port of the Pika host. - queues: Pika queues to declare and publish to. - should_keep_unpublished_messages: Whether or not the event broker should - maintain a queue of unpublished messages to be published later in - case of errors. - raise_on_failure: Whether to raise an exception if publishing fails. If - `False`, keep retrying. - log_level: Logging level. - """ - logging.getLogger("pika").setLevel(log_level) - - self.host = host - self.username = username - self.password = password - self.port = port - self.channel: Optional["Channel"] = None - self.process: Optional[multiprocessing.Process] = None - self.process_queue: Optional[multiprocessing.Queue] = None + on_connected: Optional[Callable[[Channel], None]] = None, + ) -> None: + self.parameters: "Parameters" = parameters + self.pika_connection: Optional["SelectConnection"] = None self.queues = self._get_queues_from_args(queues) - self.should_keep_unpublished_messages = should_keep_unpublished_messages - self.raise_on_failure = raise_on_failure + self.process_queue = process_queue + self.on_connected = on_connected # List to store unpublished messages which hopefully will be published later - self._unpublished_messages: Deque[Text] = deque() - self._run_pika() - - def __del__(self) -> None: - if self.process and self.process.is_alive(): - self.process.terminate() - - if self.channel: - close_pika_channel(self.channel) - close_pika_connection(self.channel.connection) - - def close(self) -> None: - """Close the pika channel and connection.""" - self.__del__() + self.unpublished_messages: Deque[Text] = deque() @staticmethod def _get_queues_from_args( @@ -364,40 +308,56 @@ def _get_queues_from_args( return [DEFAULT_QUEUE_NAME] - @classmethod - def from_endpoint_config( - cls, broker_config: Optional["EndpointConfig"] - ) -> Optional["PikaEventBroker"]: - """Initialise `PikaEventBroker` from `EndpointConfig`. + @staticmethod + def _get_message_properties( + headers: Optional[Dict[Text, Text]] = None + ) -> "BasicProperties": + """Create RabbitMQ message `BasicProperties`. + + The `app_id` property is set to the value of `self.rasa_environment` if + present, and the message delivery mode is set to 2 (persistent). In + addition, the `headers` property is set if supplied. Args: - broker_config: `EndpointConfig` to read. + headers: Message headers to add to the message properties of the + published message (key-value dictionary). The headers can be retrieved in + the consumer from the `headers` attribute of the message's + `BasicProperties`. Returns: - `PikaEventBroker` if `broker_config` was supplied, else `None`. + `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable + as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the + properties' headers. """ - if broker_config is None: - return None + from pika.spec import BasicProperties - return cls(broker_config.url, **broker_config.kwargs) + # make message persistent + kwargs = {"delivery_mode": 2} - def _run_pika(self) -> None: - parameters = _get_pika_parameters( - self.host, self.username, self.password, self.port - ) - self._pika_connection = initialise_pika_select_connection( - parameters, self._on_open_connection, self._on_open_connection_error + if os.environ.get("RASA_ENVIRONMENT"): + kwargs["app_id"] = os.environ.get("RASA_ENVIRONMENT") + + if headers: + kwargs["headers"] = headers + + return BasicProperties(**kwargs) + + @property + def is_connected(self) -> bool: + return self.pika_connection is None or not self.pika_connection.is_open + + def _connect(self): + self.pika_connection = initialise_pika_select_connection( + self.parameters, self._on_open_connection, self._on_open_connection_error ) - self.process_queue = self._get_mp_context().Queue() - self.process = self._start_pika_process() def _on_open_connection(self, connection: "SelectConnection") -> None: - logger.debug(f"RabbitMQ connection to '{self.host}' was established.") + logger.debug(f"RabbitMQ connection to '{self.parameters.host}' was established.") connection.channel(on_open_callback=self._on_channel_open) def _on_open_connection_error(self, _, error: Text) -> None: logger.warning( - f"Connecting to '{self.host}' failed with error '{error}'. Trying again." + f"Connecting to '{self.parameters.host}' failed with error '{error}'. Trying again." ) def _on_channel_open(self, channel: "Channel") -> None: @@ -411,80 +371,49 @@ def _on_channel_open(self, channel: "Channel") -> None: channel.queue_declare(queue=queue, durable=True) channel.queue_bind(exchange=RABBITMQ_EXCHANGE, queue=queue) - self.channel = channel + if self.on_connected: + self.on_connected(channel) - while self._unpublished_messages: + while self.unpublished_messages: # Send unpublished messages - message = self._unpublished_messages.popleft() - self._publish(message) + message = self.unpublished_messages.popleft() + self.publish(message) logger.debug( f"Published message from queue of unpublished messages. " - f"Remaining unpublished messages: {len(self._unpublished_messages)}." + f"Remaining unpublished messages: {len(self.unpublished_messages)}." ) - def _get_mp_context(self) -> multiprocessing.context.BaseContext: - return multiprocessing.get_context(self.MP_CONTEXT) + def publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None): + self.process_queue.put((body, headers)) - def _start_pika_process(self) -> multiprocessing.Process: - process = multiprocessing.Process( - target=self._process_pika_messages, - args=(self.process_queue, self.channel, self.host), - daemon=True + def append_unpublished_message(self, body: Text) -> None: + logger.warning( + f"RabbitMQ channel has not been assigned. Adding message to " + f"list of unpublished messages and trying to publish them " + f"later. Current number of unpublished messages is " + f"{len(self.unpublished_messages)}." ) - process.start() - return process + self.unpublished_messages.append(body) - @staticmethod - def _get_message_properties( - headers: Optional[Dict[Text, Text]] = None - ) -> "BasicProperties": - """Create RabbitMQ message `BasicProperties`. + def process_messages(self, channel: Channel) -> None: + if not self.is_connected: + self._connect() - The `app_id` property is set to the value of `self.rasa_environment` if - present, and the message delivery mode is set to 2 (persistent). In - addition, the `headers` property is set if supplied. - - Args: - headers: Message headers to add to the message properties of the - published message (key-value dictionary). The headers can be retrieved in - the consumer from the `headers` attribute of the message's - `BasicProperties`. - - Returns: - `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable - as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the - properties' headers. - """ - from pika.spec import BasicProperties - - # make message persistent - kwargs = {"delivery_mode": 2} - - if os.environ.get("RASA_ENVIRONMENT"): - kwargs["app_id"] = os.environ.get("RASA_ENVIRONMENT") - - if headers: - kwargs["headers"] = headers - - return BasicProperties(**kwargs) - - @staticmethod - def _process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: Text) -> None: # noinspection PyUnresolvedReferences self._pika_connection.ioloop.start() try: while True: - (body, headers) = queue.get() + (body, headers) = self.process_queue.get() channel.basic_publish( exchange=RABBITMQ_EXCHANGE, routing_key="", body=body.encode(DEFAULT_ENCODING), - properties=PikaEventBroker._get_message_properties(headers), + properties=self._get_message_properties(headers), ) logger.debug( f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host " - f"'{host}':\n{body}" + f"'{self.parameters.host}':\n{body}" ) except EOFError: # Will most likely happen when shutting down Rasa X. @@ -493,21 +422,131 @@ def _process_pika_messages(queue: multiprocessing.Queue, channel: Channel, host: "messages on this worker." ) + +class PikaEventBroker(EventBroker): + """Pika-based event broker for publishing messages to RabbitMQ.""" + + NUMBER_OF_MP_WORKERS = 1 + MP_CONTEXT = None + if sys.platform == "darwin" and sys.version_info < (3, 8): + # On macOS, Python 3.8 has switched the default start method to "spawn". To + # quote the documentation: "The fork start method should be considered + # unsafe as it can lead to crashes of the subprocess". Apply this fix when + # running on macOS on Python <= 3.7.x as well. + + # See: + # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods + MP_CONTEXT = "spawn" + + def __init__( + self, + host: Text, + username: Text, + password: Text, + port: Union[int, Text] = 5672, + queues: Union[List[Text], Tuple[Text], Text, None] = None, + should_keep_unpublished_messages: bool = True, + raise_on_failure: bool = False, + log_level: Union[Text, int] = os.environ.get( + ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES + ), + **kwargs: Any, + ): + """Initialise RabbitMQ event broker. + + Args: + host: Pika host. + username: Username for authentication with Pika host. + password: Password for authentication with Pika host. + port: port of the Pika host. + queues: Pika queues to declare and publish to. + should_keep_unpublished_messages: Whether or not the event broker should + maintain a queue of unpublished messages to be published later in + case of errors. + raise_on_failure: Whether to raise an exception if publishing fails. If + `False`, keep retrying. + log_level: Logging level. + """ + logging.getLogger("pika").setLevel(log_level) + + self.host = host + self.username = username + self.password = password + self.port = port + self.queues = queues + self.channel: Optional["Channel"] = None + self.process: Optional[multiprocessing.Process] = None + self.should_keep_unpublished_messages = should_keep_unpublished_messages + self.raise_on_failure = raise_on_failure + + self._connect() + + def __del__(self) -> None: + if self.process and self.process.is_alive(): + self.process.terminate() + + if self.channel: + close_pika_channel(self.channel) + close_pika_connection(self.channel.connection) + + def close(self) -> None: + """Close the pika channel and connection.""" + self.__del__() + + @classmethod + def from_endpoint_config( + cls, broker_config: Optional["EndpointConfig"] + ) -> Optional["PikaEventBroker"]: + """Initialise `PikaEventBroker` from `EndpointConfig`. + + Args: + broker_config: `EndpointConfig` to read. + + Returns: + `PikaEventBroker` if `broker_config` was supplied, else `None`. + """ + if broker_config is None: + return None + + return cls(broker_config.url, **broker_config.kwargs) + + def _connect(self) -> None: + parameters = _get_pika_parameters( + self.host, self.username, self.password, self.port + ) + + def on_connected(channel: Channel): + self.channel = channel + + self.pika_connector = PikaConnector( + parameters, + process_queue=self._get_mp_context().Queue(), + queues=self.queues, + on_connected=on_connected, + ) + self.process = self._start_pika_process() + + def _get_mp_context(self) -> multiprocessing.context.BaseContext: + return multiprocessing.get_context(self.MP_CONTEXT) + + def _start_pika_process(self) -> multiprocessing.Process: + process = multiprocessing.Process( + target=self.pika_connector.process_messages, + args=(self.channel,), + daemon=True + ) + process.start() + return process + def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None: - if self._pika_connection.is_closed: + if not self.pika_connector or not self.pika_connector.is_connected: # Try to reset connection - self._run_pika() - self.process_queue.put((body, headers)) + self._connect() + self.pika_connector.publish(body, headers) elif not self.channel and self.should_keep_unpublished_messages: - logger.warning( - f"RabbitMQ channel has not been assigned. Adding message to " - f"list of unpublished messages and trying to publish them " - f"later. Current number of unpublished messages is " - f"{len(self._unpublished_messages)}." - ) - self._unpublished_messages.append(body) + self.pika_connector.append_unpublished_message(body) else: - self.process_queue.put((body, headers)) + self.pika_connector.publish(body, headers) def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 From 22f79947eb5b287fad46c6a2152b56f585920752 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 23 Sep 2020 17:54:17 +0200 Subject: [PATCH 04/30] Code fixes --- rasa/core/brokers/pika.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 81486f684bb9..631c35f53a7b 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -268,6 +268,14 @@ def __init__( # List to store unpublished messages which hopefully will be published later self.unpublished_messages: Deque[Text] = deque() + def __del__(self) -> None: + if self.is_connected: + self.pika_connection.close() + + def close(self) -> None: + """Close the pika connector.""" + self.__del__() + @staticmethod def _get_queues_from_args( queues_arg: Union[List[Text], Tuple[Text], Text, None] @@ -344,7 +352,7 @@ def _get_message_properties( @property def is_connected(self) -> bool: - return self.pika_connection is None or not self.pika_connection.is_open + return self.pika_connection and self.pika_connection.is_open def _connect(self): self.pika_connection = initialise_pika_select_connection( @@ -395,13 +403,15 @@ def append_unpublished_message(self, body: Text) -> None: ) self.unpublished_messages.append(body) - def process_messages(self, channel: Channel) -> None: + def process_messages(self, channel: Optional[Channel]) -> None: if not self.is_connected: self._connect() # noinspection PyUnresolvedReferences self._pika_connection.ioloop.start() + # TODO(alwx): channel might still not be initialized + try: while True: (body, headers) = self.process_queue.get() @@ -485,6 +495,9 @@ def __del__(self) -> None: if self.process and self.process.is_alive(): self.process.terminate() + if self.pika_connector: + self.pika_connector.close() + if self.channel: close_pika_channel(self.channel) close_pika_connection(self.channel.connection) From b1b39c5f57134264aef4b3fd96dd3ecb82f652d2 Mon Sep 17 00:00:00 2001 From: alwx Date: Thu, 24 Sep 2020 13:25:42 +0200 Subject: [PATCH 05/30] Updates for pika broker --- rasa/core/brokers/pika.py | 197 +++++++++++++++++++++----------------- tests/core/test_broker.py | 37 ++++--- 2 files changed, 131 insertions(+), 103 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 631c35f53a7b..6fc43921daeb 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -251,29 +251,43 @@ def close_pika_connection(connection: "Connection") -> None: logger.exception("Failed to close Pika connection with host.") -class PikaConnector: +MessageHeaders = Optional[Dict[Text, Text]] +Message = Tuple[Text, MessageHeaders] + + +class PikaMessageProcessor: + """A class that holds all the Pika connection details and processes Pika messages. + All the messages should be published to a `process_queue` provided.""" + def __init__( self, parameters: "Parameters", - process_queue: multiprocessing.Queue, - queues: Union[List[Text], Tuple[Text], Text, None] = None, - on_connected: Optional[Callable[[Channel], None]] = None, + get_message: Callable[[], Message], + queues: Union[List[Text], Tuple[Text], Text, None], ) -> None: + """Initialise Pika connector. + + Args: + parameters: Pika connection parameters + queues: Pika queues to declare and publish to + """ self.parameters: "Parameters" = parameters - self.pika_connection: Optional["SelectConnection"] = None - self.queues = self._get_queues_from_args(queues) - self.process_queue = process_queue - self.on_connected = on_connected + self.queues: Union[List[Text], Tuple[Text]] = self._get_queues_from_args(queues) + self.get_message: Callable[[], Message] = get_message - # List to store unpublished messages which hopefully will be published later - self.unpublished_messages: Deque[Text] = deque() + self.pika_connection: Optional["SelectConnection"] = None + self.channel: Optional["Channel"] = None def __del__(self) -> None: + if self.channel: + close_pika_channel(self.channel) + close_pika_connection(self.channel.connection) + if self.is_connected: self.pika_connection.close() def close(self) -> None: - """Close the pika connector.""" + """Close the Pika connector.""" self.__del__() @staticmethod @@ -317,14 +331,12 @@ def _get_queues_from_args( return [DEFAULT_QUEUE_NAME] @staticmethod - def _get_message_properties( - headers: Optional[Dict[Text, Text]] = None - ) -> "BasicProperties": + def _get_message_properties(headers: MessageHeaders = None) -> "BasicProperties": """Create RabbitMQ message `BasicProperties`. - The `app_id` property is set to the value of `self.rasa_environment` if - present, and the message delivery mode is set to 2 (persistent). In - addition, the `headers` property is set if supplied. + The `app_id` property is set to the value of `RASA_ENVIRONMENT` env variable + if present, and the message delivery mode is set to 2 (persistent). + In addition, the `headers` property is set if supplied. Args: headers: Message headers to add to the message properties of the @@ -342,8 +354,9 @@ def _get_message_properties( # make message persistent kwargs = {"delivery_mode": 2} - if os.environ.get("RASA_ENVIRONMENT"): - kwargs["app_id"] = os.environ.get("RASA_ENVIRONMENT") + env = os.environ.get("RASA_ENVIRONMENT") + if env: + kwargs["app_id"] = env if headers: kwargs["headers"] = headers @@ -352,15 +365,47 @@ def _get_message_properties( @property def is_connected(self) -> bool: - return self.pika_connection and self.pika_connection.is_open + """Indicates if Pika is connected and the channel is initialized. + + Returns: + A boolean value indicating if the connection is established + """ + return self.pika_connection and self.pika_connection.is_open and self.channel + + def is_ready( + self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 + ) -> bool: + """Spin until the connector is ready to process messages. + + It typically takes 50 ms or so for the pika channel to open. We'll wait up + to 10 seconds just in case. + + Args: + attempts: Number of retries. + wait_time_between_attempts_in_seconds: Wait time between retries. + + Returns: + `True` if the channel is available, `False` otherwise. + """ + while attempts: + if self.is_connected: + return True + time.sleep(wait_time_between_attempts_in_seconds) + attempts -= 1 + + return False def _connect(self): self.pika_connection = initialise_pika_select_connection( self.parameters, self._on_open_connection, self._on_open_connection_error ) + # noinspection PyUnresolvedReferences + self.pika_connection.ioloop.start() def _on_open_connection(self, connection: "SelectConnection") -> None: - logger.debug(f"RabbitMQ connection to '{self.parameters.host}' was established.") + logger.debug( + f"RabbitMQ connection to '{self.parameters.host}' was established." + ) connection.channel(on_open_callback=self._on_channel_open) def _on_open_connection_error(self, _, error: Text) -> None: @@ -379,51 +424,39 @@ def _on_channel_open(self, channel: "Channel") -> None: channel.queue_declare(queue=queue, durable=True) channel.queue_bind(exchange=RABBITMQ_EXCHANGE, queue=queue) - if self.on_connected: - self.on_connected(channel) + self.channel = channel - while self.unpublished_messages: - # Send unpublished messages - message = self.unpublished_messages.popleft() - self.publish(message) - logger.debug( - f"Published message from queue of unpublished messages. " - f"Remaining unpublished messages: {len(self.unpublished_messages)}." - ) + def _publish(self, message: Message) -> None: + (body, headers) = message - def publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None): - self.process_queue.put((body, headers)) - - def append_unpublished_message(self, body: Text) -> None: - logger.warning( - f"RabbitMQ channel has not been assigned. Adding message to " - f"list of unpublished messages and trying to publish them " - f"later. Current number of unpublished messages is " - f"{len(self.unpublished_messages)}." + self.channel.basic_publish( + exchange=RABBITMQ_EXCHANGE, + routing_key="", + body=body.encode(DEFAULT_ENCODING), + properties=self._get_message_properties(headers), ) - self.unpublished_messages.append(body) - def process_messages(self, channel: Optional[Channel]) -> None: + def process_messages(self) -> None: + """Start to process messages. This process is indefinite thus it should + be started in a separate thread or process. + + This method also automatically establishes a connection and wait for + the channel to get ready to process messages. + """ if not self.is_connected: self._connect() - # noinspection PyUnresolvedReferences - self._pika_connection.ioloop.start() - - # TODO(alwx): channel might still not be initialized + if not self.is_ready(): + logger.warning("RabbitMQ channel cannot be opened.") + return try: while True: - (body, headers) = self.process_queue.get() - channel.basic_publish( - exchange=RABBITMQ_EXCHANGE, - routing_key="", - body=body.encode(DEFAULT_ENCODING), - properties=self._get_message_properties(headers), - ) + message = self.get_message() + self._publish(message) logger.debug( f"Published Pika events to exchange '{RABBITMQ_EXCHANGE}' on host " - f"'{self.parameters.host}':\n{body}" + f"'{self.parameters.host}':\n{message[0]}" ) except EOFError: # Will most likely happen when shutting down Rasa X. @@ -484,24 +517,20 @@ def __init__( self.password = password self.port = port self.queues = queues - self.channel: Optional["Channel"] = None self.process: Optional[multiprocessing.Process] = None self.should_keep_unpublished_messages = should_keep_unpublished_messages self.raise_on_failure = raise_on_failure + self.pika_message_processor: Optional[PikaMessageProcessor] = None self._connect() def __del__(self) -> None: + if self.pika_message_processor: + self.pika_message_processor.close() + if self.process and self.process.is_alive(): self.process.terminate() - if self.pika_connector: - self.pika_connector.close() - - if self.channel: - close_pika_channel(self.channel) - close_pika_connection(self.channel.connection) - def close(self) -> None: """Close the pika channel and connection.""" self.__del__() @@ -528,15 +557,13 @@ def _connect(self) -> None: self.host, self.username, self.password, self.port ) - def on_connected(channel: Channel): - self.channel = channel - - self.pika_connector = PikaConnector( + self.process_queue = self._get_mp_context().Queue() + self.pika_message_processor = PikaMessageProcessor( parameters, - process_queue=self._get_mp_context().Queue(), queues=self.queues, - on_connected=on_connected, + get_message=lambda: self.process_queue.get(), ) + self.process = self._start_pika_process() def _get_mp_context(self) -> multiprocessing.context.BaseContext: @@ -544,27 +571,25 @@ def _get_mp_context(self) -> multiprocessing.context.BaseContext: def _start_pika_process(self) -> multiprocessing.Process: process = multiprocessing.Process( - target=self.pika_connector.process_messages, - args=(self.channel,), - daemon=True + target=self.pika_message_processor.process_messages, args=(), daemon=True ) process.start() return process - def _publish(self, body: Text, headers: Optional[Dict[Text, Text]] = None) -> None: - if not self.pika_connector or not self.pika_connector.is_connected: - # Try to reset connection + def _publish(self, body: Text, headers: MessageHeaders = None) -> None: + if not self.pika_message_processor: self._connect() - self.pika_connector.publish(body, headers) - elif not self.channel and self.should_keep_unpublished_messages: - self.pika_connector.append_unpublished_message(body) - else: - self.pika_connector.publish(body, headers) + + if ( + self.pika_message_processor.is_connected + or self.should_keep_unpublished_messages + ): + self.process_queue.put((body, headers)) def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 ) -> bool: - """Spin until the pika channel is open. + """Spin until Pika is ready to process messages. It typically takes 50 ms or so for the pika channel to open. We'll wait up to 10 seconds just in case. @@ -576,13 +601,9 @@ def is_ready( Returns: `True` if the channel is available, `False` otherwise. """ - while attempts: - if self.channel: - return True - time.sleep(wait_time_between_attempts_in_seconds) - attempts -= 1 - - return False + return self.pika_message_processor and self.pika_message_processor.is_ready( + attempts, wait_time_between_attempts_in_seconds + ) def publish( self, @@ -612,7 +633,7 @@ def publish( f"Could not open Pika channel at host '{self.host}'. " f"Failed with error: {e}" ) - self.channel = None + self.pika_message_processor = None if self.raise_on_failure: raise e diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 20f3a7bac93b..1e553fda99c5 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -2,12 +2,9 @@ import logging from pathlib import Path import textwrap - from typing import Union, Text, List, Optional, Type - import pytest from _pytest.logging import LogCaptureFixture - from _pytest.monkeypatch import MonkeyPatch import rasa.shared.utils.io @@ -15,18 +12,28 @@ from rasa.core.brokers.broker import EventBroker from rasa.core.brokers.file import FileEventBroker from rasa.core.brokers.kafka import KafkaEventBroker -from rasa.core.brokers.pika import PikaEventBroker, DEFAULT_QUEUE_NAME +from rasa.core.brokers.pika import ( + PikaEventBroker, + PikaMessageProcessor, + DEFAULT_QUEUE_NAME, +) from rasa.core.brokers.sql import SQLEventBroker from rasa.shared.core.events import Event, Restarted, SlotSet, UserUttered from rasa.utils.endpoints import EndpointConfig, read_endpoint_config from tests.core.conftest import DEFAULT_ENDPOINTS_FILE +import pika.connection + TEST_EVENTS = [ UserUttered("/greet", {"name": "greet", "confidence": 1.0}, []), SlotSet("name", "rasa"), Restarted(), ] +TEST_CONNECTION_PARAMETERS = pika.connection.ConnectionParameters( + "amqp://username:password@host:port" +) + def test_pika_broker_from_config(): cfg = read_endpoint_config( @@ -42,18 +49,18 @@ def test_pika_broker_from_config(): # noinspection PyProtectedMember def test_pika_message_property_app_id(monkeypatch: MonkeyPatch): - # patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init - monkeypatch.setattr(PikaEventBroker, "_run_pika", lambda _: None) - pika_producer = PikaEventBroker("", "", "") + pika_processor = PikaMessageProcessor( + TEST_CONNECTION_PARAMETERS, queues=None, get_message=lambda: ("", None) + ) # unset RASA_ENVIRONMENT env var results in empty App ID monkeypatch.delenv("RASA_ENVIRONMENT", raising=False) - assert not pika_producer._get_message_properties().app_id + assert not pika_processor._get_message_properties().app_id # setting it to some value results in that value as the App ID rasa_environment = "some-test-environment" monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment) - assert pika_producer._get_message_properties().app_id == rasa_environment + assert pika_processor._get_message_properties().app_id == rasa_environment @pytest.mark.parametrize( @@ -71,15 +78,15 @@ def test_pika_queues_from_args( queues_arg: Union[Text, List[Text], None], expected: List[Text], warning: Optional[Type[Warning]], - monkeypatch: MonkeyPatch, ): - # patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init - monkeypatch.setattr(PikaEventBroker, "_run_pika", lambda _: None) - with pytest.warns(warning): - pika_producer = PikaEventBroker("", "", "", queues=queues_arg) + pika_processor = PikaMessageProcessor( + TEST_CONNECTION_PARAMETERS, + queues=queues_arg, + get_message=lambda: ("", None), + ) - assert pika_producer.queues == expected + assert pika_processor.queues == expected def test_no_broker_in_config(): From 4090c27c574ec3e82bd00338b5365d9121e543d1 Mon Sep 17 00:00:00 2001 From: alwx Date: Thu, 24 Sep 2020 13:30:22 +0200 Subject: [PATCH 06/30] Changelog entry --- changelog/6760.bugfix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog/6760.bugfix.md diff --git a/changelog/6760.bugfix.md b/changelog/6760.bugfix.md new file mode 100644 index 000000000000..08faa2c48cd9 --- /dev/null +++ b/changelog/6760.bugfix.md @@ -0,0 +1,3 @@ +Update Pika event broker to be a separate process and make it use +`multiprocessing.Queue` to send and process messages. This change should help +to avoid situations when events stop being send after a while. \ No newline at end of file From 526c91bfd03e0a6cee5ad0cda86226cfc0124117 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 30 Sep 2020 17:05:25 +0200 Subject: [PATCH 07/30] Changelog entry update --- changelog/6760.bugfix.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/changelog/6760.bugfix.md b/changelog/6760.bugfix.md index 08faa2c48cd9..8b44f62b6c45 100644 --- a/changelog/6760.bugfix.md +++ b/changelog/6760.bugfix.md @@ -1,3 +1,3 @@ -Update Pika event broker to be a separate process and make it use +Update Pika event broker to be a separate process and make it use a `multiprocessing.Queue` to send and process messages. This change should help -to avoid situations when events stop being send after a while. \ No newline at end of file +avoid situations when events stop being sent after a while. \ No newline at end of file From bed55f56f0a517a884bac3d29987a594ce0307b0 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 30 Sep 2020 17:08:09 +0200 Subject: [PATCH 08/30] Pika import fixes --- rasa/core/brokers/pika.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 6fc43921daeb..cedbdf2b976a 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -3,13 +3,10 @@ import os import sys import time -import typing import multiprocessing -from collections import deque from contextlib import contextmanager from typing import ( Callable, - Deque, Dict, Optional, Text, @@ -18,6 +15,7 @@ List, Tuple, Generator, + TYPE_CHECKING ) from pika.channel import Channel @@ -29,7 +27,7 @@ from rasa.utils.endpoints import EndpointConfig from rasa.shared.utils.io import DEFAULT_ENCODING -if typing.TYPE_CHECKING: +if TYPE_CHECKING: from pika.adapters.blocking_connection import BlockingChannel from pika import SelectConnection, BlockingConnection, BasicProperties import pika @@ -272,7 +270,7 @@ def __init__( queues: Pika queues to declare and publish to """ self.parameters: "Parameters" = parameters - self.queues: Union[List[Text], Tuple[Text]] = self._get_queues_from_args(queues) + self.queues: List[Text] = self._get_queues_from_args(queues) self.get_message: Callable[[], Message] = get_message self.pika_connection: Optional["SelectConnection"] = None @@ -293,7 +291,7 @@ def close(self) -> None: @staticmethod def _get_queues_from_args( queues_arg: Union[List[Text], Tuple[Text], Text, None] - ) -> Union[List[Text], Tuple[Text]]: + ) -> List[Text]: """Get queues for this event broker. The preferred argument defining the RabbitMQ queues the `PikaEventBroker` should @@ -311,7 +309,7 @@ def _get_queues_from_args( `ValueError` if no valid `queues` argument was found. """ if queues_arg and isinstance(queues_arg, (list, tuple)): - return queues_arg + return list(queues_arg) if queues_arg and isinstance(queues_arg, str): logger.debug( From 1ab4126305d9e993c498452c17887712cc481340 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 30 Sep 2020 17:13:56 +0200 Subject: [PATCH 09/30] Code style updates --- rasa/core/brokers/pika.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index cedbdf2b976a..b52d8ca8e89c 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -18,8 +18,6 @@ TYPE_CHECKING ) -from pika.channel import Channel - from rasa.constants import DEFAULT_LOG_LEVEL_LIBRARIES, ENV_LOG_LEVEL_LIBRARIES from rasa.shared.constants import DOCS_URL_PIKA_EVENT_BROKER from rasa.core.brokers.broker import EventBroker @@ -28,9 +26,10 @@ from rasa.shared.utils.io import DEFAULT_ENCODING if TYPE_CHECKING: + import pika from pika.adapters.blocking_connection import BlockingChannel from pika import SelectConnection, BlockingConnection, BasicProperties - import pika + from pika.channel import Channel from pika.connection import Parameters, Connection logger = logging.getLogger(__name__) @@ -254,8 +253,7 @@ def close_pika_connection(connection: "Connection") -> None: class PikaMessageProcessor: - """A class that holds all the Pika connection details and processes Pika messages. - All the messages should be published to a `process_queue` provided.""" + """A class that holds all the Pika connection details and processes Pika messages.""" def __init__( self, @@ -285,7 +283,7 @@ def __del__(self) -> None: self.pika_connection.close() def close(self) -> None: - """Close the Pika connector.""" + """Close the Pika connection.""" self.__del__() @staticmethod @@ -338,13 +336,13 @@ def _get_message_properties(headers: MessageHeaders = None) -> "BasicProperties" Args: headers: Message headers to add to the message properties of the - published message (key-value dictionary). The headers can be retrieved in - the consumer from the `headers` attribute of the message's - `BasicProperties`. + published message (key-value dictionary). The headers can be retrieved + in the consumer from the `headers` attribute of the message's + `BasicProperties`. Returns: `pika.spec.BasicProperties` with the `RASA_ENVIRONMENT` environment variable - as the properties' `app_id` value, `delivery_mode`=2 and `headers` as the + as the properties' `app_id` value, `delivery_mode=2` and `headers` as the properties' headers. """ from pika.spec import BasicProperties @@ -366,7 +364,7 @@ def is_connected(self) -> bool: """Indicates if Pika is connected and the channel is initialized. Returns: - A boolean value indicating if the connection is established + A boolean value indicating if the connection is established. """ return self.pika_connection and self.pika_connection.is_open and self.channel @@ -425,7 +423,7 @@ def _on_channel_open(self, channel: "Channel") -> None: self.channel = channel def _publish(self, message: Message) -> None: - (body, headers) = message + body, headers = message self.channel.basic_publish( exchange=RABBITMQ_EXCHANGE, @@ -436,7 +434,7 @@ def _publish(self, message: Message) -> None: def process_messages(self) -> None: """Start to process messages. This process is indefinite thus it should - be started in a separate thread or process. + be started in a separate process. This method also automatically establishes a connection and wait for the channel to get ready to process messages. @@ -469,6 +467,7 @@ class PikaEventBroker(EventBroker): NUMBER_OF_MP_WORKERS = 1 MP_CONTEXT = None + if sys.platform == "darwin" and sys.version_info < (3, 8): # On macOS, Python 3.8 has switched the default start method to "spawn". To # quote the documentation: "The fork start method should be considered @@ -492,7 +491,7 @@ def __init__( ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES ), **kwargs: Any, - ): + ) -> None: """Initialise RabbitMQ event broker. Args: @@ -530,7 +529,7 @@ def __del__(self) -> None: self.process.terminate() def close(self) -> None: - """Close the pika channel and connection.""" + """Close the Pika connector.""" self.__del__() @classmethod From 21ee0827f87ea68c3e2a9305bbe0560dea93927f Mon Sep 17 00:00:00 2001 From: alwx Date: Thu, 8 Oct 2020 12:02:13 +0200 Subject: [PATCH 10/30] Black reformatting --- rasa/core/brokers/pika.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index b52d8ca8e89c..d55d07d04f3b 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -15,7 +15,7 @@ List, Tuple, Generator, - TYPE_CHECKING + TYPE_CHECKING, ) from rasa.constants import DEFAULT_LOG_LEVEL_LIBRARIES, ENV_LOG_LEVEL_LIBRARIES From f8e6de8ffb618416902c32d1f443f67c6bac5916 Mon Sep 17 00:00:00 2001 From: alwx Date: Thu, 8 Oct 2020 16:27:29 +0200 Subject: [PATCH 11/30] Fix --- rasa/core/brokers/pika.py | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index d55d07d04f3b..51b25c3fa89e 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -391,7 +391,8 @@ def is_ready( return False - def _connect(self): + def connect(self): + """Establish a connection to Pika.""" self.pika_connection = initialise_pika_select_connection( self.parameters, self._on_open_connection, self._on_open_connection_error ) @@ -435,13 +436,7 @@ def _publish(self, message: Message) -> None: def process_messages(self) -> None: """Start to process messages. This process is indefinite thus it should be started in a separate process. - - This method also automatically establishes a connection and wait for - the channel to get ready to process messages. """ - if not self.is_connected: - self._connect() - if not self.is_ready(): logger.warning("RabbitMQ channel cannot be opened.") return @@ -567,6 +562,9 @@ def _get_mp_context(self) -> multiprocessing.context.BaseContext: return multiprocessing.get_context(self.MP_CONTEXT) def _start_pika_process(self) -> multiprocessing.Process: + if not self.pika_message_processor.is_connected: + self.pika_message_processor.connect() + process = multiprocessing.Process( target=self.pika_message_processor.process_messages, args=(), daemon=True ) From f2c3f62fad8d2fdb986e78653d37974acfd6ee57 Mon Sep 17 00:00:00 2001 From: melindaloubser1 Date: Fri, 9 Oct 2020 18:33:34 +0200 Subject: [PATCH 12/30] monkeypatch pika test --- tests/core/test_broker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 92f8823e8298..fa55226c66ab 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -43,6 +43,9 @@ def test_pika_broker_from_config(): ) actual = EventBroker.create(cfg) + # patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init + monkeypatch.setattr(PikaEventBroker, "_connect", lambda _: None) + assert isinstance(actual, PikaEventBroker) assert actual.host == "localhost" assert actual.username == "username" From d3e6c136a037dbdf9fc4e209561cb856814a3451 Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 10:14:58 +0200 Subject: [PATCH 13/30] Fix for running ioloop in a separate process --- rasa/core/brokers/pika.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 51b25c3fa89e..e7a06a5845db 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -391,13 +391,18 @@ def is_ready( return False + def _run_pika_io_loop(self): + # noinspection PyUnresolvedReferences + self.pika_connection.ioloop.start() + def connect(self): """Establish a connection to Pika.""" self.pika_connection = initialise_pika_select_connection( self.parameters, self._on_open_connection, self._on_open_connection_error ) - # noinspection PyUnresolvedReferences - self.pika_connection.ioloop.start() + + process = multiprocessing.Process(target=self._run_pika_io_loop, args=(), daemon=True) + process.start() def _on_open_connection(self, connection: "SelectConnection") -> None: logger.debug( From cdb140b96832497bcf526614bbadcd1988fb2182 Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 12:23:41 +0200 Subject: [PATCH 14/30] Fix download button in docs --- docs/themes/theme-custom/theme/Prototyper/download-button.jsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/themes/theme-custom/theme/Prototyper/download-button.jsx b/docs/themes/theme-custom/theme/Prototyper/download-button.jsx index 236590da1311..5f9e5bfafae2 100644 --- a/docs/themes/theme-custom/theme/Prototyper/download-button.jsx +++ b/docs/themes/theme-custom/theme/Prototyper/download-button.jsx @@ -9,7 +9,7 @@ const DownloadButton = (props) => { return ( Download project From 25aa275e00c625ac4cf2add29f698e825b79900c Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 12:44:51 +0200 Subject: [PATCH 15/30] Update for `connect` function --- rasa/core/brokers/pika.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index e7a06a5845db..1abbde42e674 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -366,7 +366,7 @@ def is_connected(self) -> bool: Returns: A boolean value indicating if the connection is established. """ - return self.pika_connection and self.pika_connection.is_open and self.channel + return self.pika_connection and self.channel def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 From 9d36d5d4ed296da226479b6aed7fa366e4d48473 Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 12:47:14 +0200 Subject: [PATCH 16/30] Changelog entry --- changelog/7001.bugfix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/7001.bugfix.md diff --git a/changelog/7001.bugfix.md b/changelog/7001.bugfix.md new file mode 100644 index 000000000000..9994c3276af4 --- /dev/null +++ b/changelog/7001.bugfix.md @@ -0,0 +1 @@ +Update Rasa Playground "Download" button to work correctly depending on the current chat state. \ No newline at end of file From 3efe6f9d0bd33201f2c76ce0616b5c1d397e13d5 Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 17:09:25 +0200 Subject: [PATCH 17/30] Bugfix for .ready() function --- rasa/core/brokers/pika.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 1abbde42e674..eb5df87b1799 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -566,15 +566,19 @@ def _connect(self) -> None: def _get_mp_context(self) -> multiprocessing.context.BaseContext: return multiprocessing.get_context(self.MP_CONTEXT) - def _start_pika_process(self) -> multiprocessing.Process: + def _start_pika_process(self) -> Optional[multiprocessing.Process]: if not self.pika_message_processor.is_connected: self.pika_message_processor.connect() - process = multiprocessing.Process( - target=self.pika_message_processor.process_messages, args=(), daemon=True - ) - process.start() - return process + if self.pika_message_processor.is_ready(): + process = multiprocessing.Process( + target=self.pika_message_processor.process_messages, args=(), daemon=True + ) + process.start() + return process + + logger.warning("RabbitMQ channel cannot be opened.") + return None def _publish(self, body: Text, headers: MessageHeaders = None) -> None: if not self.pika_message_processor: From f92bf225c6c6acd4014d2234e74db9abae2b589c Mon Sep 17 00:00:00 2001 From: alwx Date: Tue, 13 Oct 2020 17:13:09 +0200 Subject: [PATCH 18/30] Removed unnecessary check --- rasa/core/brokers/pika.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index eb5df87b1799..1cc67664098f 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -442,10 +442,6 @@ def process_messages(self) -> None: """Start to process messages. This process is indefinite thus it should be started in a separate process. """ - if not self.is_ready(): - logger.warning("RabbitMQ channel cannot be opened.") - return - try: while True: message = self.get_message() From 06bc16d9fcd40a0eda7ac45affbc933e030cd261 Mon Sep 17 00:00:00 2001 From: Greg Stephens Date: Tue, 13 Oct 2020 19:47:22 -0700 Subject: [PATCH 19/30] Correct missing mapping policy message --- rasa/core/policies/mapping_policy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/core/policies/mapping_policy.py b/rasa/core/policies/mapping_policy.py index ac3fa5686a99..8faa7efdac07 100644 --- a/rasa/core/policies/mapping_policy.py +++ b/rasa/core/policies/mapping_policy.py @@ -75,7 +75,7 @@ def validate_against_domain( "You have defined triggers in your domain, but haven't " "added the MappingPolicy to your policy ensemble. " "Either remove the triggers from your domain or " - "exclude the MappingPolicy from your policy configuration." + "include the MappingPolicy from your policy configuration." ) def train( From 6d67a483d5548cf7bf3d437661200610cac313d1 Mon Sep 17 00:00:00 2001 From: Akela Drissner-Schmid <32450038+akelad@users.noreply.github.com> Date: Wed, 14 Oct 2020 10:38:08 +0200 Subject: [PATCH 20/30] Update rasa/core/policies/mapping_policy.py --- rasa/core/policies/mapping_policy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/core/policies/mapping_policy.py b/rasa/core/policies/mapping_policy.py index 8faa7efdac07..bca8cb9b9d2c 100644 --- a/rasa/core/policies/mapping_policy.py +++ b/rasa/core/policies/mapping_policy.py @@ -75,7 +75,7 @@ def validate_against_domain( "You have defined triggers in your domain, but haven't " "added the MappingPolicy to your policy ensemble. " "Either remove the triggers from your domain or " - "include the MappingPolicy from your policy configuration." + "include the MappingPolicy in your policy configuration." ) def train( From f1beb7d8aa5819fd678053074a3df95aa426387d Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 14 Oct 2020 13:56:40 +0200 Subject: [PATCH 21/30] Better PikaMessageProcessor, with more callbacks and updated functions --- rasa/core/brokers/pika.py | 98 ++++++++++++++++++++++++++------------- 1 file changed, 65 insertions(+), 33 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 1cc67664098f..28adabe78243 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -17,6 +17,7 @@ Generator, TYPE_CHECKING, ) +from threading import Thread from rasa.constants import DEFAULT_LOG_LEVEL_LIBRARIES, ENV_LOG_LEVEL_LIBRARIES from rasa.shared.constants import DOCS_URL_PIKA_EVENT_BROKER @@ -271,16 +272,18 @@ def __init__( self.queues: List[Text] = self._get_queues_from_args(queues) self.get_message: Callable[[], Message] = get_message - self.pika_connection: Optional["SelectConnection"] = None - self.channel: Optional["Channel"] = None + self._connection: Optional["SelectConnection"] = None + self._channel: Optional["Channel"] = None + self._closing = False def __del__(self) -> None: - if self.channel: - close_pika_channel(self.channel) - close_pika_connection(self.channel.connection) + if self._channel: + logger.warning("Closing connection...") + close_pika_channel(self._channel) + close_pika_connection(self._channel.connection) if self.is_connected: - self.pika_connection.close() + self._connection.close() def close(self) -> None: """Close the Pika connection.""" @@ -366,7 +369,7 @@ def is_connected(self) -> bool: Returns: A boolean value indicating if the connection is established. """ - return self.pika_connection and self.channel + return self._connection and self._channel def is_ready( self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01 @@ -391,23 +394,17 @@ def is_ready( return False - def _run_pika_io_loop(self): - # noinspection PyUnresolvedReferences - self.pika_connection.ioloop.start() - - def connect(self): + def _connect(self) -> "SelectConnection": """Establish a connection to Pika.""" - self.pika_connection = initialise_pika_select_connection( + return initialise_pika_select_connection( self.parameters, self._on_open_connection, self._on_open_connection_error ) - process = multiprocessing.Process(target=self._run_pika_io_loop, args=(), daemon=True) - process.start() - def _on_open_connection(self, connection: "SelectConnection") -> None: logger.debug( f"RabbitMQ connection to '{self.parameters.host}' was established." ) + connection.add_on_close_callback(self._on_connection_closed) connection.channel(on_open_callback=self._on_channel_open) def _on_open_connection_error(self, _, error: Text) -> None: @@ -415,23 +412,50 @@ def _on_open_connection_error(self, _, error: Text) -> None: f"Connecting to '{self.parameters.host}' failed with error '{error}'. Trying again." ) + def _on_connection_closed(self, connection: "SelectConnection", reason): + self._channel = None + if self._closing: + # noinspection PyUnresolvedReferences + self._connection.ioloop.stop() + else: + logger.warning(f"Connection closed, reopening in 5 seconds: {reason}") + # noinspection PyUnresolvedReferences + self._connection.ioloop.call_later(5, self._reconnect) + + def _reconnect(self): + # noinspection PyUnresolvedReferences + self._connection.ioloop.stop() + + if not self._closing: + self._connection = self._connect() + # noinspection PyUnresolvedReferences + self._connection.ioloop.start() + def _on_channel_open(self, channel: "Channel") -> None: logger.debug("RabbitMQ channel was opened. Declaring fanout exchange.") + self._channel = channel + self._channel.add_on_close_callback(self._on_channel_closed) + # declare exchange of type 'fanout' in order to publish to multiple queues # (https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchange-fanout) - channel.exchange_declare(RABBITMQ_EXCHANGE, exchange_type="fanout") + self._channel.exchange_declare(RABBITMQ_EXCHANGE, exchange_type="fanout") for queue in self.queues: - channel.queue_declare(queue=queue, durable=True) - channel.queue_bind(exchange=RABBITMQ_EXCHANGE, queue=queue) + self._channel.queue_declare(queue=queue, durable=True) + self._channel.queue_bind(exchange=RABBITMQ_EXCHANGE, queue=queue) + + self.process_messages() - self.channel = channel + def _on_channel_closed(self, channel: "Channel", reason): + logger.warning(f"Channel {channel} was closed: {reason}") + self._connection.close() def _publish(self, message: Message) -> None: body, headers = message - self.channel.basic_publish( + logger.debug(f"Publishing: {self._channel}") + self._channel.basic_publish( exchange=RABBITMQ_EXCHANGE, routing_key="", body=body.encode(DEFAULT_ENCODING), @@ -439,9 +463,10 @@ def _publish(self, message: Message) -> None: ) def process_messages(self) -> None: - """Start to process messages. This process is indefinite thus it should - be started in a separate process. - """ + """Start to process messages.""" + + logger.debug(f"Channel: {self._channel}") + try: while True: message = self.get_message() @@ -457,6 +482,18 @@ def process_messages(self) -> None: "messages on this worker." ) + def run(self): + """Run the message processor by connecting to RabbitMQ and then + starting the IOLoop to block and allow the SelectConnection to operate. + + This function is blocking and indefinite thus it + should be started in a separate process. + """ + self._connection = self._connect() + + # noinspection PyUnresolvedReferences + self._pika_connection.ioloop.start() + class PikaEventBroker(EventBroker): """Pika-based event broker for publishing messages to RabbitMQ.""" @@ -556,24 +593,19 @@ def _connect(self) -> None: queues=self.queues, get_message=lambda: self.process_queue.get(), ) - self.process = self._start_pika_process() def _get_mp_context(self) -> multiprocessing.context.BaseContext: return multiprocessing.get_context(self.MP_CONTEXT) def _start_pika_process(self) -> Optional[multiprocessing.Process]: - if not self.pika_message_processor.is_connected: - self.pika_message_processor.connect() - - if self.pika_message_processor.is_ready(): + if self.pika_message_processor: process = multiprocessing.Process( - target=self.pika_message_processor.process_messages, args=(), daemon=True + target=self.pika_message_processor.run, daemon=True ) process.start() return process - logger.warning("RabbitMQ channel cannot be opened.") return None def _publish(self, body: Text, headers: MessageHeaders = None) -> None: @@ -581,7 +613,7 @@ def _publish(self, body: Text, headers: MessageHeaders = None) -> None: self._connect() if ( - self.pika_message_processor.is_connected + (self.process and self.process.is_alive()) or self.should_keep_unpublished_messages ): self.process_queue.put((body, headers)) @@ -633,7 +665,7 @@ def publish( f"Could not open Pika channel at host '{self.host}'. " f"Failed with error: {e}" ) - self.pika_message_processor = None + self.close() if self.raise_on_failure: raise e From 2e0774315d90d410e107c81813a4e6e74d529e45 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 14 Oct 2020 14:01:12 +0200 Subject: [PATCH 22/30] Logger cleanup --- rasa/core/brokers/pika.py | 94 +++++++++++++++++++-------------------- 1 file changed, 45 insertions(+), 49 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index 28adabe78243..fc7160ebfdba 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -92,6 +92,49 @@ def _pika_log_level(temporary_log_level: int) -> Generator[None, None, None]: pika_logger.setLevel(old_log_level) +def create_rabbitmq_ssl_options( + rabbitmq_host: Optional[Text] = None, +) -> Optional["pika.SSLOptions"]: + """Create RabbitMQ SSL options. + + Requires the following environment variables to be set: + + RABBITMQ_SSL_CLIENT_CERTIFICATE - path to the SSL client certificate (required) + RABBITMQ_SSL_CLIENT_KEY - path to the SSL client key (required) + RABBITMQ_SSL_CA_FILE - path to the SSL CA file for verification (optional) + RABBITMQ_SSL_KEY_PASSWORD - SSL private key password (optional) + + Details on how to enable RabbitMQ TLS support can be found here: + https://www.rabbitmq.com/ssl.html#enabling-tls + + Args: + rabbitmq_host: RabbitMQ hostname + + Returns: + Pika SSL context of type `pika.SSLOptions` if + the RABBITMQ_SSL_CLIENT_CERTIFICATE and RABBITMQ_SSL_CLIENT_KEY + environment variables are valid paths, else `None`. + """ + client_certificate_path = os.environ.get("RABBITMQ_SSL_CLIENT_CERTIFICATE") + client_key_path = os.environ.get("RABBITMQ_SSL_CLIENT_KEY") + + if client_certificate_path and client_key_path: + import pika + import rasa.server + + logger.debug(f"Configuring SSL context for RabbitMQ host '{rabbitmq_host}'.") + + ca_file_path = os.environ.get("RABBITMQ_SSL_CA_FILE") + key_password = os.environ.get("RABBITMQ_SSL_KEY_PASSWORD") + + ssl_context = rasa.server.create_ssl_context( + client_certificate_path, client_key_path, ca_file_path, key_password + ) + return pika.SSLOptions(ssl_context, rabbitmq_host) + else: + return None + + def _get_pika_parameters( host: Text, username: Text, @@ -454,7 +497,6 @@ def _on_channel_closed(self, channel: "Channel", reason): def _publish(self, message: Message) -> None: body, headers = message - logger.debug(f"Publishing: {self._channel}") self._channel.basic_publish( exchange=RABBITMQ_EXCHANGE, routing_key="", @@ -465,8 +507,6 @@ def _publish(self, message: Message) -> None: def process_messages(self) -> None: """Start to process messages.""" - logger.debug(f"Channel: {self._channel}") - try: while True: message = self.get_message() @@ -613,9 +653,8 @@ def _publish(self, body: Text, headers: MessageHeaders = None) -> None: self._connect() if ( - (self.process and self.process.is_alive()) - or self.should_keep_unpublished_messages - ): + self.process and self.process.is_alive() + ) or self.should_keep_unpublished_messages: self.process_queue.put((body, headers)) def is_ready( @@ -673,46 +712,3 @@ def publish( time.sleep(retry_delay_in_seconds) logger.error(f"Failed to publish Pika event on host '{self.host}':\n{body}") - - -def create_rabbitmq_ssl_options( - rabbitmq_host: Optional[Text] = None, -) -> Optional["pika.SSLOptions"]: - """Create RabbitMQ SSL options. - - Requires the following environment variables to be set: - - RABBITMQ_SSL_CLIENT_CERTIFICATE - path to the SSL client certificate (required) - RABBITMQ_SSL_CLIENT_KEY - path to the SSL client key (required) - RABBITMQ_SSL_CA_FILE - path to the SSL CA file for verification (optional) - RABBITMQ_SSL_KEY_PASSWORD - SSL private key password (optional) - - Details on how to enable RabbitMQ TLS support can be found here: - https://www.rabbitmq.com/ssl.html#enabling-tls - - Args: - rabbitmq_host: RabbitMQ hostname - - Returns: - Pika SSL context of type `pika.SSLOptions` if - the RABBITMQ_SSL_CLIENT_CERTIFICATE and RABBITMQ_SSL_CLIENT_KEY - environment variables are valid paths, else `None`. - """ - client_certificate_path = os.environ.get("RABBITMQ_SSL_CLIENT_CERTIFICATE") - client_key_path = os.environ.get("RABBITMQ_SSL_CLIENT_KEY") - - if client_certificate_path and client_key_path: - import pika - import rasa.server - - logger.debug(f"Configuring SSL context for RabbitMQ host '{rabbitmq_host}'.") - - ca_file_path = os.environ.get("RABBITMQ_SSL_CA_FILE") - key_password = os.environ.get("RABBITMQ_SSL_KEY_PASSWORD") - - ssl_context = rasa.server.create_ssl_context( - client_certificate_path, client_key_path, ca_file_path, key_password - ) - return pika.SSLOptions(ssl_context, rabbitmq_host) - else: - return None From 7d07c546ba796d269694663d25f2345d6f25cd06 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 14 Oct 2020 14:13:55 +0200 Subject: [PATCH 23/30] Code style updates --- rasa/core/brokers/pika.py | 5 ++--- tests/core/test_broker.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index fc7160ebfdba..e2815be04db8 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -17,7 +17,6 @@ Generator, TYPE_CHECKING, ) -from threading import Thread from rasa.constants import DEFAULT_LOG_LEVEL_LIBRARIES, ENV_LOG_LEVEL_LIBRARIES from rasa.shared.constants import DOCS_URL_PIKA_EVENT_BROKER @@ -455,7 +454,7 @@ def _on_open_connection_error(self, _, error: Text) -> None: f"Connecting to '{self.parameters.host}' failed with error '{error}'. Trying again." ) - def _on_connection_closed(self, connection: "SelectConnection", reason): + def _on_connection_closed(self, _, reason: Any): self._channel = None if self._closing: # noinspection PyUnresolvedReferences @@ -490,7 +489,7 @@ def _on_channel_open(self, channel: "Channel") -> None: self.process_messages() - def _on_channel_closed(self, channel: "Channel", reason): + def _on_channel_closed(self, channel: "Channel", reason: Any): logger.warning(f"Channel {channel} was closed: {reason}") self._connection.close() diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index fa55226c66ab..1074b4c58f7d 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -37,7 +37,7 @@ ) -def test_pika_broker_from_config(): +def test_pika_broker_from_config(monkeypatch: MonkeyPatch): cfg = read_endpoint_config( "data/test_endpoints/event_brokers/pika_endpoint.yml", "event_broker" ) From ffb280d5c3af8392047860931f3c201803fdc303 Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 14 Oct 2020 14:37:27 +0200 Subject: [PATCH 24/30] Small bugfix --- rasa/core/brokers/pika.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/core/brokers/pika.py b/rasa/core/brokers/pika.py index e2815be04db8..5f5e5e19116f 100644 --- a/rasa/core/brokers/pika.py +++ b/rasa/core/brokers/pika.py @@ -531,7 +531,7 @@ def run(self): self._connection = self._connect() # noinspection PyUnresolvedReferences - self._pika_connection.ioloop.start() + self._connection.ioloop.start() class PikaEventBroker(EventBroker): From b16375b6c41728b51ede366b7a56f9684e8881ce Mon Sep 17 00:00:00 2001 From: alwx Date: Wed, 14 Oct 2020 16:29:31 +0200 Subject: [PATCH 25/30] Test fix --- tests/core/test_broker.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/core/test_broker.py b/tests/core/test_broker.py index 1074b4c58f7d..f3e493d171db 100644 --- a/tests/core/test_broker.py +++ b/tests/core/test_broker.py @@ -38,14 +38,14 @@ def test_pika_broker_from_config(monkeypatch: MonkeyPatch): + # patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init + monkeypatch.setattr(PikaEventBroker, "_connect", lambda _: None) + cfg = read_endpoint_config( "data/test_endpoints/event_brokers/pika_endpoint.yml", "event_broker" ) actual = EventBroker.create(cfg) - # patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init - monkeypatch.setattr(PikaEventBroker, "_connect", lambda _: None) - assert isinstance(actual, PikaEventBroker) assert actual.host == "localhost" assert actual.username == "username" From de01dc76d082480698e2ec8695810a36257e0cd4 Mon Sep 17 00:00:00 2001 From: Lucas Dutra Date: Mon, 28 Sep 2020 20:45:17 -0300 Subject: [PATCH 26/30] Add model relative path information log Co-authored-by: Leticia Meneses --- rasa/model.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rasa/model.py b/rasa/model.py index 51a570e3bc00..8005fcddb61a 100644 --- a/rasa/model.py +++ b/rasa/model.py @@ -150,6 +150,10 @@ def get_model(model_path: Text = DEFAULT_MODELS_PATH) -> TempDirectoryPath: elif not model_path.endswith(".tar.gz"): raise ModelNotFound(f"Path '{model_path}' does not point to a Rasa model file.") + model_relative_path = create_output_path(model_path) + model_relative_path = os.path.relpath(model_relative_path) + logger.info("Loading model {}...".format(model_relative_path)) + return unpack_model(model_path) From 275b4b5106c2b766d604b9cb015888178384bd8a Mon Sep 17 00:00:00 2001 From: mbslet Date: Mon, 28 Sep 2020 22:06:46 -0300 Subject: [PATCH 27/30] Add changelog file Co-authored-by: Lucas Dutra --- changelog/6571.improvement.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog/6571.improvement.md diff --git a/changelog/6571.improvement.md b/changelog/6571.improvement.md new file mode 100644 index 000000000000..5aff692d913d --- /dev/null +++ b/changelog/6571.improvement.md @@ -0,0 +1 @@ +Add a model relative path to CLI commands log \ No newline at end of file From 8d25ccd6f0dfcf23a2f4790c777ad245b7295b65 Mon Sep 17 00:00:00 2001 From: Lucas Dutra Date: Mon, 28 Sep 2020 22:26:03 -0300 Subject: [PATCH 28/30] Change string dynamic value method to f string Co-authored-by: Leticia Meneses --- rasa/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rasa/model.py b/rasa/model.py index 8005fcddb61a..93d9599f1898 100644 --- a/rasa/model.py +++ b/rasa/model.py @@ -152,7 +152,7 @@ def get_model(model_path: Text = DEFAULT_MODELS_PATH) -> TempDirectoryPath: model_relative_path = create_output_path(model_path) model_relative_path = os.path.relpath(model_relative_path) - logger.info("Loading model {}...".format(model_relative_path)) + logger.info(f"Loading model {model_relative_path}...") return unpack_model(model_path) From 152433f42527292e66904f7fe9459f6e0041223c Mon Sep 17 00:00:00 2001 From: Lucas Dutra Date: Tue, 6 Oct 2020 09:34:02 -0300 Subject: [PATCH 29/30] Update 6571 changelog according to requested changes Co-authored-by: Leticia Meneses --- changelog/6571.improvement.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/6571.improvement.md b/changelog/6571.improvement.md index 5aff692d913d..1f98fc652472 100644 --- a/changelog/6571.improvement.md +++ b/changelog/6571.improvement.md @@ -1 +1 @@ -Add a model relative path to CLI commands log \ No newline at end of file +Log the model's relative path when using CLI commands. \ No newline at end of file From 89980fb09efcfa9e1a30861fa16e606a5827e9d7 Mon Sep 17 00:00:00 2001 From: Lucas Dutra Date: Wed, 14 Oct 2020 13:10:47 -0300 Subject: [PATCH 30/30] Handle python relative path function windows error Co-authored-by: Leticia Meneses --- rasa/model.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rasa/model.py b/rasa/model.py index 93d9599f1898..76b67719a298 100644 --- a/rasa/model.py +++ b/rasa/model.py @@ -150,8 +150,10 @@ def get_model(model_path: Text = DEFAULT_MODELS_PATH) -> TempDirectoryPath: elif not model_path.endswith(".tar.gz"): raise ModelNotFound(f"Path '{model_path}' does not point to a Rasa model file.") - model_relative_path = create_output_path(model_path) - model_relative_path = os.path.relpath(model_relative_path) + try: + model_relative_path = os.path.relpath(model_path) + except ValueError: + model_relative_path = model_path logger.info(f"Loading model {model_relative_path}...") return unpack_model(model_path)