diff --git a/src/Psl/IO/Internal/ResourceHandle.php b/src/Psl/IO/Internal/ResourceHandle.php index 23a186fd..8c53b20d 100644 --- a/src/Psl/IO/Internal/ResourceHandle.php +++ b/src/Psl/IO/Internal/ResourceHandle.php @@ -59,7 +59,14 @@ class ResourceHandle implements IO\Stream\CloseSeekReadWriteHandleInterface */ private string $writeWatcher = 'invalid'; + /** + * @var null|Async\Deferred + */ private ?Async\Deferred $readDeferred = null; + + /** + * @var null|Async\Deferred + */ private ?Async\Deferred $writeDeferred = null; /** @@ -91,8 +98,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek) $deferred = &$this->readDeferred; $this->readWatcher = Async\Scheduler::onReadable($stream, static function () use (&$deferred) { - /** @var Async\Deferred|null $deferred */ - $deferred?->complete(null); + /** @var Async\Deferred|null $tmp */ + $tmp = $deferred; + $deferred = null; + $tmp?->complete(null); }); Async\Scheduler::disable($this->readWatcher); @@ -109,8 +118,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek) $deferred = &$this->writeDeferred; $this->writeWatcher = Async\Scheduler::onWritable($stream, static function () use (&$deferred) { - /** @var Async\Deferred|null $deferred */ - $deferred?->complete(null); + /** @var Async\Deferred|null $tmp */ + $tmp = $deferred; + $deferred = null; + $tmp?->complete(null); }); Async\Scheduler::disable($this->writeWatcher); @@ -131,6 +142,7 @@ public function write(string $bytes, ?float $timeout = null): int $bytes = substr($bytes, $written); + /** @var Async\Deferred */ $this->writeDeferred = new Async\Deferred(); $deferred = &$this->writeDeferred; /** @psalm-suppress MissingThrowsDocblock */ @@ -142,11 +154,15 @@ public function write(string $bytes, ?float $timeout = null): int $timeout, static function () use (&$deferred) { /** @var Async\Deferred|null $deferred */ - $deferred?->error( + $tmp = $deferred; + $deferred = null; + $tmp?->error( new Exception\TimeoutException('reached timeout while the handle is still not writable.') ); } ); + + Async\Scheduler::unreference($delay_watcher); } try { @@ -235,6 +251,7 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string return $chunk; } + /** @var Async\Deferred */ $this->readDeferred = new Async\Deferred(); $deferred = &$this->readDeferred; /** @psalm-suppress MissingThrowsDocblock */ @@ -242,22 +259,28 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string $delay_watcher = null; if (null !== $timeout) { $timeout = $timeout < 0.0 ? 0.0 : $timeout; + $read_watcher = $this->readWatcher; $delay_watcher = Async\Scheduler::delay( $timeout, - static function () use (&$deferred) { - /** @var Async\Deferred|null $deferred */ - $deferred?->error( + static function () use (&$deferred, $read_watcher) { + Async\Scheduler::disable($read_watcher); + + /** @var Async\Deferred|null $tmp */ + $tmp = $deferred; + $deferred = null; + $tmp?->error( new Exception\TimeoutException('reached timeout while the handle is still not readable.') ); } ); + + Async\Scheduler::unreference($delay_watcher); } try { /** @var Async\Deferred $deferred */ $deferred->getAwaitable()->await(); } finally { - $deferred = null; Async\Scheduler::disable($this->readWatcher); if (null !== $delay_watcher) { Async\Scheduler::cancel($delay_watcher); @@ -300,32 +323,6 @@ public function readImmediately(?int $max_bytes = null): string return $result; } - /** - * {@inheritDoc} - */ - public function close(): void - { - if (is_resource($this->stream)) { - /** @psalm-suppress PossiblyInvalidArgument */ - $stream = $this->stream; - $this->stream = null; - $result = @fclose($stream); - if ($result === false) { - /** @var array{message: string} $error */ - $error = error_get_last(); - - throw new Exception\RuntimeException($error['message'] ?? 'unknown error.'); - } - } else { - // Stream could be set to a non-null closed-resource, - // if manually closed using `fclose($handle->getStream)`. - $this->stream = null; - } - - $this->readDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.')); - $this->writeDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.')); - } - /** * @return object|resource|null */ @@ -338,4 +335,44 @@ public function __destruct() { $this->close(); } + + /** + * {@inheritDoc} + */ + public function close(): void + { + if (null !== $this->stream) { + Async\Scheduler::disable($this->readWatcher); + Async\Scheduler::disable($this->writeWatcher); + + if (is_resource($this->stream)) { + /** @psalm-suppress PossiblyInvalidArgument */ + $stream = $this->stream; + $this->stream = null; + $result = @fclose($stream); + if ($result === false) { + /** @var array{message: string} $error */ + $error = error_get_last(); + + throw new Exception\RuntimeException($error['message'] ?? 'unknown error.'); + } + } else { + // Stream could be set to a non-null closed-resource, + // if manually closed using `fclose($handle->getStream)`. + $this->stream = null; + } + + if ($this->readDeferred) { + $deferred = $this->readDeferred; + $this->readDeferred = null; + $deferred->error(new Exception\AlreadyClosedException('Handle has already been closed.')); + } + + if ($this->writeDeferred) { + $deferred = $this->writeDeferred; + $this->readDeferred = null; + $deferred->error(new Exception\AlreadyClosedException('Handle has already been closed.')); + } + } + } }