Skip to content

Commit

Permalink
Merge pull request #4919 from RasaHQ/fix-lock-store
Browse files Browse the repository at this point in the history
Fix TicketExistsError
  • Loading branch information
ricwo authored Dec 8, 2019
2 parents 0151e24 + 06f57a3 commit e9e0ac8
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 104 deletions.
2 changes: 2 additions & 0 deletions changelog/4918.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Locks for tickets in ``LockStore`` are immediately issued without a redundant
check for their availability.
4 changes: 1 addition & 3 deletions rasa/core/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ def last_issued(self) -> int:
"""

ticket_number = self._ticket_number_for(-1)
if ticket_number is not None:
return ticket_number

return NO_TICKET_ISSUED
return ticket_number if ticket_number is not None else NO_TICKET_ISSUED

@property
def now_serving(self) -> Optional[int]:
Expand Down
68 changes: 9 additions & 59 deletions rasa/core/lock_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import json
import logging
import os
from typing import Text, Optional, Union, AsyncGenerator

from async_generator import asynccontextmanager
from typing import Text, Optional, AsyncGenerator

from rasa.core.constants import DEFAULT_LOCK_LIFETIME
from rasa.core.lock import TicketLock, NO_TICKET_ISSUED
from rasa.core.lock import TicketLock
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)
Expand All @@ -28,18 +28,6 @@ class LockError(Exception):
pass


# noinspection PyUnresolvedReferences
class TicketExistsError(Exception):
"""Exception that is raised when an already-existing ticket for a conversation
has been issued.
Attributes:
message (str): explanation of which `conversation_id` raised the error
"""

pass


class LockStore:
@staticmethod
def find_lock_store(store: EndpointConfig = None) -> "LockStore":
Expand Down Expand Up @@ -93,7 +81,7 @@ def save_lock(self, lock: TicketLock) -> None:
raise NotImplementedError

def issue_ticket(
self, conversation_id: Text, lock_lifetime: Union[float, int] = LOCK_LIFETIME
self, conversation_id: Text, lock_lifetime: float = LOCK_LIFETIME
) -> int:
"""Issue new ticket with `lock_lifetime` for lock associated with
`conversation_id`.
Expand All @@ -103,18 +91,6 @@ def issue_ticket(

lock = self.get_or_create_lock(conversation_id)
ticket = lock.issue_ticket(lock_lifetime)

while True:
try:
self.ensure_ticket_available(lock)
break
except TicketExistsError:
# issue a new ticket if current ticket number has been issued twice
logger.exception(
"Ticket could not be issued. Issuing new ticket and retrying..."
)
ticket = lock.issue_ticket(lock_lifetime)

self.save_lock(lock)

return ticket
Expand All @@ -123,8 +99,8 @@ def issue_ticket(
async def lock(
self,
conversation_id: Text,
lock_lifetime: int = LOCK_LIFETIME,
wait_time_in_seconds: Union[int, float] = 1,
lock_lifetime: float = LOCK_LIFETIME,
wait_time_in_seconds: float = 1,
) -> AsyncGenerator[TicketLock, None]:
"""Acquire lock with lifetime `lock_lifetime`for `conversation_id`.
Expand All @@ -143,10 +119,7 @@ async def lock(
self.cleanup(conversation_id, ticket)

async def _acquire_lock(
self,
conversation_id: Text,
ticket: int,
wait_time_in_seconds: Union[int, float],
self, conversation_id: Text, ticket: int, wait_time_in_seconds: float,
) -> TicketLock:

while True:
Expand All @@ -162,17 +135,16 @@ async def _acquire_lock(
return lock

logger.debug(
"Failed to acquire lock for conversation ID '{}'. Retrying..."
"".format(conversation_id)
f"Failed to acquire lock for conversation ID '{conversation_id}'. "
f"Retrying..."
)

# sleep and update lock
await asyncio.sleep(wait_time_in_seconds)
self.update_lock(conversation_id)

raise LockError(
"Could not acquire lock for conversation_id '{}'."
"".format(conversation_id)
f"Could not acquire lock for conversation_id '{conversation_id}'."
)

def update_lock(self, conversation_id: Text) -> None:
Expand Down Expand Up @@ -229,28 +201,6 @@ def _log_deletion(conversation_id: Text, deletion_successful: bool) -> None:
else:
logger.debug(f"Could not delete lock for conversation '{conversation_id}'.")

def ensure_ticket_available(self, lock: TicketLock) -> None:
"""Check for duplicate tickets issued for `lock`.
This function should be called before saving `lock`. Raises `TicketExistsError`
if the last issued ticket for `lock` does not match the last ticket issued
for a lock fetched from storage for `lock.conversation_id`. This indicates
that some other process has issued a ticket for `lock` in the meantime.
"""

existing_lock = self.get_lock(lock.conversation_id)
if not existing_lock or existing_lock.last_issued == NO_TICKET_ISSUED:
# lock does not yet exist for conversation or no ticket has been issued
return

# raise if the last issued ticket number of `existing_lock` is not the same as
# that of the one being acquired
if existing_lock.last_issued != lock.last_issued:
raise TicketExistsError(
"Ticket '{}' already exists for conversation ID '{}'."
"".format(existing_lock.last_issued, lock.conversation_id)
)


class RedisLockStore(LockStore):
"""Redis store for ticket locks."""
Expand Down
72 changes: 30 additions & 42 deletions tests/core/test_lock_store.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
import asyncio
import copy
import os
from typing import Union, Text
from unittest.mock import patch

import numpy as np
import pytest
import time
from _pytest.tmpdir import TempdirFactory
from unittest.mock import patch

import rasa.utils.io
from rasa.core.agent import Agent
from rasa.core.channels import UserMessage
from rasa.core.constants import INTENT_MESSAGE_PREFIX, DEFAULT_LOCK_LIFETIME
from rasa.core.lock import TicketLock, Ticket
from rasa.core.lock_store import InMemoryLockStore, LockError, TicketExistsError
from rasa.core.lock import TicketLock
from rasa.core.lock_store import (
InMemoryLockStore,
LockError,
LockStore,
RedisLockStore,
)


class FakeRedisLockStore(RedisLockStore):
"""Fake `RedisLockStore` using `fakeredis` library."""

def __init__(self):
import fakeredis

self.red = fakeredis.FakeStrictRedis()

# added in redis==3.3.0, but not yet in fakeredis
self.red.connection_pool.connection_class.health_check_interval = 0

super(RedisLockStore, self).__init__()


def test_issue_ticket():
Expand Down Expand Up @@ -52,8 +68,8 @@ def test_remove_expired_tickets():
assert len(lock.tickets) == 1


def test_create_lock_store():
lock_store = InMemoryLockStore()
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_create_lock_store(lock_store: LockStore):
conversation_id = "my id 0"

# create and lock
Expand All @@ -64,8 +80,8 @@ def test_create_lock_store():
assert lock.conversation_id == conversation_id


def test_serve_ticket():
lock_store = InMemoryLockStore()
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_serve_ticket(lock_store: LockStore):
conversation_id = "my id 1"

lock = lock_store.create_lock(conversation_id)
Expand Down Expand Up @@ -99,8 +115,9 @@ def test_serve_ticket():
assert not lock.is_someone_waiting()


def test_lock_expiration():
lock_store = InMemoryLockStore()
# noinspection PyProtectedMember
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_lock_expiration(lock_store: LockStore):
conversation_id = "my id 2"
lock = lock_store.create_lock(conversation_id)
lock_store.save_lock(lock)
Expand All @@ -120,33 +137,6 @@ def test_lock_expiration():
assert lock.issue_ticket(10) == 1


def test_ticket_exists_error():
def mocked_issue_ticket(
self,
conversation_id: Text,
lock_lifetime: Union[float, int] = DEFAULT_LOCK_LIFETIME,
) -> None:
# mock LockStore.issue_ticket() so it issues two tickets for the same
# conversation ID simultaneously

lock = self.get_or_create_lock(conversation_id)
lock.issue_ticket(lock_lifetime)
self.save_lock(lock)

# issue another ticket for this lock
lock_2 = copy.deepcopy(lock)
lock_2.tickets.append(Ticket(1, time.time() + DEFAULT_LOCK_LIFETIME))

self.ensure_ticket_available(lock_2)

lock_store = InMemoryLockStore()
conversation_id = "my id 3"

with patch.object(InMemoryLockStore, "issue_ticket", mocked_issue_ticket):
with pytest.raises(TicketExistsError):
lock_store.issue_ticket(conversation_id)


async def test_multiple_conversation_ids(default_agent: Agent):
text = INTENT_MESSAGE_PREFIX + 'greet{"name":"Rasa"}'

Expand Down Expand Up @@ -176,9 +166,7 @@ async def test_message_order(tmpdir_factory: TempdirFactory, default_agent: Agen
# record messages as they come and and as they're processed in files so we
# can check the order later on. We don't need the return value of this method so
# we'll just return None.
async def mocked_handle_message(
self, message: UserMessage, wait: Union[int, float]
) -> None:
async def mocked_handle_message(self, message: UserMessage, wait: float) -> None:
# write incoming message to file
with open(str(incoming_order_file), "a+") as f_0:
f_0.write(message.text + "\n")
Expand Down

0 comments on commit e9e0ac8

Please sign in to comment.