Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis subscription consumer #503

Merged
merged 3 commits into from
Aug 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions pkg/redis/PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public function __construct(array $config)
/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
try {
$this->redis->lpush($key, [$value]);
return $this->redis->lpush($key, [$value]);
} catch (PRedisServerException $e) {
throw new ServerException('lpush command has failed', null, $e);
}
Expand All @@ -51,12 +51,14 @@ public function lpush($key, $value)
/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
try {
if ($result = $this->redis->brpop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brpop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('brpop command has failed', null, $e);
}
Expand All @@ -65,10 +67,14 @@ public function brpop($key, $timeout)
/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
try {
return $this->redis->rpop($key);
if ($message = $this->redis->rpop($key)) {
return new RedisResult($key, $message);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('rpop command has failed', null, $e);
}
Expand All @@ -77,8 +83,12 @@ public function rpop($key)
/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if ($this->redis) {
return;
}

$this->redis = new Client($this->config, ['exceptions' => true]);

if ($this->config['pass']) {
Expand All @@ -91,15 +101,15 @@ public function connect()
/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
$this->redis->disconnect();
}

/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del([$key]);
}
Expand Down
80 changes: 42 additions & 38 deletions pkg/redis/PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct(array $config)
'port' => null,
'pass' => null,
'user' => null,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'persisted' => false,
Expand All @@ -35,69 +35,73 @@ public function __construct(array $config)
/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
if (false == $this->redis->lPush($key, $value)) {
throw new ServerException($this->redis->getLastError());
}
return $this->redis->lPush($key, $value);
}

/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
if ($result = $this->redis->brPop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brPop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
}

/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
return $this->redis->rPop($key);
if ($message = $this->redis->rPop($key)) {
return new RedisResult($key, $message);
}

return null;
}

/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if (false == $this->redis) {
$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

$this->redis->select($this->config['database']);
if ($this->redis) {
return;
}

$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

return $this->redis;
$this->redis->select($this->config['database']);
}

/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
if ($this->redis) {
$this->redis->close();
Expand All @@ -107,7 +111,7 @@ public function disconnect()
/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del($key);
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/redis/Redis.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

interface Redis
Expand All @@ -10,29 +12,29 @@ interface Redis
*
* @return int length of the list
*/
public function lpush($key, $value);
public function lpush(string $key, string $value): int;

/**
* @param string $key
* @param int $timeout in seconds
* @param string[] $keys
* @param int $timeout in seconds
*
* @return string|null
* @return RedisResult|null
*/
public function brpop($key, $timeout);
public function brpop(array $keys, int $timeout): ?RedisResult;

/**
* @param string $key
*
* @return string|null
* @return RedisResult|null
*/
public function rpop($key);
public function rpop(string $key): ?RedisResult;

public function connect();
public function connect(): void;

public function disconnect();
public function disconnect(): void;

/**
* @param string $key
*/
public function del($key);
public function del(string $key): void;
}
6 changes: 5 additions & 1 deletion pkg/redis/RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ private function parseDsn($dsn)
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

if (array_key_exists('port', $config)) {
$config['port'] = (int) $config['port'];
}

if ($query = parse_url($dsn, PHP_URL_QUERY)) {
$queryConfig = [];
parse_str($query, $queryConfig);
Expand All @@ -159,7 +163,7 @@ private function defaultConfig()
return [
'host' => 'localhost',
'port' => 6379,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'vendor' => 'phpredis',
Expand Down
18 changes: 9 additions & 9 deletions pkg/redis/RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public function receive($timeout = 0)
{
$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
// Caused by
// Predis\Response\ServerException: ERR timeout is not an integer or out of range
// /mqdev/vendor/predis/predis/src/Client.php:370

return $this->receiveNoWait();
while (true) {
if ($message = $this->receive(5000)) {
return $message;
}
}
}

if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand All @@ -66,8 +66,8 @@ public function receive($timeout = 0)
*/
public function receiveNoWait()
{
if ($message = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/redis/RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class RedisContext implements PsrContext
class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext
{
/**
* @var Redis
Expand Down Expand Up @@ -122,6 +123,16 @@ public function createConsumer(PsrDestination $destination)
return new RedisConsumer($this, $destination);
}

/**
* {@inheritdoc}
*
* @return RedisSubscriptionConsumer
*/
public function createSubscriptionConsumer()
{
return new RedisSubscriptionConsumer($this);
}

public function close()
{
$this->getRedis()->disconnect();
Expand Down
32 changes: 32 additions & 0 deletions pkg/redis/RedisResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Enqueue\Redis;

class RedisResult
{
/**
* @var string
*/
private $key;

/**
* @var string
*/
private $message;

public function __construct(string $key, string $message)
{
$this->key = $key;
$this->message = $message;
}

public function getKey(): string
{
return $this->key;
}

public function getMessage(): string
{
return $this->message;
}
}
Loading