Skip to content

Commit

Permalink
Merge pull request #6 from Teknasyon-Teknoloji/master
Browse files Browse the repository at this point in the history
AWS SQS Job Locker to Prevent Duplication
  • Loading branch information
javibravo authored Nov 15, 2017
2 parents 3e3508a + a668219 commit b12ccea
Show file tree
Hide file tree
Showing 14 changed files with 386 additions and 4 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ matrix:
- php: nightly

before_script:
- phpenv config-add php-extensions.ini
- composer install

script: ./vendor/bin/phpunit
35 changes: 35 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,39 @@ $myConsumer = new QueueWorker($myQueue, new MyJob(), 10, true);
$myConsumer->start();
```

**AWS SQS Job Locking to Prevent Duplication**

When using AWS SQS Standart Queue, sometimes workers can get duplicated messages even if MessageVisibilityTimeout given.
To prevent this duplication, you can give Redis or Memcached Locker to SqsQueue object. If you proived locker object and lock failed, job sent to error queue.
Locker provider does not remove/unlock job. If required, you should unlock manually. You can get job key with **getJobUniqId** method.

```php
<?php

use Aws\Sqs\SqsClient;
use Simpleue\Queue\SqsQueue;
use Simpleue\Locker\MemcachedLocker;
use Simpleue\Worker\QueueWorker;
use MyProject\MyJob;

$memcached = new \Memcached();
$memcached->addServer('localhost', 11211);
$memcachedLocker = new MemcachedLocker($memcached);

$sqsClient = new SqsClient([
'profile' => 'aws-profile',
'region' => 'eu-west-1',
'version' => 'latest'
]);

$sqsQueue = new SqsQueue($sqsClient, 'my_queue_name', 20, 30);
$sqsQueue->setLocker($memcachedLocker);

$myNewConsumer = new QueueWorker($sqsQueue, new MyJob());
$myNewConsumer->start();
```

See http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html#standard-queues-at-least-once-delivery for more info


(*) The idea is to support any queue system, so it is open for that. Contributions are welcome.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
},
"suggest": {
"predis/predis": "Allow work with Redis queues",
"ext-redis": "Allow work with Redis Locker",
"aws/aws-sdk-php": "Allow work with AWS Simple Queue Service (SQS) queues",
"pda/pheanstalk": "Allow work with Beanstalkd queues"
}
Expand Down
1 change: 1 addition & 0 deletions php-extensions.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
extension = "memcached.so"
31 changes: 31 additions & 0 deletions src/Simpleue/Locker/BaseLocker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace Simpleue\Locker;

abstract class BaseLocker implements Locker
{
protected $uniqIdFunction;
protected $keyPrefix = 'sqslocker-';

public function __construct()
{
$this->uniqIdFunction = function ($job) {
return md5(strtolower($job));
};
}

public function setJobUniqIdFunction(\Closure $function)
{
$this->uniqIdFunction = $function;
}

public function getJobUniqId($job)
{
if ($this->uniqIdFunction) {
$func = $this->uniqIdFunction;
return $this->keyPrefix . $func($job);
} else {
throw new \InvalidArgumentException('Locker::uniqIdFunction not defined!');
}
}
}
10 changes: 10 additions & 0 deletions src/Simpleue/Locker/Locker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

namespace Simpleue\Locker;

interface Locker
{
public function getLockerInfo();
public function lock($job, $timeout = 40);
public function disconnect();
}
45 changes: 45 additions & 0 deletions src/Simpleue/Locker/MemcachedLocker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Simpleue\Locker;

class MemcachedLocker extends BaseLocker
{
/**
* @var \Memcached;
*/
private $memcached;

public function __construct(\Memcached $memcached)
{
parent::__construct();

$this->memcached = $memcached;
$this->memcached->setOptions([
\Memcached::OPT_TCP_NODELAY => true,
\Memcached::OPT_NO_BLOCK => true,
\Memcached::OPT_CONNECT_TIMEOUT => 60
]);
}

public function getLockerInfo()
{
return 'Memcached ( ' . json_encode($this->memcached->getServerList()) . ' )';
}

public function lock($job, $timeout = 30)
{
if (!$job) {
throw new \RuntimeException('Job for lock is invalid!');
}
return $this->memcached->add(
$this->getJobUniqId($job),
time() + $timeout + 1,
$timeout
);
}

public function disconnect()
{
$this->memcached->quit();
}
}
61 changes: 61 additions & 0 deletions src/Simpleue/Locker/RedisLocker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?php

namespace Simpleue\Locker;

class RedisLocker extends BaseLocker
{
/**
* @var \Redis;
*/
private $redis;

public function __construct(\Redis $redisClient)
{
parent::__construct();

$this->redis = $redisClient;
if ($this->redis->isConnected()===false) {
throw new \RuntimeException('Redis Client not connected!');
}
}

public function getLockerInfo()
{
return 'Redis ( '
. $this->redis->getHost()
. ':' . $this->redis->getPort()
. ' -> ' . $this->redis->getDbNum()
. ' )';
}

public function lock($job, $timeout = 40)
{
if (!$job) {
throw new \RuntimeException('Job for lock is invalid!');
}
$key = $this->getJobUniqId($job);
$status = $this->redis->set(
$key,
time() + $timeout + 1,
array('nx', 'ex' => $timeout)
);
if ($status) {
return true;
}

$currentLockTimestamp = $this->redis->get($key);
if ($currentLockTimestamp > time()) {
return false;
}
$oldLockTimestamp = $this->redis->getSet($key, (time() + $timeout + 1));
if ($oldLockTimestamp > time()) {
return false;
}
return true;
}

public function disconnect()
{
$this->redis->close();
}
}
24 changes: 23 additions & 1 deletion src/Simpleue/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use Aws\Sqs\Exception\SqsException;
use Aws\Sqs\SqsClient;
use Simpleue\Locker\BaseLocker;

/*
* AWS API 3.x doc : http://docs.aws.amazon.com/aws-sdk-php/v3/api/
Expand All @@ -20,6 +21,10 @@ class SqsQueue implements Queue {
private $errorQueueUrl;
private $maxWaitingSeconds;
private $visibilityTimeout;
/**
* @var BaseLocker
*/
private $locker;

public function __construct(SqsClient $sqsClient, $queueName, $maxWaitingSeconds = 20, $visibilityTimeout = 30) {
$this->sqsClient = $sqsClient;
Expand Down Expand Up @@ -73,6 +78,14 @@ public function setSqsClient(SqsClient $sqsClient) {
return $this;
}

/**
* @param BaseLocker $locker
*/
public function setLocker($locker)
{
$this->locker = $locker;
}

public function getNext() {
$queueItem = $this->sqsClient->receiveMessage([
'QueueUrl' => $this->sourceQueueUrl,
Expand All @@ -81,7 +94,16 @@ public function getNext() {
'VisibilityTimeout' => $this->visibilityTimeout
]);
if ($queueItem->hasKey('Messages')) {
return $queueItem->get('Messages')[0];
$msg = $queueItem->get('Messages')[0];
if ($this->locker && $this->locker->lock($this->getMessageBody($msg), $this->visibilityTimeout)===false) {
$this->error($msg);
throw new \RuntimeException(
'Sqs msg lock cannot acquired!'
.' LockId: ' . $this->locker->getJobUniqId($this->getMessageBody($msg))
.' LockerInfo: ' . $this->locker->getLockerInfo()
);
}
return $msg;
}
return false;
}
Expand Down
3 changes: 1 addition & 2 deletions src/Simpleue/Worker/QueueWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public function start()
$job = $this->queueHandler->getNext();
} catch (\Exception $exception) {
$this->log('error', 'Error getting data. Message: '.$exception->getMessage());
$this->queueHandler->error(false, $exception);
continue;
}
if ($this->isValidJob($job) && $this->jobHandler->isMyJob($this->queueHandler->getMessageBody($job))) {
Expand Down Expand Up @@ -157,7 +156,7 @@ private function manageJob($job)
}
} catch (\Exception $exception) {
$this->log('error', 'Error Managing data. Data :'.$this->queueHandler->toString($job).'. Message: '.$exception->getMessage());
$this->queueHandler->error($job, $exception);
$this->queueHandler->error($job);
}
}

Expand Down
63 changes: 63 additions & 0 deletions tests/Simpleue/Unitary/Locker/MemcachedLockerTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

namespace Simpleue\Unitary\Locker;

use Simpleue\Locker\MemcachedLocker;

class MemcachedLockerTest extends \PHPUnit_Framework_TestCase
{
private $memcachedClientMock;
/**
* @var MemcachedLocker
*/
private $memcachedLocker;

protected function setUp()
{
$this->memcachedClientMock = $this->getMock(
'\Memcached',
array('addServer', 'setOptions', 'getServerList', 'add', 'quit')
);
$this->memcachedClientMock->expects($this->any())->method('getServerList')->willReturn(
[['host' => 'localhost', 'port' => 11211, 'weight'=>10]]
);

$this->memcachedLocker = new MemcachedLocker($this->memcachedClientMock);
}

public function testGetJobUniqId()
{
$job = '{"string": "example", "uniqid":"123"}';
$this->assertEquals(
'sqslocker-'.md5(strtolower($job)),
$this->memcachedLocker->getJobUniqId($job)
);
$this->memcachedLocker->setJobUniqIdFunction(function ($job) {
return json_decode($job, true)['uniqid'];
});
$this->assertEquals(
'sqslocker-123',
$this->memcachedLocker->getJobUniqId($job)
);
}

public function testGetLockerInfo()
{
$this->assertEquals(
'Memcached ( ' . json_encode([['host' => 'localhost', 'port' => 11211, 'weight'=>10]]) . ' )',
$this->memcachedLocker->getLockerInfo()
);
}

public function testLock()
{
$job = '{"string": "example", "uniqid":"123"}';
$this->memcachedClientMock->expects($this->at(0))->method('add')->willReturn(true);
$this->memcachedClientMock->expects($this->at(1))->method('add')->willReturn(false);
$this->assertTrue($this->memcachedLocker->lock($job, 60));
$this->assertFalse($this->memcachedLocker->lock($job, 60));

$this->setExpectedException('RuntimeException', 'Job for lock is invalid!');
$this->memcachedLocker->lock(false, 60);
}
}
Loading

0 comments on commit b12ccea

Please sign in to comment.