From 57bc074d07adc3e3c4927bd258caacf9ad1275a5 Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Fri, 5 Feb 2021 09:24:57 -0800 Subject: [PATCH 1/9] Added lock_store param to processor so handle_reminders can lock --- rasa/core/agent.py | 1 + rasa/core/processor.py | 45 ++++++++++++++++++++++-------------------- tests/core/conftest.py | 3 +++ 3 files changed, 28 insertions(+), 21 deletions(-) diff --git a/rasa/core/agent.py b/rasa/core/agent.py index cfb94d31800e..a252a8dc9ff6 100644 --- a/rasa/core/agent.py +++ b/rasa/core/agent.py @@ -888,6 +888,7 @@ def create_processor( self.policy_ensemble, self.domain, self.tracker_store, + self.lock_store, self.nlg, action_endpoint=self.action_endpoint, message_preprocessor=preprocessor, diff --git a/rasa/core/processor.py b/rasa/core/processor.py index 2cd6d0f14c8c..9a06f485f22d 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -44,6 +44,7 @@ RegexInterpreter, ) from rasa.core.nlg import NaturalLanguageGenerator +from rasa.core.lock_store import LockStore from rasa.core.policies.ensemble import PolicyEnsemble from rasa.core.tracker_store import TrackerStore from rasa.core.trackers import DialogueStateTracker, EventVerbosity @@ -69,6 +70,7 @@ def __init__( policy_ensemble: PolicyEnsemble, domain: Domain, tracker_store: TrackerStore, + lock_store: LockStore, generator: NaturalLanguageGenerator, action_endpoint: Optional[EndpointConfig] = None, max_number_of_predictions: int = MAX_NUMBER_OF_PREDICTIONS, @@ -348,28 +350,29 @@ async def handle_reminder( ) -> None: """Handle a reminder that is triggered asynchronously.""" - tracker = await self.get_tracker_with_session_start(sender_id, output_channel) - - if not tracker: - logger.warning( - f"Failed to retrieve tracker for conversation ID '{sender_id}'." - ) - return None + async with self.lock_store.lock(sender_id): + tracker = await self.get_tracker_with_session_start(sender_id, output_channel) - if ( - reminder_event.kill_on_user_message - and self._has_message_after_reminder(tracker, reminder_event) - or not self._is_reminder_still_valid(tracker, reminder_event) - ): - logger.debug( - f"Canceled reminder because it is outdated ({reminder_event})." - ) - else: - intent = reminder_event.intent - entities = reminder_event.entities or {} - await self.trigger_external_user_uttered( - intent, entities, tracker, output_channel - ) + if not tracker: + logger.warning( + f"Failed to retrieve tracker for conversation ID '{sender_id}'." + ) + return None + + if ( + reminder_event.kill_on_user_message + and self._has_message_after_reminder(tracker, reminder_event) + or not self._is_reminder_still_valid(tracker, reminder_event) + ): + logger.debug( + f"Canceled reminder because it is outdated ({reminder_event})." + ) + else: + intent = reminder_event.intent + entities = reminder_event.entities or {} + await self.trigger_external_user_uttered( + intent, entities, tracker, output_channel + ) async def trigger_external_user_uttered( self, diff --git a/tests/core/conftest.py b/tests/core/conftest.py index d015816127e3..82c295afe33f 100644 --- a/tests/core/conftest.py +++ b/tests/core/conftest.py @@ -20,6 +20,7 @@ from rasa.core.processor import MessageProcessor from rasa.core.slots import Slot from rasa.core.tracker_store import InMemoryTrackerStore, MongoTrackerStore +from rasa.core.lock_store import LockStore, InMemoryLockStore from rasa.core.trackers import DialogueStateTracker @@ -148,11 +149,13 @@ def default_channel() -> OutputChannel: @pytest.fixture async def default_processor(default_agent: Agent) -> MessageProcessor: tracker_store = InMemoryTrackerStore(default_agent.domain) + lock_store = InMemoryLockStore() return MessageProcessor( default_agent.interpreter, default_agent.policy_ensemble, default_agent.domain, tracker_store, + lock_store, TemplatedNaturalLanguageGenerator(default_agent.domain.templates), ) From 12cf0a897916469d82e9a59498eba01dbb45c3ff Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Fri, 5 Feb 2021 09:39:11 -0800 Subject: [PATCH 2/9] Added changelog --- changelog/7895.bugfix.md | 1 + rasa/core/processor.py | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 changelog/7895.bugfix.md diff --git a/changelog/7895.bugfix.md b/changelog/7895.bugfix.md new file mode 100644 index 000000000000..6eb9a2b182d7 --- /dev/null +++ b/changelog/7895.bugfix.md @@ -0,0 +1 @@ +Fixed bug where the conversation does not lock before handling a reminder event. \ No newline at end of file diff --git a/rasa/core/processor.py b/rasa/core/processor.py index 9a06f485f22d..f483b16485ab 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -351,7 +351,9 @@ async def handle_reminder( """Handle a reminder that is triggered asynchronously.""" async with self.lock_store.lock(sender_id): - tracker = await self.get_tracker_with_session_start(sender_id, output_channel) + tracker = await self.get_tracker_with_session_start( + sender_id, output_channel + ) if not tracker: logger.warning( From 400d9c6389281669c9331f46b79f27d6e508fb9a Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Fri, 5 Feb 2021 13:04:52 -0800 Subject: [PATCH 3/9] Fixed processor init --- rasa/core/processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/rasa/core/processor.py b/rasa/core/processor.py index f483b16485ab..31ce3d106fe0 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -82,6 +82,7 @@ def __init__( self.policy_ensemble = policy_ensemble self.domain = domain self.tracker_store = tracker_store + self.lock_store = lock_store self.max_number_of_predictions = max_number_of_predictions self.message_preprocessor = message_preprocessor self.on_circuit_break = on_circuit_break From aa8eb6a60a153378cbcbb580e4e4939f5835ca55 Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Mon, 8 Feb 2021 15:54:40 -0800 Subject: [PATCH 4/9] Updated processor test case --- tests/core/test_processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/core/test_processor.py b/tests/core/test_processor.py index f1b84a283596..81bbfa29018a 100644 --- a/tests/core/test_processor.py +++ b/tests/core/test_processor.py @@ -109,7 +109,7 @@ async def test_http_parsing(): inter = RasaNLUHttpInterpreter(endpoint_config=endpoint) try: - await MessageProcessor(inter, None, None, None, None)._parse_message( + await MessageProcessor(inter, None, None, None, None, None)._parse_message( message ) except KeyError: From edae9904f5a2d1f74ff0622ab38009a9c95c0a58 Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Wed, 10 Feb 2021 19:17:23 -0800 Subject: [PATCH 5/9] Removed tracker condition --- rasa/core/processor.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/rasa/core/processor.py b/rasa/core/processor.py index 31ce3d106fe0..4cf478c73bd2 100644 --- a/rasa/core/processor.py +++ b/rasa/core/processor.py @@ -356,12 +356,6 @@ async def handle_reminder( sender_id, output_channel ) - if not tracker: - logger.warning( - f"Failed to retrieve tracker for conversation ID '{sender_id}'." - ) - return None - if ( reminder_event.kill_on_user_message and self._has_message_after_reminder(tracker, reminder_event) From 59e1c3fea31ae31644e7afca6702b4a33115b614 Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Fri, 12 Feb 2021 12:21:32 -0800 Subject: [PATCH 6/9] Added test case to check if lock_store properly locks when handling a reminder --- tests/core/test_processor.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/core/test_processor.py b/tests/core/test_processor.py index 81bbfa29018a..d1c94c7f223e 100644 --- a/tests/core/test_processor.py +++ b/tests/core/test_processor.py @@ -181,6 +181,29 @@ async def test_reminder_scheduled( assert t.events[-1] == ActionExecuted("action_listen") +async def test_reminder_lock( + default_channel: CollectingOutputChannel, default_processor: MessageProcessor +): + sender_id = uuid.uuid4().hex + + reminder = ReminderScheduled("remind", datetime.datetime.now()) + tracker = default_processor.tracker_store.get_or_create_tracker(sender_id) + + tracker.update(UserUttered("test")) + tracker.update(ActionExecuted("action_schedule_reminder")) + tracker.update(reminder) + + default_processor.tracker_store.save(tracker) + + async with default_processor.handle_reminder( + reminder, sender_id, default_channel, default_processor.nlg + ): + lock_store = default_processor.lock_store + lock = lock_store.get_lock(sender_id) + assert lock + assert lock.conversation_id == sender_id + + async def test_reminder_aborted( default_channel: CollectingOutputChannel, default_processor: MessageProcessor ): From 93ec73f497d9f4568300991618536faaf359d4a1 Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Fri, 12 Feb 2021 16:01:50 -0800 Subject: [PATCH 7/9] Fixed reminder lock_store test case --- tests/core/test_processor.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/core/test_processor.py b/tests/core/test_processor.py index d1c94c7f223e..7a74166af09d 100644 --- a/tests/core/test_processor.py +++ b/tests/core/test_processor.py @@ -184,6 +184,15 @@ async def test_reminder_scheduled( async def test_reminder_lock( default_channel: CollectingOutputChannel, default_processor: MessageProcessor ): + from io import StringIO + + logger = logging.getLogger("rasa.core.lock_store") + logger.setLevel(logging.DEBUG) + + log_stream = StringIO() + log_handler = logging.StreamHandler(log_stream) + logger.addHandler(log_handler) + sender_id = uuid.uuid4().hex reminder = ReminderScheduled("remind", datetime.datetime.now()) @@ -195,13 +204,13 @@ async def test_reminder_lock( default_processor.tracker_store.save(tracker) - async with default_processor.handle_reminder( + await default_processor.handle_reminder( reminder, sender_id, default_channel, default_processor.nlg - ): - lock_store = default_processor.lock_store - lock = lock_store.get_lock(sender_id) - assert lock - assert lock.conversation_id == sender_id + ) + + last_log_message = log_stream.getvalue().splitlines()[-1] + + assert last_log_message == f"Deleted lock for conversation '{sender_id}'." async def test_reminder_aborted( From d29940a66f7a076249d01e823a51bdc85e95c57f Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Tue, 16 Feb 2021 15:37:35 -0800 Subject: [PATCH 8/9] Updated test case to use caplog --- tests/core/test_processor.py | 40 +++++++++++++++--------------------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/tests/core/test_processor.py b/tests/core/test_processor.py index 7a74166af09d..825d921aba78 100644 --- a/tests/core/test_processor.py +++ b/tests/core/test_processor.py @@ -6,6 +6,7 @@ import uuid import json from _pytest.monkeypatch import MonkeyPatch +from _pytest.logging import LogCaptureFixture from aioresponses import aioresponses from typing import Optional, Text, List from unittest.mock import patch @@ -182,35 +183,28 @@ async def test_reminder_scheduled( async def test_reminder_lock( - default_channel: CollectingOutputChannel, default_processor: MessageProcessor + default_channel: CollectingOutputChannel, + default_processor: MessageProcessor, + caplog: LogCaptureFixture, ): - from io import StringIO + caplog.clear() + with caplog.at_level(logging.DEBUG): + sender_id = uuid.uuid4().hex - logger = logging.getLogger("rasa.core.lock_store") - logger.setLevel(logging.DEBUG) - - log_stream = StringIO() - log_handler = logging.StreamHandler(log_stream) - logger.addHandler(log_handler) - - sender_id = uuid.uuid4().hex - - reminder = ReminderScheduled("remind", datetime.datetime.now()) - tracker = default_processor.tracker_store.get_or_create_tracker(sender_id) - - tracker.update(UserUttered("test")) - tracker.update(ActionExecuted("action_schedule_reminder")) - tracker.update(reminder) + reminder = ReminderScheduled("remind", datetime.datetime.now()) + tracker = default_processor.tracker_store.get_or_create_tracker(sender_id) - default_processor.tracker_store.save(tracker) + tracker.update(UserUttered("test")) + tracker.update(ActionExecuted("action_schedule_reminder")) + tracker.update(reminder) - await default_processor.handle_reminder( - reminder, sender_id, default_channel, default_processor.nlg - ) + default_processor.tracker_store.save(tracker) - last_log_message = log_stream.getvalue().splitlines()[-1] + await default_processor.handle_reminder( + reminder, sender_id, default_channel, default_processor.nlg + ) - assert last_log_message == f"Deleted lock for conversation '{sender_id}'." + assert f"Deleted lock for conversation '{sender_id}'." in caplog.text async def test_reminder_aborted( From fa472a5b663ea08e8f717a5499bbf8cda0ddccba Mon Sep 17 00:00:00 2001 From: b-quachtran Date: Thu, 18 Feb 2021 15:05:50 -0800 Subject: [PATCH 9/9] Renamed changelog file from md to rst --- changelog/{7895.bugfix.md => 7895.bugfix.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/{7895.bugfix.md => 7895.bugfix.rst} (100%) diff --git a/changelog/7895.bugfix.md b/changelog/7895.bugfix.rst similarity index 100% rename from changelog/7895.bugfix.md rename to changelog/7895.bugfix.rst