Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make use of new "async" RPC #36

Merged
merged 7 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@
"ext-json": "*",
"psr/simple-cache": "2 - 3",
"roadrunner-php/roadrunner-api-dto": "^1.0",
"spiral/roadrunner": "^2023.1",
L3tum marked this conversation as resolved.
Show resolved Hide resolved
"spiral/goridge": "^4.2",
"spiral/roadrunner": "^2023.1 || ^2024.1",
"spiral/goridge": "^4.0",
"google/protobuf": "^3.7"
},
"autoload": {
Expand Down
146 changes: 146 additions & 0 deletions src/AsyncCache.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php

namespace Spiral\RoadRunner\KeyValue;

use DateInterval;
use RoadRunner\KV\DTO\V1\Response;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;
use Spiral\RoadRunner\KeyValue\Exception\StorageException;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerInterface;
use function sprintf;
use function str_contains;
use function str_replace;

/**
* @psalm-suppress PropertyNotSetInConstructor
*/
class AsyncCache extends Cache implements AsyncStorageInterface
{
/**
* @var positive-int[]
*/
protected array $callsInFlight = [];

/**
* @param AsyncRPCInterface $rpc
* @param non-empty-string $name
*/
public function __construct(
RPCInterface $rpc,
string $name,
SerializerInterface $serializer = new DefaultSerializer()
) {
parent::__construct($rpc, $name, $serializer);

// This should result in things like the Symfony ContainerBuilder throwing during build instead of runtime.
assert($this->rpc instanceof AsyncRPCInterface);
}

/**
* Note: The current PSR-16 implementation always returns true or
* exception on error.
*
* {@inheritDoc}
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteAsync(string $key): bool
{
return $this->deleteMultipleAsync([$key]);
}

/**
* Note: The current PSR-16 implementation always returns true or
* exception on error.
*
* {@inheritDoc}
*
* @psalm-param iterable<string> $keys
*
* @throws KeyValueException
* @throws RPCException
*/
public function deleteMultipleAsync(iterable $keys): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync('kv.Delete', $this->requestKeys($keys));

return true;
}

/**
* {@inheritDoc}
*
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool
{
return $this->setMultipleAsync([$key => $value], $ttl);
}

/**
* {@inheritDoc}
*
* @psalm-param iterable<string, mixed> $values
* @psalm-param positive-int|\DateInterval|null $ttl
* @psalm-suppress MoreSpecificImplementedParamType
* @throws KeyValueException
* @throws RPCException
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

// Handle someone never calling commitAsync()
if (count($this->callsInFlight) > 1000) {
$this->commitAsync();
}

$this->callsInFlight[] = $this->rpc->callAsync(
'kv.Set',
$this->requestValues($values, $this->ttlToRfc3339String($ttl))
);

return true;
}

/**
* @throws KeyValueException
* @throws RPCException
*/
public function commitAsync(): bool
{
assert($this->rpc instanceof AsyncRPCInterface);

try {
$this->rpc->getResponses($this->callsInFlight, Response::class);
} catch (ServiceException $e) {
$message = str_replace(["\t", "\n"], ' ', $e->getMessage());

if (str_contains($message, 'no such storage')) {
throw new StorageException(sprintf(self::ERROR_INVALID_STORAGE, $this->name));
}

throw new KeyValueException($message, $e->getCode(), $e);
} finally {
$this->callsInFlight = [];
}

return true;
}
}
73 changes: 73 additions & 0 deletions src/AsyncStorageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

namespace Spiral\RoadRunner\KeyValue;

use DateInterval;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;

