Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pipeline and transaction #47

Merged
merged 10 commits into from
May 15, 2024
97 changes: 97 additions & 0 deletions tests/commands/asyncio/test_asyncio_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import pytest
import pytest_asyncio

from upstash_redis.asyncio import Redis


@pytest_asyncio.fixture(autouse=True)
async def flush_db(async_redis: Redis):
await async_redis.delete("rocket", "space", "marine")

@pytest.mark.asyncio
async def test_pipeline(async_redis: Redis):

pipeline = async_redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
async def test_multi(async_redis: Redis):

pipeline = async_redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
def test_raises(async_redis: Redis):

pipeline = async_redis.pipeline()
with pytest.raises(NotImplementedError):
pipeline.pipeline()
with pytest.raises(NotImplementedError):
pipeline.multi()

multi = async_redis.multi()
with pytest.raises(NotImplementedError):
multi.pipeline()
with pytest.raises(NotImplementedError):
multi.multi()

@pytest.mark.asyncio
async def test_context_manager_usage(async_redis: Redis):

async with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# redis still works after pipeline is done
result = await async_redis.get("rocket")
assert result == "4"

get_pipeline = async_redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = await get_pipeline.exec()
assert res == ["4", "2", None]

@pytest.mark.asyncio
def test_context_manager_raise(async_redis: Redis):
"""
Check that exceptions in context aren't silently ignored

This can happen if we return something in __exit__ method
"""
with pytest.raises(Exception):
with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")
91 changes: 91 additions & 0 deletions tests/commands/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import pytest

from upstash_redis import Redis


@pytest.fixture(autouse=True)
def flush_db(redis: Redis):
redis.delete("rocket", "space", "marine")

def test_pipeline(redis: Redis):

pipeline = redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_multi(redis: Redis):

pipeline = redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_raises(redis: Redis):

pipeline = redis.pipeline()
with pytest.raises(NotImplementedError):
pipeline.pipeline()
with pytest.raises(NotImplementedError):
pipeline.multi()

multi = redis.multi()
with pytest.raises(NotImplementedError):
multi.pipeline()
with pytest.raises(NotImplementedError):
multi.multi()

def test_context_manager_usage(redis: Redis):

with redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# redis still works after pipeline is done
result = redis.get("rocket")
assert result == "4"

get_pipeline = redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = get_pipeline.exec()
assert res == ["4", "2", None]

def test_context_manager_raise(redis: Redis):
"""
Check that exceptions in context aren't silently ignored

This can happen if we return something in __exit__ method
"""
with pytest.raises(Exception):
with redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")
132 changes: 125 additions & 7 deletions upstash_redis/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from aiohttp import ClientSession

from upstash_redis.commands import AsyncCommands
from upstash_redis.format import FORMATTERS
from upstash_redis.format import cast_response
from upstash_redis.http import async_execute, make_headers
from upstash_redis.typing import RESTResultT

Expand Down Expand Up @@ -126,14 +126,132 @@ async def execute(self, command: List) -> RESTResultT:
command=command,
)

main_command = command[0]
if len(command) > 1 and main_command == "SCRIPT":
main_command = f"{main_command} {command[1]}"
return cast_response(command, res)

if main_command in FORMATTERS:
return FORMATTERS[main_command](res, command)
def pipeline(self):
CahidArda marked this conversation as resolved.
Show resolved Hide resolved
"""
Create a pipeline to send commands in batches
"""
return AsyncPipeline(
url=self._url,
token=self._token,
rest_encoding=self._rest_encoding,
rest_retries=self._rest_retries,
rest_retry_interval=self._rest_retry_interval,
allow_telemetry=self._allow_telemetry,
multi_exec="pipeline"
)

def multi(self):
CahidArda marked this conversation as resolved.
Show resolved Hide resolved
"""
Create a pipeline to send commands in batches as a transaction
"""
return AsyncPipeline(
url=self._url,
token=self._token,
rest_encoding=self._rest_encoding,
rest_retries=self._rest_retries,
rest_retry_interval=self._rest_retry_interval,
allow_telemetry=self._allow_telemetry,
multi_exec="multi-exec"
)


class AsyncPipeline(Redis):

def __init__(
self,
url: str,
token: str,
rest_encoding: Optional[Literal["base64"]] = "base64",
rest_retries: int = 1,
rest_retry_interval: float = 3, # Seconds.
allow_telemetry: bool = True,
multi_exec: Literal["multi-exec", "pipeline"] = "pipeline"
):
"""
Creates a new blocking Redis client.

:param url: UPSTASH_REDIS_REST_URL in the console
:param token: UPSTASH_REDIS_REST_TOKEN in the console
:param rest_encoding: the encoding that can be used by the REST API to parse the response before sending it
:param rest_retries: how many times an HTTP request will be retried if it fails
:param rest_retry_interval: how many seconds will be waited between each retry
:param allow_telemetry: whether anonymous telemetry can be collected
:param miltiexec: Whether multi execution (transaction) or pipelining is to be used
"""
super().__init__(
url=url,
token=token,
rest_encoding=rest_encoding,
rest_retries=rest_retries,
rest_retry_interval=rest_retry_interval,
allow_telemetry=allow_telemetry,
)

self._command_stack: List[List[str]] = []
self._multi_exec = multi_exec

def execute(self, command: List) -> None: # type: ignore[override]
mdumandag marked this conversation as resolved.
Show resolved Hide resolved
"""
Adds commnd to the command stack which will be sent as a batch
later

return res
:param command: Command to execute
"""
self._command_stack.append(command)

async def exec(self) -> List[RESTResultT]:
"""
Executes the commands in the pipeline by sending them as a batch
"""
url = f"{self._url}/{self._multi_exec}"

context_manager = self._context_manager
if not context_manager:
context_manager = _SessionContextManager(
ClientSession(), close_session=True
)

async with context_manager:
res: List[RESTResultT] = await async_execute( # type: ignore[assignment]
session=context_manager.session,
url=url,
headers=self._headers,
encoding=self._rest_encoding,
retries=self._rest_retries,
retry_interval=self._rest_retry_interval,
command=self._command_stack,
from_pipeline=True
)

response = [
cast_response(command, response)
for command, response in zip(self._command_stack, res)
]
self._command_stack = []
return response

def pipeline(self):
raise NotImplementedError("A pipeline can not be created from a pipeline!")

def multi(self):
raise NotImplementedError("A pipeline can not be created from a pipeline!")

async def __aenter__(self) -> "Redis":
self._context_manager = _SessionContextManager(
ClientSession(), close_session=False
)
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Any,
) -> None:
await self.exec()
mdumandag marked this conversation as resolved.
Show resolved Hide resolved
await self.close()


class _SessionContextManager:
Expand Down
Loading
Loading