Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Import some changes from redis-py #70

Merged
merged 14 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,4 @@ valkeymodules
virtualenv
www
md
yaml
1 change: 0 additions & 1 deletion dev_requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pytest-asyncio
pytest-cov
pytest-timeout
ujson>=4.2.0
urllib3<2
uvloop
vulture>=2.3.0
wheel>=0.30.0
1 change: 0 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@
# further. For a list of options available for each theme, see the
# documentation.
html_theme_options = {
"display_version": True,
"footer_icons": [
{
"name": "GitHub",
Expand Down
6 changes: 3 additions & 3 deletions docs/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ ClusterNode
Async Client
************

See complete example: `here <examples/asyncio_examples.html>`_
See complete example: `here <examples/asyncio_examples.html>`__

This client is used for communicating with Valkey, asynchronously.

Expand Down Expand Up @@ -88,7 +88,7 @@ ClusterPipeline (Async)
Connection
**********

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

Connection
==========
Expand All @@ -104,7 +104,7 @@ Connection (Async)
Connection Pools
****************

See complete example: `here <examples/connection_examples.html>`_
See complete example: `here <examples/connection_examples.html>`__

ConnectionPool
==============
Expand Down
8 changes: 4 additions & 4 deletions docs/opentelemetry.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Integrating OpenTelemetry
What is OpenTelemetry?
----------------------

`OpenTelemetry <https://opentelemetry.io>`_ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.
`OpenTelemetry <https://opentelemetry.io>`__ is an open-source observability framework for traces, metrics, and logs. It is a merger of OpenCensus and OpenTracing projects hosted by Cloud Native Computing Foundation.

OpenTelemetry allows developers to collect and export telemetry data in a vendor agnostic way. With OpenTelemetry, you can instrument your application once and then add or change vendors without changing the instrumentation, for example, here is a list of `popular DataDog competitors <https://uptrace.dev/get/compare/datadog-competitors.html>`_ that support OpenTelemetry.

Expand Down Expand Up @@ -61,7 +61,7 @@ Once the code is patched, you can use valkey-py as usually:
OpenTelemetry API
-----------------

`OpenTelemetry <https://uptrace.dev/opentelemetry/>`_ API is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.
`OpenTelemetry API <https://uptrace.dev/opentelemetry/>`__ is a programming interface that you can use to instrument code and collect telemetry data such as traces, metrics, and logs.

You can use OpenTelemetry API to measure important operations:

Expand Down Expand Up @@ -125,7 +125,7 @@ Alerting and notifications

Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/opentelemetry/metrics.html>`_ using alerting rules. For example, the following monitor uses the group by node expression to create an alert whenever an individual Valkey shard is down:

.. code-block:: python
.. code-block:: yaml

monitors:
- name: Valkey shard is down
Expand All @@ -142,7 +142,7 @@ Uptrace also allows you to monitor `OpenTelemetry metrics <https://uptrace.dev/o

You can also create queries with more complex expressions. For example, the following rule creates an alert when the keyspace hit rate is lower than 75%:

.. code-block:: python
.. code-block:: yaml

monitors:
- name: Valkey read hit rate < 75%
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
async-timeout>=4.0.2
async-timeout>=4.0.3
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,6 @@
],
extras_require={
"libvalkey": ["libvalkey>=4.0.0b1"],
"ocsp": ["cryptography>=36.0.1", "pyopenssl==20.0.1", "requests>=2.26.0"],
"ocsp": ["cryptography>=36.0.1", "pyopenssl==23.2.1", "requests>=2.31.0"],
},
)
4 changes: 3 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ def skip_ifmodversion_lt(min_version: str, module_name: str):
for j in modules:
if module_name == j.get("name"):
version = j.get("ver")
mv = int(min_version.replace(".", ""))
mv = int(
"".join(["%02d" % int(segment) for segment in min_version.split(".")])
)
check = version < mv
return pytest.mark.skipif(check, reason="Valkey module version")

Expand Down
56 changes: 55 additions & 1 deletion tests/test_asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
parse_url,
)
from valkey.asyncio import ConnectionPool, Valkey
from valkey.asyncio.connection import Connection, UnixDomainSocketConnection
from valkey.asyncio.connection import (
Connection,
SSLConnection,
UnixDomainSocketConnection,
)
from valkey.asyncio.retry import Retry
from valkey.backoff import NoBackoff
from valkey.exceptions import ConnectionError, InvalidResponse, TimeoutError
Expand Down Expand Up @@ -494,3 +498,53 @@ async def test_connection_garbage_collection(request):

await client.aclose()
await pool.aclose()


@pytest.mark.parametrize(
"conn, error, expected_message",
[
(SSLConnection(), OSError(), "Error connecting to localhost:6379."),
(SSLConnection(), OSError(12), "Error 12 connecting to localhost:6379."),
(
SSLConnection(),
OSError(12, "Some Error"),
"Error 12 connecting to localhost:6379. Some Error.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(),
"Error connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12),
"Error 12 connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12, "Some Error"),
"Error 12 connecting to unix:///tmp/valkey.sock. Some Error.",
),
],
)
async def test_format_error_message(conn, error, expected_message):
"""Test that the _error_message function formats errors correctly"""
error_message = conn._error_message(error)
assert error_message == expected_message