interface AsyncStorageInterface extends StorageInterface
{
/**
* Needs to be called to make sure all async calls have completed successfully.
*
* @throws KeyValueException
*/
public function commitAsync(): bool;

/**
* Persists a set of key => value pairs in the cache, with an optional TTL.
*
* @param iterable $values A list of key => value pairs for a multiple-set operation.
* @param null|int|\DateInterval $ttl Optional. The TTL value of this item. If no value is sent and
* the driver supports TTL then the library may set a default value
* for it or let the driver take care of that.
*
* @return bool True on success and false on failure.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if $values is neither an array nor a Traversable,
* or if any of the $values are not a legal value.
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool;

/**
* Persists data in the cache, uniquely referenced by a key with an optional expiration TTL time.
*
* @param string $key The key of the item to store.
* @param mixed $value The value of the item to store, must be serializable.
* @param null|int|\DateInterval $ttl Optional. The TTL value of this item. If no value is sent and
* the driver supports TTL then the library may set a default value
* for it or let the driver take care of that.
*
* @return bool True on success and false on failure.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if the $key string is not a legal value.
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool;

/**
* Delete an item from the cache by its unique key.
*
* @param string $key The unique cache key of the item to delete.
*
* @return bool True if the item was successfully removed. False if there was an error.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if the $key string is not a legal value.
*/
public function deleteAsync(string $key): bool;

/**
* Deletes multiple cache items in a single operation.
*
* @param iterable<string> $keys A list of string-based keys to be deleted.
*
* @return bool True if the items were successfully removed. False if there was an error.
*
* @throws \Psr\SimpleCache\InvalidArgumentException
* MUST be thrown if $keys is neither an array nor a Traversable,
* or if any of the $keys are not a legal value.
*/
public function deleteMultipleAsync(iterable $keys): bool;
}
18 changes: 10 additions & 8 deletions src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Cache implements StorageInterface
{
use SerializerAwareTrait;

private const ERROR_INVALID_STORAGE =
protected const ERROR_INVALID_STORAGE =
'Storage "%s" has not been defined. Please make sure your '.
'RoadRunner "kv" configuration contains a storage key named "%1$s"';

Expand All @@ -47,9 +47,9 @@ class Cache implements StorageInterface
* @param non-empty-string $name
*/
public function __construct(
RPCInterface $rpc,
private readonly string $name,
SerializerInterface $serializer = new DefaultSerializer()
RPCInterface $rpc,
protected readonly string $name,
SerializerInterface $serializer = new DefaultSerializer()
) {
$this->rpc = $rpc->withCodec(new ProtobufCodec());
$this->zone = new \DateTimeZone('UTC');
Expand Down Expand Up @@ -108,7 +108,7 @@ public function getMultipleTtl(iterable $keys = []): iterable
/**
* @return array<string, Item>
*/
private function createIndex(Response $response): array
protected function createIndex(Response $response): array
{
$result = [];

Expand Down Expand Up @@ -146,7 +146,7 @@ private function call(string $method, Request $request): Response
* @param iterable<string> $keys
* @throws InvalidArgumentException
*/
private function requestKeys(iterable $keys): Request
protected function requestKeys(iterable $keys): Request
{
$items = [];

Expand Down Expand Up @@ -257,8 +257,9 @@ public function setMultiple(iterable $values, null|int|\DateInterval $ttl = null
/**
* @param iterable<string, mixed> $values
* @throws SerializationException
* @throws InvalidArgumentException
*/
private function requestValues(iterable $values, string $ttl): Request
protected function requestValues(iterable $values, string $ttl): Request
{
$items = [];
$serializer = $this->getSerializer();
Expand All @@ -279,8 +280,9 @@ private function requestValues(iterable $values, string $ttl): Request

/**
* @throws InvalidArgumentException
* @throws \Exception
*/
private function ttlToRfc3339String(null|int|\DateInterval $ttl): string
protected function ttlToRfc3339String(null|int|\DateInterval $ttl): string
{
if ($ttl === null) {
return '';
Expand Down
9 changes: 7 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@

namespace Spiral\RoadRunner\KeyValue;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerAwareTrait;
use Spiral\RoadRunner\KeyValue\Serializer\SerializerInterface;
use Spiral\RoadRunner\KeyValue\Serializer\DefaultSerializer;

/**
* @psalm-suppress PropertyNotSetInConstructor
Expand All @@ -25,6 +26,10 @@ public function __construct(

public function select(string $name): StorageInterface
{
return new Cache($this->rpc, $name, $this->getSerializer());
if ($this->rpc instanceof AsyncRPCInterface) {
return new AsyncCache($this->rpc, $name, $this->getSerializer());
} else {
return new Cache($this->rpc, $name, $this->getSerializer());
}
}
}
Loading
Loading