Skip to content

Commit

Permalink
Turn Suspension into an interface
Browse files Browse the repository at this point in the history
This is equivalent to the interface change in reactphp/async#15 to allow different implementations.

This also allows decorating `Suspension` to implement listeners like proposed in #2.
  • Loading branch information
kelunik committed Nov 22, 2021
1 parent 38bda20 commit 53407b0
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 93 deletions.
2 changes: 1 addition & 1 deletion src/EventLoop/Internal/AbstractDriver.php
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public function unreference(string $callbackId): string

public function createSuspension(\Fiber $scheduler): Suspension
{
return new Suspension($this, $scheduler, $this->interruptCallback);
return new DriverSuspension($this, $scheduler, $this->interruptCallback);
}

/**
Expand Down
106 changes: 106 additions & 0 deletions src/EventLoop/Internal/DriverSuspension.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

namespace Revolt\EventLoop\Internal;

use Revolt\EventLoop\Driver;
use Revolt\EventLoop\Suspension;

/**
* @internal
*/
final class DriverSuspension implements Suspension
{
private ?\Fiber $fiber;
private \Fiber $scheduler;
private Driver $driver;
private bool $pending = false;
private ?\FiberError $error = null;
/** @var callable */
private $interrupt;

/**
* @param Driver $driver
* @param \Fiber $scheduler
* @param callable $interrupt
*
* @internal
*/
public function __construct(Driver $driver, \Fiber $scheduler, callable $interrupt)
{
$this->driver = $driver;
$this->scheduler = $scheduler;
$this->interrupt = $interrupt;
$this->fiber = \Fiber::getCurrent();

// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($this->fiber !== $this->scheduler);
}

public function throw(\Throwable $throwable): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling throw()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'throw'], $throwable);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => throw $throwable);
}
}

public function resume(mixed $value = null): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling resume()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'resume'], $value);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => $value);
}
}

public function suspend(): mixed
{
if ($this->pending) {
throw new \Error('Must call resume() or throw() before calling suspend() again');
}

if ($this->fiber !== \Fiber::getCurrent()) {
throw new \Error('Must not call suspend() from another fiber');
}

$this->pending = true;

// Awaiting from within a fiber.
if ($this->fiber) {
try {
return \Fiber::suspend();
} catch (\FiberError $exception) {
$this->pending = false;
$this->error = $exception;

throw $exception;
}
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Scheduler suspended or exited unexpectedly');
}

return $lambda();
}
}
100 changes: 8 additions & 92 deletions src/EventLoop/Suspension.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,103 +10,19 @@
* ```php
* $suspension = EventLoop::createSuspension();
*
* $promise->then(fn ($value) => $suspension->resume($value), fn ($throwable) => $suspension->throw($throwable));
* $promise->then(
* fn (mixed $value) => $suspension->resume($value),
* fn (Throwable $error) => $suspension->throw($error)
* );
*
* $suspension->suspend();
* ```
*/
final class Suspension
interface Suspension
{
private ?\Fiber $fiber;
private \Fiber $scheduler;
private Driver $driver;
private bool $pending = false;
private ?\FiberError $error = null;
/** @var callable */
private $interrupt;
public function resume(mixed $value = null): void;

/**
* @param Driver $driver
* @param \Fiber $scheduler
* @param callable $interrupt
*
* @internal
*/
public function __construct(Driver $driver, \Fiber $scheduler, callable $interrupt)
{
$this->driver = $driver;
$this->scheduler = $scheduler;
$this->interrupt = $interrupt;
$this->fiber = \Fiber::getCurrent();
public function suspend(): mixed;

// User callbacks are always executed outside the event loop fiber, so this should always be false.
\assert($this->fiber !== $this->scheduler);
}

public function throw(\Throwable $throwable): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling throw()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'throw'], $throwable);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => throw $throwable);
}
}

public function resume(mixed $value = null): void
{
if (!$this->pending) {
throw $this->error ?? new \Error('Must call suspend() before calling resume()');
}

$this->pending = false;

if ($this->fiber) {
$this->driver->queue([$this->fiber, 'resume'], $value);
} else {
// Suspend event loop fiber to {main}.
($this->interrupt)(static fn () => $value);
}
}

public function suspend(): mixed
{
if ($this->pending) {
throw new \Error('Must call resume() or throw() before calling suspend() again');
}

if ($this->fiber !== \Fiber::getCurrent()) {
throw new \Error('Must not call suspend() from another fiber');
}

$this->pending = true;

// Awaiting from within a fiber.
if ($this->fiber) {
try {
return \Fiber::suspend();
} catch (\FiberError $exception) {
$this->pending = false;
$this->error = $exception;
throw $exception;
}
}

// Awaiting from {main}.
$lambda = $this->scheduler->isStarted() ? $this->scheduler->resume() : $this->scheduler->start();

/** @psalm-suppress RedundantCondition $this->pending should be changed when resumed. */
if ($this->pending) {
// Should only be true if the event loop exited without resolving the promise.
throw new \Error('Scheduler suspended or exited unexpectedly');
}

return $lambda();
}
public function throw(\Throwable $throwable): void;
}

0 comments on commit 53407b0

Please sign in to comment.