async def test_network_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(host="127.0.0.1", port=9999)
await valkey.set("a", "b")
assert str(e.value).startswith("Error 111 connecting to 127.0.0.1:9999. Connect")


async def test_unix_socket_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(unix_socket_path="unix:///tmp/a.sock")
await valkey.set("a", "b")
assert (
str(e.value)
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
)
6 changes: 3 additions & 3 deletions tests/test_asyncio/test_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ async def test_blocking(self, r):
lock_2 = self.get_lock(r, "foo")
assert lock_2.blocking

async def test_blocking_timeout(self, r, event_loop):
async def test_blocking_timeout(self, r):
lock1 = self.get_lock(r, "foo")
assert await lock1.acquire(blocking=False)
bt = 0.2
sleep = 0.05
lock2 = self.get_lock(r, "foo", sleep=sleep, blocking_timeout=bt)
start = event_loop.time()
start = asyncio.get_running_loop().time()
assert not await lock2.acquire()
# The elapsed duration should be less than the total blocking_timeout
assert bt >= (event_loop.time() - start) > bt - sleep
assert bt >= (asyncio.get_running_loop().time() - start) > bt - sleep
await lock1.release()

async def test_context_manager(self, r):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ def test_tcp_ssl_tls12_custom_ciphers(tcp_address, ssl_ciphers):
)


"""
Addresses bug CAE-333 which uncovered that the init method of the base
class did override the initialization of the socket_timeout parameter.
"""


def test_unix_socket_with_timeout():
conn = UnixDomainSocketConnection(socket_timeout=1000)

# Check if the base class defaults were taken over.
assert conn.db == 0

# Verify if the timeout and the path is set correctly.
assert conn.socket_timeout == 1000
assert conn.path == ""


@pytest.mark.ssl
@pytest.mark.skipif(not ssl.HAS_TLSv1_3, reason="requires TLSv1.3")
def test_tcp_ssl_version_mismatch(tcp_address):
Expand Down
50 changes: 50 additions & 0 deletions tests/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,3 +291,53 @@ def mock_disconnect(_):

assert called == 1
pool.disconnect()


@pytest.mark.parametrize(
"conn, error, expected_message",
[
aiven-sal marked this conversation as resolved.
Show resolved Hide resolved
(SSLConnection(), OSError(), "Error connecting to localhost:6379."),
(SSLConnection(), OSError(12), "Error 12 connecting to localhost:6379."),
(
SSLConnection(),
OSError(12, "Some Error"),
"Error 12 connecting to localhost:6379. Some Error.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(),
"Error connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12),
"Error 12 connecting to unix:///tmp/valkey.sock.",
),
(
UnixDomainSocketConnection(path="unix:///tmp/valkey.sock"),
OSError(12, "Some Error"),
"Error 12 connecting to unix:///tmp/valkey.sock. Some Error.",
),
],
)
def test_format_error_message(conn, error, expected_message):
"""Test that the _error_message function formats errors correctly"""
error_message = conn._error_message(error)
assert error_message == expected_message


def test_network_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(port=9999)
valkey.set("a", "b")
assert str(e.value) == "Error 111 connecting to localhost:9999. Connection refused."


def test_unix_socket_connection_failure():
with pytest.raises(ConnectionError) as e:
valkey = Valkey(unix_socket_path="unix:///tmp/a.sock")
valkey.set("a", "b")
assert (
str(e.value)
== "Error 2 connecting to unix:///tmp/a.sock. No such file or directory."
)
4 changes: 2 additions & 2 deletions tests/test_retry.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest.mock import patch

import pytest
from valkey.backoff import ExponentialBackoff, NoBackoff
from valkey.backoff import AbstractBackoff, ExponentialBackoff, NoBackoff
from valkey.client import Valkey
from valkey.connection import Connection, UnixDomainSocketConnection
from valkey.exceptions import (
Expand All @@ -15,7 +15,7 @@
from .conftest import _get_client


class BackoffMock:
class BackoffMock(AbstractBackoff):
def __init__(self):
self.reset_calls = 0
self.calls = 0
Expand Down
40 changes: 19 additions & 21 deletions valkey/asyncio/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1315,37 +1315,35 @@ async def initialize(self) -> None:
port = int(primary_node[1])
host, port = self.remap_host_port(host, port)

nodes_for_slot = []

target_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_node:
target_node = ClusterNode(
host, port, PRIMARY, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_node.name] = target_node
nodes_for_slot.append(target_node)

replica_nodes = slot[3:]
for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
host, port = self.remap_host_port(host, port)

target_replica_node = tmp_nodes_cache.get(get_node_name(host, port))
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = target_replica_node
nodes_for_slot.append(target_replica_node)

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = []
tmp_slots[i].append(target_node)
replica_nodes = [slot[j] for j in range(3, len(slot))]

for replica_node in replica_nodes:
host = replica_node[0]
port = replica_node[1]
host, port = self.remap_host_port(host, port)

target_replica_node = tmp_nodes_cache.get(
get_node_name(host, port)
)
if not target_replica_node:
target_replica_node = ClusterNode(
host, port, REPLICA, **self.connection_kwargs
)
tmp_slots[i].append(target_replica_node)
# add this node to the nodes cache
tmp_nodes_cache[target_replica_node.name] = (
target_replica_node
)
tmp_slots[i] = nodes_for_slot
else:
# Validate that 2 nodes want to use the same slot cache
# setup
Expand Down
Loading
Loading