Skip to content

Commit

Permalink
feat: Make use of new "async" RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
L3tum committed Feb 5, 2024
1 parent fca0221 commit 3100e53
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 1 deletion.
27 changes: 27 additions & 0 deletions src/AsyncStorageInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?php

namespace Spiral\RoadRunner\KeyValue;

use DateInterval;
use Spiral\RoadRunner\KeyValue\Exception\KeyValueException;

interface AsyncStorageInterface extends StorageInterface
{
/**
* @throws KeyValueException
*/
public function commitAsync(): bool;

/**
* @psalm-param iterable<string, mixed> $values
* @psalm-param positive-int|DateInterval|null $ttl
* @throws KeyValueException
*/
public function setMultipleAsync(iterable $values, null|int|DateInterval $ttl = null): bool;

/**
* @psalm-param positive-int|DateInterval|null $ttl
* @throws KeyValueException
*/
public function setAsync(string $key, mixed $value, null|int|DateInterval $ttl = null): bool;
}
50 changes: 49 additions & 1 deletion src/Cache.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use RoadRunner\KV\DTO\V1\Item;
use RoadRunner\KV\DTO\V1\Request;
use RoadRunner\KV\DTO\V1\Response;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Codec\ProtobufCodec;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
Expand All @@ -22,7 +23,7 @@
/**
* @psalm-suppress PropertyNotSetInConstructor
*/
class Cache implements StorageInterface
class Cache implements AsyncStorageInterface
{
use SerializerAwareTrait;

Expand All @@ -43,6 +44,8 @@ class Cache implements StorageInterface
protected readonly RPCInterface $rpc;
protected readonly \DateTimeZone $zone;

protected array $itemsInFlight = [];

/**
* @param non-empty-string $name
*/
Expand Down Expand Up @@ -230,6 +233,11 @@ public function set(string $key, mixed $value, null|int|\DateInterval $ttl = nul
return $this->setMultiple([$key => $value], $ttl);
}

public function setAsync(string $key, mixed $value, null|int|\DateInterval $ttl = null): bool

Check warning on line 236 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L236

Added line #L236 was not covered by tests
{
return $this->setMultipleAsync([$key => $value], $ttl);

Check warning on line 238 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L238

Added line #L238 was not covered by tests
}

/**
* @param mixed|string $key
* @throws InvalidArgumentException
Expand All @@ -254,6 +262,20 @@ public function setMultiple(iterable $values, null|int|\DateInterval $ttl = null
return true;
}

public function setMultipleAsync(iterable $values, null|int|\DateInterval $ttl = null): bool

Check warning on line 265 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L265

Added line #L265 was not covered by tests
{
if ($this->rpc instanceof AsyncRPCInterface) {
$this->itemsInFlight[] = $this->rpc->callAsync(
'kv.Set',
$this->requestValues($values, $this->ttlToRfc3339String($ttl))

Check warning on line 270 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L267-L270

Added lines #L267 - L270 were not covered by tests
);
} else {
$this->setMultiple($values, $ttl);

Check warning on line 273 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L273

Added line #L273 was not covered by tests
}

return true;

Check warning on line 276 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L276

Added line #L276 was not covered by tests
}

/**
* @param iterable<string, mixed> $values
* @throws SerializationException
Expand Down Expand Up @@ -376,4 +398,30 @@ public function has(string $key): bool

return false;
}

/**
* @throws KeyValueException
*/
public function commitAsync(): bool

Check warning on line 405 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L405

Added line #L405 was not covered by tests
{
if ($this->rpc instanceof AsyncRPCInterface) {

Check warning on line 407 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L407

Added line #L407 was not covered by tests
try {
foreach ($this->itemsInFlight as $seq) {
$this->rpc->getResponse($seq, Response::class);

Check warning on line 410 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L409-L410

Added lines #L409 - L410 were not covered by tests
}
} catch (ServiceException $e) {
$message = \str_replace(["\t", "\n"], ' ', $e->getMessage());

Check warning on line 413 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L412-L413

Added lines #L412 - L413 were not covered by tests

if (\str_contains($message, 'no such storage')) {
throw new StorageException(\sprintf(self::ERROR_INVALID_STORAGE, $this->name));

Check warning on line 416 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L415-L416

Added lines #L415 - L416 were not covered by tests
}

throw new KeyValueException($message, $e->getCode(), $e);

Check warning on line 419 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L419

Added line #L419 was not covered by tests
} finally {
$this->itemsInFlight = [];

Check warning on line 421 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L421

Added line #L421 was not covered by tests
}
}

return true;

Check warning on line 425 in src/Cache.php

View check run for this annotation

Codecov / codecov/patch

src/Cache.php#L425

Added line #L425 was not covered by tests
}
}

0 comments on commit 3100e53

Please sign in to comment.