Skip to content

Commit

Permalink
Merge pull request #28 from fruux/promise-loop-integration
Browse files Browse the repository at this point in the history
Promise loop integration
  • Loading branch information
evert committed Oct 24, 2015
2 parents 1c1c0b1 + a6373d9 commit efee7cc
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 33 deletions.
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
"Sabre\\Event\\": "lib/"
},
"files" : [
"lib/coroutine.php"
"lib/coroutine.php",
"lib/Loop/functions.php"
]
},
"require-dev": {
Expand Down
49 changes: 36 additions & 13 deletions lib/Loop.php → lib/Loop/Loop.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Sabre\Event;
namespace Sabre\Event\Loop;

/**
* A simple eventloop implementation.
Expand Down Expand Up @@ -201,28 +201,51 @@ function run() {

do {

$this->runNextTicks();
$hasEvents = $this->tick(true);

$nextTimeout = $this->runTimers();
$pollTimeout = $this->nextTick ? 0 : $nextTimeout;
$this->runStreams($pollTimeout);

} while ($this->running && ($this->readStreams || $this->writeStreams || $this->nextTick || $this->timers));
} while ($this->running && $hasEvents);
$this->running = false;

}

/**
* Executes all pending events, and immediately exists if there were no
* pending events.
* Executes all pending events.
*
* @return void
* If $block is turned true, this function will block until any event is
* triggered.
*
* If there are now timeouts, nextTick callbacks or events in the loop at
* all, this function will exit immediately.
*
* This function will return true if there are _any_ events left in the
* loop after the tick.
*
* @param bool $block
* @return bool
*/
function runOnce() {
function tick($block = false) {

$this->runNextTicks();
$this->runTimers();
$this->runStreams(0);
$nextTimeout = $this->runTimers();

// Calculating how long runStreams should at most wait.
if (!$block) {
// Don't wait
$streamWait = 0;
} elseif ($this->nextTick) {
// There's a pending 'nextTick'. Don't wait.
$streamWait = 0;
} elseif (is_numeric($nextTimeout)) {
// Wait until the next Timeout should trigger.
$streamWait = $nextTimeout;
} else {
// Wait indefinitely
$streamWait = null;
}

$this->runStreams($streamWait);

return ($this->readStreams || $this->writeStreams || $this->nextTick || $this->timers);

}

Expand Down
183 changes: 183 additions & 0 deletions lib/Loop/functions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
<?php

namespace Sabre\Event\Loop;

/**
* Executes a function after x seconds.
*
* @param callable $cb
* @param float $timeout timeout in seconds
* @return void
*/
function setTimeout(callable $cb, $timeout) {

instance()->setTimeout($cb, $timeout);

}

/**
* Executes a function every x seconds.
*
* The value this function returns can be used to stop the interval with
* clearInterval.
*
* @param callable $cb
* @param float $timeout
* @return array
*/
function setInterval(callable $cb, $timeout) {

return instance()->setInterval($cb, $timeout);

}

/**
* Stops a running internval.
*
* @param array $intervalId
* @return void
*/
function clearInterval($intervalId) {

instance()->clearInterval($intervalId);

}

/**
* Runs a function immediately at the next iteration of the loop.
*
* @param callable $cb
* @return void
*/
function nextTick(callable $cb) {

instance()->nextTick($cb);

}


/**
* Adds a read stream.
*
* The callback will be called as soon as there is something to read from
* the stream.
*
* You MUST call removeReadStream after you are done with the stream, to
* prevent the eventloop from never stopping.
*
* @param resource $stream
* @param callable $cb
* @return void
*/
function addReadStream($stream, callable $cb) {

instance()->addReadStream($stream, $cb);

}

/**
* Adds a write stream.
*
* The callback will be called as soon as the system reports it's ready to
* receive writes on the stream.
*
* You MUST call removeWriteStream after you are done with the stream, to
* prevent the eventloop from never stopping.
*
* @param resource $stream
* @param callable $cb
* @return void
*/
function addWriteStream($stream, callable $cb) {

instance()->addWriteStream($stream, $cb);

}

/**
* Stop watching a stream for reads.
*
* @param resource $stream
* @return void
*/
function removeReadStream($stream) {

instance()->removeReadStream($stream);

}

/**
* Stop watching a stream for writes.
*
* @param resource $stream
* @return void
*/
function removeWriteStream($stream) {

instance()->removeWriteStream($stream);

}


/**
* Runs the loop.
*
* This function will run continiously, until there's no more events to
* handle.
*
* @return void
*/
function run() {

instance()->run();

}

/**
* Executes all pending events.
*
* If $block is turned true, this function will block until any event is
* triggered.
*
* If there are now timeouts, nextTick callbacks or events in the loop at
* all, this function will exit immediately.
*
* This function will return true if there are _any_ events left in the
* loop after the tick.
*
* @param bool $block
* @return bool
*/
function tick($block = false) {

return instance()->tick($block);

}

/**
* Stops a running eventloop
*
* @return void
*/
function stop() {

instance()->stop();

}

/**
* Retrieves or sets the global Loop object.
*
* @param Loop $newLoop
*/
function instance(Loop $newLoop = null) {

static $loop;
if ($newLoop) {
$loop = $newLoop;
} elseif (!$loop) {
$loop = new Loop();
}
return $loop;

}
72 changes: 57 additions & 15 deletions lib/Promise.php
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,46 @@ function reject($reason = null) {

}

/**
* Stops execution until this promise is resolved.
*
* This method stops exection completely. If the promise is successful with
* a value, this method will return this value. If the promise was
* rejected, this method will throw an exception.
*
* @throws Exception
* @return mixed
*/
function wait() {

$hasEvents = true;
while ($this->state === self::PENDING) {

if (!$hasEvents) {
throw new \LogicException('There were no more events in the loop. This promise will never be fulfilled.');
}
$hasEvents = Loop\tick(true);

}

if ($this->state === self::FULFILLED) {
return $this->value;
} else {
$reason = $this->value;
// Rejected
if ($reason instanceof Exception) {
throw $reason;
} elseif (is_scalar($reason)) {
throw new Exception($reason);
} else {
$type = is_object($reason) ? get_class($reason) : gettype($reason);
throw new Exception('Promise was rejected with reason of type: ' . $type);
}
}


}

/**
* It's possible to send an array of promises to the all method. This
* method returns a promise that will be fulfilled, only if all the passed
Expand Down Expand Up @@ -234,24 +274,26 @@ function($reason) use ($fail) {
*/
protected function invokeCallback(Promise $subPromise, callable $callBack = null) {

if (is_callable($callBack)) {
try {
$result = $callBack($this->value);
if ($result instanceof self) {
$result->then([$subPromise, 'fulfill'], [$subPromise, 'reject']);
} else {
$subPromise->fulfill($result);
Loop\nextTick(function() use ($callBack, $subPromise) {
if (is_callable($callBack)) {
try {
$result = $callBack($this->value);
if ($result instanceof self) {
$result->then([$subPromise, 'fulfill'], [$subPromise, 'reject']);
} else {
$subPromise->fulfill($result);
}
} catch (Exception $e) {
$subPromise->reject($e);
}
} catch (Exception $e) {
$subPromise->reject($e);
}
} else {
if ($this->state === self::FULFILLED) {
$subPromise->fulfill($this->value);
} else {
$subPromise->reject($this->value);
if ($this->state === self::FULFILLED) {
$subPromise->fulfill($this->value);
} else {
$subPromise->reject($this->value);
}
}
}
});
}


Expand Down
Loading

0 comments on commit efee7cc

Please sign in to comment.