Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(io): simplify write/read queue implementation #278

Merged
merged 1 commit into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Psl/Async/Awaitable.php
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,10 @@ static function (?Throwable $error, mixed $value) use ($suspension): void {
/**
* Do not forward unhandled errors to the event loop handler.
*/
public function ignore(): void
public function ignore(): self
{
$this->state->ignore();

return $this;
}
}
174 changes: 75 additions & 99 deletions src/Psl/IO/Internal/ResourceHandle.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,11 @@

use Psl;
use Psl\Async;
use Psl\Async\Awaitable;
use Psl\Exception\InvariantViolationException;
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
use Revolt\EventLoop;
use Revolt\EventLoop\Suspension;

use function error_get_last;
use function fclose;
Expand Down Expand Up @@ -56,11 +54,8 @@ class ResourceHandle implements IO\Stream\CloseSeekReadWriteHandleInterface
private string $readWatcher = '';
private string $writeWatcher = '';

private ?Suspension $readSuspension = null;
private ?Suspension $writeSuspension = null;

private ?Awaitable $readAwaitable = null;
private ?Awaitable $writeAwaitable = null;
private ?Async\Deferred $readDeferred = null;
private ?Async\Deferred $writeDeferred = null;

/**
* @param resource|object $stream
Expand Down Expand Up @@ -89,10 +84,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)

Psl\invariant($readable, 'Handle is not readable.');

$suspension = &$this->readSuspension;
$this->readWatcher = EventLoop::onReadable($stream, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
$deferred = &$this->readDeferred;
$this->readWatcher = EventLoop::onReadable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->readWatcher);
Expand All @@ -107,10 +102,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)

Psl\invariant($writable, 'Handle is not writeable.');

$suspension = &$this->writeSuspension;
$this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
$deferred = &$this->writeDeferred;
$this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->writeWatcher);
Expand Down Expand Up @@ -138,38 +133,33 @@ public function write(string $bytes, ?float $timeout = null): int

$bytes = substr($bytes, $written);

$this->writeAwaitable = $awaitable = Async\run(function () use ($timeout): void {
$this->writeSuspension = EventLoop::createSuspension();
$suspension = &$this->writeSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not writable.')
);
}
);
}

try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
$this->writeDeferred = new Async\Deferred();
$deferred = &$this->writeDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->error(
new Exception\TimeoutException('reached timeout while the handle is still not writable.')
);
}
}
});
);
}

$awaitable->await();
$this->writeAwaitable = null;
try {
/** @var Async\Deferred $deferred */
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $written + $this->writeImmediately($bytes);
}
Expand All @@ -182,16 +172,12 @@ static function () use (&$suspension) {
*/
public function writeImmediately(string $bytes): int
{
if (null !== $this->writeAwaitable) {
// there's a pending write operation, wait for it first.
$awaitable = $this->writeAwaitable->then(
static fn() => null,
static fn() => null,
);

$awaitable->ignore();
$awaitable->await();
}
// there's a pending write operation, wait for it first.
$this->writeDeferred
?->getAwaitable()
->then(static fn() => null, static fn() => null)
->ignore()
->await();

if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
Expand Down Expand Up @@ -266,38 +252,33 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
return $chunk;
}

$this->readAwaitable = $awaitable = Async\run(function () use ($timeout): void {
$this->readSuspension = EventLoop::createSuspension();
$suspension = &$this->readSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not readable.')
);
}
);
}

try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
$this->readDeferred = new Async\Deferred();
$deferred = &$this->readDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->error(
new Exception\TimeoutException('reached timeout while the handle is still not readable.')
);
}
}
});
);
}

$awaitable->await();
$this->readAwaitable = null;
try {
/** @var Async\Deferred $deferred */
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $this->readImmediately($max_bytes);
}
Expand All @@ -309,16 +290,13 @@ static function () use (&$suspension) {
*/
public function readImmediately(?int $max_bytes = null): string
{
if (null !== $this->readAwaitable) {
// there's a pending read operation, wait for it.
$awaitable = $this->readAwaitable->then(
static fn() => null,
static fn() => null,
);

$awaitable->ignore();
$awaitable->await();
}
// there's a pending read operation, wait for it.
$this->readDeferred
?->getAwaitable()
->then(static fn() => null, static fn() => null)
->ignore()
->await()
;

if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
Expand Down Expand Up @@ -366,10 +344,8 @@ public function close(): void
$this->stream = null;
}

$this->readAwaitable = null;
$this->readSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->writeAwaitable = null;
$this->writeSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->readDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->writeDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
}

/**
Expand Down