From 4d1da9e7b53514fa2bb23d56cf3c15efc8482cda Mon Sep 17 00:00:00 2001 From: Saif Eddin Gmati <29315886+azjezz@users.noreply.github.com> Date: Tue, 16 Nov 2021 21:55:22 +0100 Subject: [PATCH] chore(io): simplify write/read queue implementation (#278) Signed-off-by: azjezz --- src/Psl/Async/Awaitable.php | 4 +- src/Psl/IO/Internal/ResourceHandle.php | 174 +++++++++++-------------- 2 files changed, 78 insertions(+), 100 deletions(-) diff --git a/src/Psl/Async/Awaitable.php b/src/Psl/Async/Awaitable.php index 5f61e5dd..765e5a4b 100644 --- a/src/Psl/Async/Awaitable.php +++ b/src/Psl/Async/Awaitable.php @@ -201,8 +201,10 @@ static function (?Throwable $error, mixed $value) use ($suspension): void { /** * Do not forward unhandled errors to the event loop handler. */ - public function ignore(): void + public function ignore(): self { $this->state->ignore(); + + return $this; } } diff --git a/src/Psl/IO/Internal/ResourceHandle.php b/src/Psl/IO/Internal/ResourceHandle.php index ec021b89..c1bff220 100644 --- a/src/Psl/IO/Internal/ResourceHandle.php +++ b/src/Psl/IO/Internal/ResourceHandle.php @@ -6,13 +6,11 @@ use Psl; use Psl\Async; -use Psl\Async\Awaitable; use Psl\Exception\InvariantViolationException; use Psl\IO; use Psl\IO\Exception; use Psl\Type; use Revolt\EventLoop; -use Revolt\EventLoop\Suspension; use function error_get_last; use function fclose; @@ -56,11 +54,8 @@ class ResourceHandle implements IO\Stream\CloseSeekReadWriteHandleInterface private string $readWatcher = ''; private string $writeWatcher = ''; - private ?Suspension $readSuspension = null; - private ?Suspension $writeSuspension = null; - - private ?Awaitable $readAwaitable = null; - private ?Awaitable $writeAwaitable = null; + private ?Async\Deferred $readDeferred = null; + private ?Async\Deferred $writeDeferred = null; /** * @param resource|object $stream @@ -89,10 +84,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek) Psl\invariant($readable, 'Handle is not readable.'); - $suspension = &$this->readSuspension; - $this->readWatcher = EventLoop::onReadable($stream, static function () use (&$suspension) { - /** @var Suspension|null $suspension */ - $suspension?->resume(null); + $deferred = &$this->readDeferred; + $this->readWatcher = EventLoop::onReadable($stream, static function () use (&$deferred) { + /** @var Async\Deferred|null $deferred */ + $deferred?->complete(null); }); EventLoop::disable($this->readWatcher); @@ -107,10 +102,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek) Psl\invariant($writable, 'Handle is not writeable.'); - $suspension = &$this->writeSuspension; - $this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$suspension) { - /** @var Suspension|null $suspension */ - $suspension?->resume(null); + $deferred = &$this->writeDeferred; + $this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$deferred) { + /** @var Async\Deferred|null $deferred */ + $deferred?->complete(null); }); EventLoop::disable($this->writeWatcher); @@ -138,38 +133,33 @@ public function write(string $bytes, ?float $timeout = null): int $bytes = substr($bytes, $written); - $this->writeAwaitable = $awaitable = Async\run(function () use ($timeout): void { - $this->writeSuspension = EventLoop::createSuspension(); - $suspension = &$this->writeSuspension; - /** @psalm-suppress MissingThrowsDocblock */ - EventLoop::enable($this->writeWatcher); - $delay_watcher = null; - if (null !== $timeout) { - $delay_watcher = EventLoop::delay( - $timeout, - static function () use (&$suspension) { - /** @var Suspension|null $suspension */ - $suspension?->throw( - new Exception\TimeoutException('reached timeout while the handle is still not writable.') - ); - } - ); - } - - try { - /** @var Suspension $suspension */ - $suspension->suspend(); - } finally { - $suspension = null; - EventLoop::disable($this->writeWatcher); - if (null !== $delay_watcher) { - EventLoop::cancel($delay_watcher); + $this->writeDeferred = new Async\Deferred(); + $deferred = &$this->writeDeferred; + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->writeWatcher); + $delay_watcher = null; + if (null !== $timeout) { + $delay_watcher = EventLoop::delay( + $timeout, + static function () use (&$deferred) { + /** @var Async\Deferred|null $deferred */ + $deferred?->error( + new Exception\TimeoutException('reached timeout while the handle is still not writable.') + ); } - } - }); + ); + } - $awaitable->await(); - $this->writeAwaitable = null; + try { + /** @var Async\Deferred $deferred */ + $deferred->getAwaitable()->await(); + } finally { + $deferred = null; + EventLoop::disable($this->writeWatcher); + if (null !== $delay_watcher) { + EventLoop::cancel($delay_watcher); + } + } return $written + $this->writeImmediately($bytes); } @@ -182,16 +172,12 @@ static function () use (&$suspension) { */ public function writeImmediately(string $bytes): int { - if (null !== $this->writeAwaitable) { - // there's a pending write operation, wait for it first. - $awaitable = $this->writeAwaitable->then( - static fn() => null, - static fn() => null, - ); - - $awaitable->ignore(); - $awaitable->await(); - } + // there's a pending write operation, wait for it first. + $this->writeDeferred + ?->getAwaitable() + ->then(static fn() => null, static fn() => null) + ->ignore() + ->await(); if (!is_resource($this->stream)) { throw new Exception\AlreadyClosedException('Handle has already been closed.'); @@ -266,38 +252,33 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string return $chunk; } - $this->readAwaitable = $awaitable = Async\run(function () use ($timeout): void { - $this->readSuspension = EventLoop::createSuspension(); - $suspension = &$this->readSuspension; - /** @psalm-suppress MissingThrowsDocblock */ - EventLoop::enable($this->readWatcher); - $delay_watcher = null; - if (null !== $timeout) { - $delay_watcher = EventLoop::delay( - $timeout, - static function () use (&$suspension) { - /** @var Suspension|null $suspension */ - $suspension?->throw( - new Exception\TimeoutException('reached timeout while the handle is still not readable.') - ); - } - ); - } - - try { - /** @var Suspension $suspension */ - $suspension->suspend(); - } finally { - $suspension = null; - EventLoop::disable($this->readWatcher); - if (null !== $delay_watcher) { - EventLoop::cancel($delay_watcher); + $this->readDeferred = new Async\Deferred(); + $deferred = &$this->readDeferred; + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->readWatcher); + $delay_watcher = null; + if (null !== $timeout) { + $delay_watcher = EventLoop::delay( + $timeout, + static function () use (&$deferred) { + /** @var Async\Deferred|null $deferred */ + $deferred?->error( + new Exception\TimeoutException('reached timeout while the handle is still not readable.') + ); } - } - }); + ); + } - $awaitable->await(); - $this->readAwaitable = null; + try { + /** @var Async\Deferred $deferred */ + $deferred->getAwaitable()->await(); + } finally { + $deferred = null; + EventLoop::disable($this->readWatcher); + if (null !== $delay_watcher) { + EventLoop::cancel($delay_watcher); + } + } return $this->readImmediately($max_bytes); } @@ -309,16 +290,13 @@ static function () use (&$suspension) { */ public function readImmediately(?int $max_bytes = null): string { - if (null !== $this->readAwaitable) { - // there's a pending read operation, wait for it. - $awaitable = $this->readAwaitable->then( - static fn() => null, - static fn() => null, - ); - - $awaitable->ignore(); - $awaitable->await(); - } + // there's a pending read operation, wait for it. + $this->readDeferred + ?->getAwaitable() + ->then(static fn() => null, static fn() => null) + ->ignore() + ->await() + ; if (!is_resource($this->stream)) { throw new Exception\AlreadyClosedException('Handle has already been closed.'); @@ -366,10 +344,8 @@ public function close(): void $this->stream = null; } - $this->readAwaitable = null; - $this->readSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.')); - $this->writeAwaitable = null; - $this->writeSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.')); + $this->readDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.')); + $this->writeDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.')); } /**