Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TicketExistsError #4919

Merged
merged 7 commits into from
Dec 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/4918.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Locks for tickets in ``LockStore`` are immediately issued without a redundant
check for their availability.
4 changes: 1 addition & 3 deletions rasa/core/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,8 @@ def last_issued(self) -> int:
"""

ticket_number = self._ticket_number_for(-1)
if ticket_number is not None:
return ticket_number

return NO_TICKET_ISSUED
return ticket_number if ticket_number is not None else NO_TICKET_ISSUED

@property
def now_serving(self) -> Optional[int]:
Expand Down
68 changes: 9 additions & 59 deletions rasa/core/lock_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@
import json
import logging
import os
from typing import Text, Optional, Union, AsyncGenerator

from async_generator import asynccontextmanager
from typing import Text, Optional, AsyncGenerator

from rasa.core.constants import DEFAULT_LOCK_LIFETIME
from rasa.core.lock import TicketLock, NO_TICKET_ISSUED
from rasa.core.lock import TicketLock
from rasa.utils.endpoints import EndpointConfig

logger = logging.getLogger(__name__)
Expand All @@ -28,18 +28,6 @@ class LockError(Exception):
pass


# noinspection PyUnresolvedReferences
class TicketExistsError(Exception):
"""Exception that is raised when an already-existing ticket for a conversation
has been issued.

