Skip to content

Commit

Permalink
Merge pull request #503 from php-enqueue/redis-subscription-consumer
Browse files Browse the repository at this point in the history
Redis subscription consumer
  • Loading branch information
makasim authored Aug 14, 2018
2 parents 764ec22 + 2cd2524 commit 6937b2a
Show file tree
Hide file tree
Showing 27 changed files with 844 additions and 75 deletions.
30 changes: 20 additions & 10 deletions pkg/redis/PRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ public function __construct(array $config)
/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
try {
$this->redis->lpush($key, [$value]);
return $this->redis->lpush($key, [$value]);
} catch (PRedisServerException $e) {
throw new ServerException('lpush command has failed', null, $e);
}
Expand All @@ -51,12 +51,14 @@ public function lpush($key, $value)
/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
try {
if ($result = $this->redis->brpop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brpop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('brpop command has failed', null, $e);
}
Expand All @@ -65,10 +67,14 @@ public function brpop($key, $timeout)
/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
try {
return $this->redis->rpop($key);
if ($message = $this->redis->rpop($key)) {
return new RedisResult($key, $message);
}

return null;
} catch (PRedisServerException $e) {
throw new ServerException('rpop command has failed', null, $e);
}
Expand All @@ -77,8 +83,12 @@ public function rpop($key)
/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if ($this->redis) {
return;
}

$this->redis = new Client($this->config, ['exceptions' => true]);

if ($this->config['pass']) {
Expand All @@ -91,15 +101,15 @@ public function connect()
/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
$this->redis->disconnect();
}

/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del([$key]);
}
Expand Down
80 changes: 42 additions & 38 deletions pkg/redis/PhpRedis.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct(array $config)
'port' => null,
'pass' => null,
'user' => null,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'persisted' => false,
Expand All @@ -35,69 +35,73 @@ public function __construct(array $config)
/**
* {@inheritdoc}
*/
public function lpush($key, $value)
public function lpush(string $key, string $value): int
{
if (false == $this->redis->lPush($key, $value)) {
throw new ServerException($this->redis->getLastError());
}
return $this->redis->lPush($key, $value);
}

/**
* {@inheritdoc}
*/
public function brpop($key, $timeout)
public function brpop(array $keys, int $timeout): ?RedisResult
{
if ($result = $this->redis->brPop([$key], $timeout)) {
return $result[1];
if ($result = $this->redis->brPop($keys, $timeout)) {
return new RedisResult($result[0], $result[1]);
}

return null;
}

/**
* {@inheritdoc}
*/
public function rpop($key)
public function rpop(string $key): ?RedisResult
{
return $this->redis->rPop($key);
if ($message = $this->redis->rPop($key)) {
return new RedisResult($key, $message);
}

return null;
}

/**
* {@inheritdoc}
*/
public function connect()
public function connect(): void
{
if (false == $this->redis) {
$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

$this->redis->select($this->config['database']);
if ($this->redis) {
return;
}

$this->redis = new \Redis();

if ($this->config['persisted']) {
$this->redis->pconnect(
$this->config['host'],
$this->config['port'],
$this->config['timeout']
);
} else {
$this->redis->connect(
$this->config['host'],
$this->config['port'],
$this->config['timeout'],
$this->config['reserved'],
$this->config['retry_interval']
);
}

if ($this->config['pass']) {
$this->redis->auth($this->config['pass']);
}

return $this->redis;
$this->redis->select($this->config['database']);
}

/**
* {@inheritdoc}
*/
public function disconnect()
public function disconnect(): void
{
if ($this->redis) {
$this->redis->close();
Expand All @@ -107,7 +111,7 @@ public function disconnect()
/**
* {@inheritdoc}
*/
public function del($key)
public function del(string $key): void
{
$this->redis->del($key);
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/redis/Redis.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
<?php

declare(strict_types=1);

namespace Enqueue\Redis;

interface Redis
Expand All @@ -10,29 +12,29 @@ interface Redis
*
* @return int length of the list
*/
public function lpush($key, $value);
public function lpush(string $key, string $value): int;

/**
* @param string $key
* @param int $timeout in seconds
* @param string[] $keys
* @param int $timeout in seconds
*
* @return string|null
* @return RedisResult|null
*/
public function brpop($key, $timeout);
public function brpop(array $keys, int $timeout): ?RedisResult;

/**
* @param string $key
*
* @return string|null
* @return RedisResult|null
*/
public function rpop($key);
public function rpop(string $key): ?RedisResult;

public function connect();
public function connect(): void;

public function disconnect();
public function disconnect(): void;

/**
* @param string $key
*/
public function del($key);
public function del(string $key): void;
}
6 changes: 5 additions & 1 deletion pkg/redis/RedisConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ private function parseDsn($dsn)
throw new \LogicException(sprintf('Failed to parse DSN "%s"', $dsn));
}

if (array_key_exists('port', $config)) {
$config['port'] = (int) $config['port'];
}

if ($query = parse_url($dsn, PHP_URL_QUERY)) {
$queryConfig = [];
parse_str($query, $queryConfig);
Expand All @@ -159,7 +163,7 @@ private function defaultConfig()
return [
'host' => 'localhost',
'port' => 6379,
'timeout' => null,
'timeout' => .0,
'reserved' => null,
'retry_interval' => null,
'vendor' => 'phpredis',
Expand Down
18 changes: 9 additions & 9 deletions pkg/redis/RedisConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public function receive($timeout = 0)
{
$timeout = (int) ($timeout / 1000);
if (empty($timeout)) {
// Caused by
// Predis\Response\ServerException: ERR timeout is not an integer or out of range
// /mqdev/vendor/predis/predis/src/Client.php:370

return $this->receiveNoWait();
while (true) {
if ($message = $this->receive(5000)) {
return $message;
}
}
}

if ($message = $this->getRedis()->brpop($this->queue->getName(), $timeout)) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->brpop([$this->queue->getName()], $timeout)) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand All @@ -66,8 +66,8 @@ public function receive($timeout = 0)
*/
public function receiveNoWait()
{
if ($message = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($message);
if ($result = $this->getRedis()->rpop($this->queue->getName())) {
return RedisMessage::jsonUnserialize($result->getMessage());
}
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/redis/RedisContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumerAwareContext;
use Interop\Queue\PsrTopic;

class RedisContext implements PsrContext
class RedisContext implements PsrContext, PsrSubscriptionConsumerAwareContext
{
/**
* @var Redis
Expand Down Expand Up @@ -122,6 +123,16 @@ public function createConsumer(PsrDestination $destination)
return new RedisConsumer($this, $destination);
}

/**
* {@inheritdoc}
*
* @return RedisSubscriptionConsumer
*/
public function createSubscriptionConsumer()
{
return new RedisSubscriptionConsumer($this);
}

public function close()
{
$this->getRedis()->disconnect();
Expand Down
32 changes: 32 additions & 0 deletions pkg/redis/RedisResult.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Enqueue\Redis;

class RedisResult
{
/**
* @var string
*/
private $key;

/**
* @var string
*/
private $message;

public function __construct(string $key, string $message)
{
$this->key = $key;
$this->message = $message;
}

public function getKey(): string
{
return $this->key;
}

public function getMessage(): string
{
return $this->message;
}
}
Loading

0 comments on commit 6937b2a

Please sign in to comment.