Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add newTimestamp parameter and functionality to ackSend() and requeue(). Also add prefix check for indexes. #17

Merged
merged 2 commits into from
Feb 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/DominionEnterprises/Mongo/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public function ensureGetIndex(array $beforeSort = array(), array $afterSort = a

/**
* Ensure an index for the count() method.
* Is a no-op if the generated index is a prefix of an existing one. If you have a similar ensureGetIndex call, call it first.
*
* @param array $fields fields in count() call to index in same format as \MongoCollection::ensureIndex()
* @param bool $includeRunning whether to include the running field in the index
Expand Down Expand Up @@ -297,15 +298,17 @@ public function ack(array $message)
* @param array $payload the data to store in the message to send. Data is handled same way as \MongoCollection::insert()
* @param int $earliestGet earliest unix timestamp the message can be retreived.
* @param float $priority priority for order out of get(). 0 is higher priority than 1
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
*
* @return void
*
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
* @throws \InvalidArgumentException $earliestGet was not an int
* @throws \InvalidArgumentException $priority was not a float
* @throws \InvalidArgumentException $priority is NaN
* @throws \InvalidArgumentException $newTimestamp was not a bool
*/
public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0)
public function ackSend(array $message, array $payload, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
{
$id = null;
if (array_key_exists('id', $message)) {
Expand All @@ -328,23 +331,29 @@ public function ackSend(array $message, array $payload, $earliestGet = 0, $prior
throw new \InvalidArgumentException('$priority was NaN');
}

if ($newTimestamp !== true && $newTimestamp !== false) {
throw new \InvalidArgumentException('$newTimestamp was not a bool');
}

if ($earliestGet > self::MONGO_INT32_MAX) {
$earliestGet = self::MONGO_INT32_MAX;
} elseif ($earliestGet < 0) {
$earliestGet = 0;
}

$newMessage = array(
$toSet = array(
'payload' => $payload,
'running' => false,
'resetTimestamp' => new \MongoDate(self::MONGO_INT32_MAX),
'earliestGet' => new \MongoDate($earliestGet),
'priority' => $priority,
'created' => new \MongoDate(),
);
if ($newTimestamp) {
$toSet['created'] = new \MongoDate();
}

//using upsert because if no documents found then the doc was removed (SHOULD ONLY HAPPEN BY SOMEONE MANUALLY) so we can just send
$this->_collection->update(array('_id' => $id), $newMessage, array('upsert' => true));
$this->_collection->update(array('_id' => $id), array('$set' => $toSet), array('upsert' => true));
}

/**
Expand All @@ -353,19 +362,21 @@ public function ackSend(array $message, array $payload, $earliestGet = 0, $prior
* @param array $message message received from get().
* @param int $earliestGet earliest unix timestamp the message can be retreived.
* @param float $priority priority for order out of get(). 0 is higher priority than 1
* @param bool $newTimestamp true to give the payload a new timestamp or false to use given message timestamp
*
* @return void
*
* @throws \InvalidArgumentException $message does not have a field "id" that is a MongoId
* @throws \InvalidArgumentException $earliestGet was not an int
* @throws \InvalidArgumentException $priority was not a float
* @throws \InvalidArgumentException priority is NaN
* @throws \InvalidArgumentException $newTimestamp was not a bool
*/
public function requeue(array $message, $earliestGet = 0, $priority = 0.0)
public function requeue(array $message, $earliestGet = 0, $priority = 0.0, $newTimestamp = true)
{
$forRequeue = $message;
unset($forRequeue['id']);
$this->ackSend($message, $forRequeue, $earliestGet, $priority);
$this->ackSend($message, $forRequeue, $earliestGet, $priority, $newTimestamp);
}

/**
Expand Down Expand Up @@ -415,6 +426,7 @@ public function send(array $payload, $earliestGet = 0, $priority = 0.0)

/**
* Ensure index of correct specification and a unique name whether the specification or name already exist or not.
* Will not create index if $index is a prefix of an existing index
*
* @param array $index index to create in same format as \MongoCollection::ensureIndex()
*
Expand All @@ -424,6 +436,14 @@ public function send(array $payload, $earliestGet = 0, $priority = 0.0)
*/
private function _ensureIndex(array $index)
{
//if $index is a prefix of any existing index we are good
foreach ($this->_collection->getIndexInfo() as $existingIndex) {
$slice = array_slice($existingIndex['key'], 0, count($index), true);
if ($slice === $index) {
return;
}
}

for ($i = 0; $i < 5; ++$i) {
for ($name = uniqid(); strlen($name) > 0; $name = substr($name, 0, -1)) {
//creating an index with same name and different spec does nothing.
Expand Down
50 changes: 50 additions & 0 deletions tests/DominionEnterprises/Mongo/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,21 @@ public function ensureCountIndex()
$this->assertSame($expectedTwo, $resultTwo[2]['key']);
}

/**
* @test
*/
public function ensureCountIndexWithPrefixOfPrevious()
{
$this->_queue->ensureCountIndex(array('type' => 1, 'boo' => -1), false);
$this->_queue->ensureCountIndex(array('type' => 1), false);

$this->assertSame(2, count($this->_collection->getIndexInfo()));

$expected = array('payload.type' => 1, 'payload.boo' => -1);
$result = $this->_collection->getIndexInfo();
$this->assertSame($expected, $result[1]['key']);
}

/**
* @test
* @expectedException \InvalidArgumentException
Expand Down Expand Up @@ -315,6 +330,32 @@ public function getWithTimeBasedPriority()
$this->assertSame(array('id' => $resultThree['id']) + $messageThree, $resultThree);
}

/**
* @test
*/
public function getWithTimeBasedPriorityWithOldTimestamp()
{
$messageOne = array('key' => 0);
$messageTwo = array('key' => 1);
$messageThree = array('key' => 2);

$this->_queue->send($messageOne);
$this->_queue->send($messageTwo);
$this->_queue->send($messageThree);

$resultTwo = $this->_queue->get(array(), PHP_INT_MAX, 0);
//ensuring using old timestamp shouldn't affect normal time order of send()s
$this->_queue->requeue($resultTwo, 0, 0.0, false);

$resultOne = $this->_queue->get(array(), PHP_INT_MAX, 0);
$resultTwo = $this->_queue->get(array(), PHP_INT_MAX, 0);
$resultThree = $this->_queue->get(array(), PHP_INT_MAX, 0);

$this->assertSame(array('id' => $resultOne['id']) + $messageOne, $resultOne);
$this->assertSame(array('id' => $resultTwo['id']) + $messageTwo, $resultTwo);
$this->assertSame(array('id' => $resultThree['id']) + $messageThree, $resultThree);
}

/**
* @test
*/
Expand Down Expand Up @@ -499,6 +540,15 @@ public function ackSendWithNonIntEarliestGet()
$this->_queue->ackSend(array('id' => new \MongoId()), array(), true);
}

/**
* @test
* @expectedException \InvalidArgumentException
*/
public function ackSendWithNonBoolNewTimestamp()
{
$this->_queue->ackSend(array('id' => new \MongoId()), array(), 0, 0.0, 1);
}

/**
* @test
*/
Expand Down