Add pipeline and transaction #47

Merged
merged 10 commits on May 15, 2024
Changes from 8 commits
48 changes: 48 additions & 0 deletions README.md
@@ -120,6 +120,54 @@ and pass the command as a `list`.
redis.execute(command=["XLEN", "test_stream"])
```

### Pipelines & Transactions

If you want to submit commands in batches to reduce the number of round trips, you can use pipelines or
transactions. The difference between them is that transactions are atomic: no other command is executed
while the transaction is running. Pipelines offer no such guarantee.

To use a pipeline, simply call the `pipeline` method:

```python
pipeline = redis.pipeline()

pipeline.set("foo", 1)
pipeline.incr("foo")
pipeline.get("foo")

result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```

For transactions, use the `multi` method:

```python
pipeline = redis.multi()

pipeline.set("foo", 1)
pipeline.incr("foo")
pipeline.get("foo")

result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```

You can also chain the commands:

```python
pipeline = redis.pipeline()

pipeline.set("foo", 1).incr("foo").get("foo")
result = pipeline.exec()

print(result)
# prints [True, 2, '2']
```
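
The pipeline can also be used as a context manager; based on the tests added in this PR, the queued commands are executed automatically when the block exits. A minimal sketch:

```python
# Sketch based on the context manager tests in this PR: exec() is
# called automatically when the `with` block exits.
with redis.pipeline() as pipeline:
    pipeline.set("foo", 1)
    pipeline.incr("foo")

# The queued commands have already been executed at this point.
print(redis.get("foo"))
# prints '2'
```

The async client in `upstash_redis.asyncio` exposes the same `pipeline` and `multi` methods; the only difference is that the result is awaited with `await pipeline.exec()`.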

# Encoding
Although Redis can store invalid JSON data, there might be problems with the deserialization.
To avoid this, the Upstash REST proxy is capable of encoding the data as base64 on the server and then sending it to the client to be
81 changes: 81 additions & 0 deletions tests/commands/asyncio/test_asyncio_pipeline.py
@@ -0,0 +1,81 @@
import pytest
import pytest_asyncio

from upstash_redis.asyncio import Redis


@pytest_asyncio.fixture(autouse=True)
async def flush_db(async_redis: Redis):
await async_redis.delete("rocket", "space", "marine")

@pytest.mark.asyncio
async def test_pipeline(async_redis: Redis):

pipeline = async_redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# can chain commands
pipeline.get("rocket").get("space").get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
async def test_multi(async_redis: Redis):

pipeline = async_redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = await pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

@pytest.mark.asyncio
async def test_context_manager_usage(async_redis: Redis):

async with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# redis still works after pipeline is done
result = await async_redis.get("rocket")
assert result == "4"

get_pipeline = async_redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = await get_pipeline.exec()
assert res == ["4", "2", None]

@pytest.mark.asyncio
async def test_context_manager_raise(async_redis: Redis):
"""
Check that exceptions raised inside the context manager aren't silently ignored.

This can happen if the __aexit__ method returns a truthy value.
"""
with pytest.raises(Exception):
async with async_redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")
75 changes: 75 additions & 0 deletions tests/commands/test_pipeline.py
@@ -0,0 +1,75 @@
import pytest

from upstash_redis import Redis


@pytest.fixture(autouse=True)
def flush_db(redis: Redis):
redis.delete("rocket", "space", "marine")

def test_pipeline(redis: Redis):

pipeline = redis.pipeline()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket").get("space").get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_multi(redis: Redis):

pipeline = redis.multi()

pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

pipeline.get("rocket")
pipeline.get("space")
pipeline.get("marine")

res = pipeline.exec()
assert res == [1, 2, 1, 3, 2, 4, "4", "2", None]

def test_context_manager_usage(redis: Redis):

with redis.pipeline() as pipeline:
pipeline.incr("rocket")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")
pipeline.incr("space")
pipeline.incr("rocket")

# redis still works after pipeline is done
result = redis.get("rocket")
assert result == "4"

get_pipeline = redis.pipeline()
get_pipeline.get("rocket")
get_pipeline.get("space")
get_pipeline.get("marine")

res = get_pipeline.exec()
assert res == ["4", "2", None]

def test_context_manager_raise(redis: Redis):
"""
Check that exceptions raised inside the context manager aren't silently ignored.

This can happen if the __exit__ method returns a truthy value.
"""
with pytest.raises(Exception):
with redis.pipeline() as pipeline:
pipeline.incr("rocket")
raise Exception("test")
132 changes: 124 additions & 8 deletions upstash_redis/asyncio/client.py
@@ -3,8 +3,8 @@

from aiohttp import ClientSession

from upstash_redis.commands import AsyncCommands
from upstash_redis.format import FORMATTERS
from upstash_redis.commands import AsyncCommands, PipelineCommands
from upstash_redis.format import cast_response
from upstash_redis.http import async_execute, make_headers
from upstash_redis.typing import RESTResultT

@@ -126,14 +126,130 @@ async def execute(self, command: List) -> RESTResultT:
command=command,
)

main_command = command[0]
if len(command) > 1 and main_command == "SCRIPT":
main_command = f"{main_command} {command[1]}"
return cast_response(command, res)

if main_command in FORMATTERS:
return FORMATTERS[main_command](res, command)
def pipeline(self) -> "AsyncPipeline":
"""
Create a pipeline to send commands in batches
"""
return AsyncPipeline(
url=self._url,
token=self._token,
rest_encoding=self._rest_encoding,
rest_retries=self._rest_retries,
rest_retry_interval=self._rest_retry_interval,
allow_telemetry=self._allow_telemetry,
multi_exec="pipeline"
)

def multi(self) -> "AsyncPipeline":
"""
Create a pipeline to send commands in batches as a transaction
"""
return AsyncPipeline(
url=self._url,
token=self._token,
rest_encoding=self._rest_encoding,
rest_retries=self._rest_retries,
rest_retry_interval=self._rest_retry_interval,
allow_telemetry=self._allow_telemetry,
multi_exec="multi-exec"
)


class AsyncPipeline(PipelineCommands):

def __init__(
self,
url: str,
token: str,
rest_encoding: Optional[Literal["base64"]] = "base64",
rest_retries: int = 1,
rest_retry_interval: float = 3, # Seconds.
allow_telemetry: bool = True,
multi_exec: Literal["multi-exec", "pipeline"] = "pipeline"
):
"""
Creates a new AsyncPipeline that collects commands and sends them as a single batch.

:param url: UPSTASH_REDIS_REST_URL in the console
:param token: UPSTASH_REDIS_REST_TOKEN in the console
:param rest_encoding: the encoding that can be used by the REST API to parse the response before sending it
:param rest_retries: how many times an HTTP request will be retried if it fails
:param rest_retry_interval: how many seconds will be waited between each retry
:param allow_telemetry: whether anonymous telemetry can be collected
:param multi_exec: whether multi execution (transaction) or pipelining is to be used
"""

self._url = url
self._token = token

self._allow_telemetry = allow_telemetry

self._rest_encoding: Optional[Literal["base64"]] = rest_encoding
self._rest_retries = rest_retries
self._rest_retry_interval = rest_retry_interval

self._headers = make_headers(token, rest_encoding, allow_telemetry)
self._context_manager: Optional[_SessionContextManager] = None

self._command_stack: List[List[str]] = []
self._multi_exec = multi_exec

def execute(self, command: List) -> "AsyncPipeline": # type: ignore[override]
"""
Adds the command to the command stack, which will be sent as a batch later.

:param command: Command to execute
"""
self._command_stack.append(command)
return self

async def exec(self) -> List[RESTResultT]:
"""
Executes the commands in the pipeline by sending them as a batch
"""
url = f"{self._url}/{self._multi_exec}"

context_manager = self._context_manager
if not context_manager:
context_manager = _SessionContextManager(
ClientSession(), close_session=True
)

async with context_manager:
res: List[RESTResultT] = await async_execute( # type: ignore[assignment]
session=context_manager.session,
url=url,
headers=self._headers,
encoding=self._rest_encoding,
retries=self._rest_retries,
retry_interval=self._rest_retry_interval,
command=self._command_stack,
from_pipeline=True
)

response = [
cast_response(command, response)
for command, response in zip(self._command_stack, res)
]
self._command_stack = []
return response

async def __aenter__(self) -> "AsyncPipeline":
self._context_manager = _SessionContextManager(
ClientSession(), close_session=False
)
return self

async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc_val: Optional[BaseException],
exc_tb: Any,
) -> None:
await self.exec()


class _SessionContextManager: