Skip to content

Commit

Permalink
Merge pull request #17 from gaillard/master
Browse files Browse the repository at this point in the history
Add newTimestamp parameter and functionality to ackSend() and requeue(). Also add prefix check for indexes.
  • Loading branch information
chadicus committed Feb 5, 2014
2 parents 07c4b8a + d4bf1cd commit 4aefe27
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 6 deletions.
32 changes: 26 additions & 6 deletions src/DominionEnterprises/Mongo/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public function ensureGetIndex(array $beforeSort = array(), array $afterSort = a

/**
* Ensure an index for the count() method.
* Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call, call it first.
*
* @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
* @param bool $includeRunning whether to include the running field in the index
Expand Down Expand Up @@ -297,15 +298,17 @@ public function ack(array $message)
* @param array $payload the data to store in the message to send. Data is handled same way as \MongoCollection::insert()
* @param int $earliestGet earliest unix timestamp the message can be retreived.
* @param float $priority priority for order out of get(). 0 is higher priority than 1
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
*
* @return void
*
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
* @throws \InvalidArgumentException $earliestGet was not an int
* @throws \InvalidArgumentException $priority was not a float
* @throws \InvalidArgumentException $priority is NaN
* @throws \InvalidArgumentException $newTimestamp was not a bool
*/
public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0)
public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
{
$id = null;
if (array_key_exists('id', $message)) {
Expand All @@ -328,23 +331,29 @@ public function ackSend(array $message, array $payload, $earliestGet = 0, $prior
throw new \InvalidArgumentException('$priority was NaN');
}

if ($newTimestamp !== true && $newTimestamp !== false) {
throw new \InvalidArgumentException('$newTimestamp was not a bool');
}

if ($earliestGet > self::MONGO_INT32_MAX) {
$earliestGet = self::MONGO_INT32_MAX;
} elseif ($earliestGet < 0) {
$earliestGet = 0;
}

$newMessage = array(
$toSet = array(
'payload' => $payload,
'running' => false,
'resetTimestamp' => new \MongoDate(self::MONGO_INT32_MAX),
'earliestGet' => new \MongoDate($earliestGet),
'priority' => $priority,
'created' => new \MongoDate(),
);
if ($newTimestamp) {
$toSet['created'] = new \MongoDate();
}

//using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY) so we can just send
$this->_collection->update(array('_id' => $id), $newMessage, array('upsert' => true));
$this->_collection->update(array('_id' => $id), array('$set' => $toSet), array('upsert' => true));
}

/**
Expand All @@ -353,19 +362,21 @@ public function ackSend(array $message, array $payload, $earliestGet = 0, $prior
* @param array $message message received from get().
* @param int $earliestGet earliest unix timestamp the message can be retreived.
* @param float $priority priority for order out of get(). 0 is higher priority than 1
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
*
* @return void
*
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
* @throws \InvalidArgumentException $earliestGet was not an int
* @throws \InvalidArgumentException $priority was not a float
* @throws \InvalidArgumentException priority is NaN
* @throws \InvalidArgumentException $newTimestamp was not a bool
*/
public function requeue(array $message, $earliestGet = 0, $priority = 0.0)
public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
{
$forRequeue = $message;
unset($forRequeue['id']);
$this->ackSend($message, $forRequeue, $earliestGet, $priority);
$this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
}

/**
Expand Down Expand Up @@ -415,6 +426,7 @@ public function send(array $payload, $earliestGet = 0, $priority = 0.0)

/**
* Ensure index of correct specification and a unique name whether the specification or name already exist or not.
* Will not create index if $index is a prefix of an existing index
*
* @param array $index index to create in same format as \MongoCollection::ensureIndex()
*
Expand All @@ -424,6 +436,14 @@ public function send(array $payload, $earliestGet = 0, $priority = 0.0)
*/
private function _ensureIndex(array $index)
{
//if $index is a prefix of any existing index we are good
foreach ($this->_collection->getIndexInfo() as $existingIndex) {
$slice = array_slice($existingIndex['key'], 0, count($index), true);
if ($slice === $index) {
return;
}
}

for ($i = 0; $i < 5; ++$i) {
for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
//creating an index with same name and different spec does nothing.
Expand Down
50 changes: 50 additions & 0 deletions tests/DominionEnterprises/Mongo/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ public function ensureCountIndex()
$this->assertSame($expectedTwo, $resultTwo[2]['key']);
}

/**
* @test
*/
public function ensureCountIndexWithPrefixOfPrevious()
{
$this->_queue->ensureCountIndex(array('type' => 1, 'boo' => -1), false);
$this->_queue->ensureCountIndex(array('type' => 1), false);

$this->assertSame(2, count($this->_collection->getIndexInfo()));

$expected = array('payload.type' => 1, 'payload.boo' => -1);
$result = $this->_collection->getIndexInfo();
$this->assertSame($expected, $result[1]['key']);
}

/**
* @test
* @expectedException \InvalidArgumentException
Expand Down Expand Up @@ -315,6 +330,32 @@ public function getWithTimeBasedPriority()
$this->assertSame(array('id' => $resultThree['id']) + $messageThree, $resultThree);
}

/**
* @test
*/
public function getWithTimeBasedPriorityWithOldTimestamp()
{
$messageOne = array('key' => 0);
$messageTwo = array('key' => 1);
$messageThree = array('key' => 2);

$this->_queue->send($messageOne);
$this->_queue->send($messageTwo);
$this->_queue->send($messageThree);

$resultTwo = $this->_queue->get(array(), PHP_INT_MAX, 0);
//ensuring using old timestamp shouldn't affect normal time order of send()s
$this->_queue->requeue($resultTwo, 0, 0.0, false);

$resultOne = $this->_queue->get(array(), PHP_INT_MAX, 0);
$resultTwo = $this->_queue->get(array(), PHP_INT_MAX, 0);
$resultThree = $this->_queue->get(array(), PHP_INT_MAX, 0);

$this->assertSame(array('id' => $resultOne['id']) + $messageOne, $resultOne);
$this->assertSame(array('id' => $resultTwo['id']) + $messageTwo, $resultTwo);
$this->assertSame(array('id' => $resultThree['id']) + $messageThree, $resultThree);
}

/**
* @test
*/
Expand Down Expand Up @@ -499,6 +540,15 @@ public function ackSendWithNonIntEarliestGet()
$this->_queue->ackSend(array('id' => new \MongoId()), array(), true);
}

/**
* @test
* @expectedException \InvalidArgumentException
*/
public function ackSendWithNonBoolNewTimestamp()
{
$this->_queue->ackSend(array('id' => new \MongoId()), array(), 0, 0.0, 1);
}

/**
* @test
*/
Expand Down

0 comments on commit 4aefe27

Please sign in to comment.