Attributes:
message (str): explanation of which `conversation_id` raised the error
"""

pass


class LockStore:
@staticmethod
def find_lock_store(store: EndpointConfig = None) -> "LockStore":
Expand Down Expand Up @@ -93,7 +81,7 @@ def save_lock(self, lock: TicketLock) -> None:
raise NotImplementedError

def issue_ticket(
self, conversation_id: Text, lock_lifetime: Union[float, int] = LOCK_LIFETIME
self, conversation_id: Text, lock_lifetime: float = LOCK_LIFETIME
) -> int:
"""Issue new ticket with `lock_lifetime` for lock associated with
`conversation_id`.
Expand All @@ -103,18 +91,6 @@ def issue_ticket(

lock = self.get_or_create_lock(conversation_id)
ticket = lock.issue_ticket(lock_lifetime)

while True:
try:
self.ensure_ticket_available(lock)
break
except TicketExistsError:
# issue a new ticket if current ticket number has been issued twice
logger.exception(
"Ticket could not be issued. Issuing new ticket and retrying..."
)
ticket = lock.issue_ticket(lock_lifetime)

self.save_lock(lock)

return ticket
Expand All @@ -123,8 +99,8 @@ def issue_ticket(
async def lock(
self,
conversation_id: Text,
lock_lifetime: int = LOCK_LIFETIME,
wait_time_in_seconds: Union[int, float] = 1,
lock_lifetime: float = LOCK_LIFETIME,
wait_time_in_seconds: float = 1,
) -> AsyncGenerator[TicketLock, None]:
"""Acquire lock with lifetime `lock_lifetime`for `conversation_id`.

Expand All @@ -143,10 +119,7 @@ async def lock(
self.cleanup(conversation_id, ticket)

async def _acquire_lock(
self,
conversation_id: Text,
ticket: int,
wait_time_in_seconds: Union[int, float],
self, conversation_id: Text, ticket: int, wait_time_in_seconds: float,
) -> TicketLock:

while True:
Expand All @@ -162,17 +135,16 @@ async def _acquire_lock(
return lock

logger.debug(
"Failed to acquire lock for conversation ID '{}'. Retrying..."
"".format(conversation_id)
f"Failed to acquire lock for conversation ID '{conversation_id}'. "
f"Retrying..."
)

# sleep and update lock
await asyncio.sleep(wait_time_in_seconds)
self.update_lock(conversation_id)

raise LockError(
"Could not acquire lock for conversation_id '{}'."
"".format(conversation_id)
f"Could not acquire lock for conversation_id '{conversation_id}'."
)

def update_lock(self, conversation_id: Text) -> None:
Expand Down Expand Up @@ -229,28 +201,6 @@ def _log_deletion(conversation_id: Text, deletion_successful: bool) -> None:
else:
logger.debug(f"Could not delete lock for conversation '{conversation_id}'.")

def ensure_ticket_available(self, lock: TicketLock) -> None:
"""Check for duplicate tickets issued for `lock`.

This function should be called before saving `lock`. Raises `TicketExistsError`
if the last issued ticket for `lock` does not match the last ticket issued
for a lock fetched from storage for `lock.conversation_id`. This indicates
that some other process has issued a ticket for `lock` in the meantime.
"""

existing_lock = self.get_lock(lock.conversation_id)
if not existing_lock or existing_lock.last_issued == NO_TICKET_ISSUED:
# lock does not yet exist for conversation or no ticket has been issued
return

# raise if the last issued ticket number of `existing_lock` is not the same as
# that of the one being acquired
if existing_lock.last_issued != lock.last_issued:
raise TicketExistsError(
"Ticket '{}' already exists for conversation ID '{}'."
"".format(existing_lock.last_issued, lock.conversation_id)
)


class RedisLockStore(LockStore):
"""Redis store for ticket locks."""
Expand Down
72 changes: 30 additions & 42 deletions tests/core/test_lock_store.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
import asyncio
import copy
import os
from typing import Union, Text
from unittest.mock import patch

import numpy as np
import pytest
import time
from _pytest.tmpdir import TempdirFactory
from unittest.mock import patch

import rasa.utils.io
from rasa.core.agent import Agent
from rasa.core.channels import UserMessage
from rasa.core.constants import INTENT_MESSAGE_PREFIX, DEFAULT_LOCK_LIFETIME
from rasa.core.lock import TicketLock, Ticket
from rasa.core.lock_store import InMemoryLockStore, LockError, TicketExistsError
from rasa.core.lock import TicketLock
from rasa.core.lock_store import (
InMemoryLockStore,
LockError,
LockStore,
RedisLockStore,
)


class FakeRedisLockStore(RedisLockStore):
"""Fake `RedisLockStore` using `fakeredis` library."""

def __init__(self):
import fakeredis

self.red = fakeredis.FakeStrictRedis()

# added in redis==3.3.0, but not yet in fakeredis
self.red.connection_pool.connection_class.health_check_interval = 0

super(RedisLockStore, self).__init__()


def test_issue_ticket():
Expand Down Expand Up @@ -52,8 +68,8 @@ def test_remove_expired_tickets():
assert len(lock.tickets) == 1


def test_create_lock_store():
lock_store = InMemoryLockStore()
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_create_lock_store(lock_store: LockStore):
conversation_id = "my id 0"

# create and lock
Expand All @@ -64,8 +80,8 @@ def test_create_lock_store():
assert lock.conversation_id == conversation_id


def test_serve_ticket():
lock_store = InMemoryLockStore()
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_serve_ticket(lock_store: LockStore):
conversation_id = "my id 1"

lock = lock_store.create_lock(conversation_id)
Expand Down Expand Up @@ -99,8 +115,9 @@ def test_serve_ticket():
assert not lock.is_someone_waiting()


def test_lock_expiration():
lock_store = InMemoryLockStore()
# noinspection PyProtectedMember
@pytest.mark.parametrize("lock_store", [InMemoryLockStore(), FakeRedisLockStore()])
def test_lock_expiration(lock_store: LockStore):
conversation_id = "my id 2"
lock = lock_store.create_lock(conversation_id)
lock_store.save_lock(lock)
Expand All @@ -120,33 +137,6 @@ def test_lock_expiration():
assert lock.issue_ticket(10) == 1


def test_ticket_exists_error():
def mocked_issue_ticket(
self,
conversation_id: Text,
lock_lifetime: Union[float, int] = DEFAULT_LOCK_LIFETIME,
) -> None:
# mock LockStore.issue_ticket() so it issues two tickets for the same
# conversation ID simultaneously

lock = self.get_or_create_lock(conversation_id)
lock.issue_ticket(lock_lifetime)
self.save_lock(lock)

# issue another ticket for this lock
lock_2 = copy.deepcopy(lock)
lock_2.tickets.append(Ticket(1, time.time() + DEFAULT_LOCK_LIFETIME))

self.ensure_ticket_available(lock_2)

lock_store = InMemoryLockStore()
conversation_id = "my id 3"

with patch.object(InMemoryLockStore, "issue_ticket", mocked_issue_ticket):
with pytest.raises(TicketExistsError):
lock_store.issue_ticket(conversation_id)


async def test_multiple_conversation_ids(default_agent: Agent):
text = INTENT_MESSAGE_PREFIX + 'greet{"name":"Rasa"}'

Expand Down Expand Up @@ -176,9 +166,7 @@ async def test_message_order(tmpdir_factory: TempdirFactory, default_agent: Agen
# record messages as they come and and as they're processed in files so we
# can check the order later on. We don't need the return value of this method so
# we'll just return None.
async def mocked_handle_message(
self, message: UserMessage, wait: Union[int, float]
) -> None:
async def mocked_handle_message(self, message: UserMessage, wait: float) -> None:
# write incoming message to file
with open(str(incoming_order_file), "a+") as f_0:
f_0.write(message.text + "\n")
Expand Down