Skip to content

Commit

Permalink
feat(io): refactor IO component to use Async component
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 2, 2021
1 parent 5bafc4b commit 513a6ae
Show file tree
Hide file tree
Showing 42 changed files with 1,945 additions and 570 deletions.
12 changes: 8 additions & 4 deletions docs/component/io.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
- [CloseSeekWriteHandleInterface](./../../src/Psl/IO/CloseSeekWriteHandleInterface.php#L7)
- [CloseWriteHandleInterface](./../../src/Psl/IO/CloseWriteHandleInterface.php#L7)
- [HandleInterface](./../../src/Psl/IO/HandleInterface.php#L21)
- [ReadHandleInterface](./../../src/Psl/IO/ReadHandleInterface.php#L12)
- [ReadHandleInterface](./../../src/Psl/IO/ReadHandleInterface.php#L10)
- [ReadWriteHandleInterface](./../../src/Psl/IO/ReadWriteHandleInterface.php#L7)
- [SeekHandleInterface](./../../src/Psl/IO/SeekHandleInterface.php#L12)
- [SeekReadHandleInterface](./../../src/Psl/IO/SeekReadHandleInterface.php#L7)
Expand All @@ -37,8 +37,12 @@

#### `Classes`

- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L13)
- [Reader](./../../src/Psl/IO/Reader.php#L11)
- [Writer](./../../src/Psl/IO/Writer.php#L9)
- [MemoryHandle](./../../src/Psl/IO/MemoryHandle.php#L15)
- [Reader](./../../src/Psl/IO/Reader.php#L15)

#### `Traits`

- [ReadHandleConvenienceMethodsTrait](./../../src/Psl/IO/ReadHandleConvenienceMethodsTrait.php#L15)
- [WriteHandleConvenienceMethodsTrait](./../../src/Psl/IO/WriteHandleConvenienceMethodsTrait.php#L16)


23 changes: 13 additions & 10 deletions src/Psl/Async/await_readable.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,30 @@ function await_readable(mixed $resource, bool $reference = true, ?int $timeout_m
{
$suspension = Scheduler::createSuspension();

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
Scheduler::unreference($timeout_watcher);
}

$watcher = EventLoop::onReadable(
$resource,
static fn(string $_watcher, mixed $resource) => $suspension->resume($resource)
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
);

if (!$reference) {
Scheduler::unreference($watcher);
}

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
}

try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
23 changes: 13 additions & 10 deletions src/Psl/Async/await_signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,30 @@ function await_signal(int $signal, bool $reference = true, ?int $timeout_ms = nu
{
$suspension = Scheduler::createSuspension();

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
Scheduler::unreference($timeout_watcher);
}

$watcher = EventLoop::onSignal(
$signal,
static fn(string $_watcher, int $signal_number) => $suspension->resume($signal_number)
static function (string $_watcher, int $signal) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($signal);
},
);

if (!$reference) {
Scheduler::unreference($watcher);
}

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
}

try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
22 changes: 13 additions & 9 deletions src/Psl/Async/await_writable.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,31 @@ function await_writable(mixed $resource, bool $reference = true, ?int $timeout_m
{
$suspension = Scheduler::createSuspension();

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
Scheduler::unreference($timeout_watcher);
}

$watcher = EventLoop::onWritable(
$resource,
static fn(string $_watcher, mixed $resource) => $suspension->resume($resource)
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
);

if (!$reference) {
Scheduler::unreference($watcher);
}

$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException()));
}

try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
22 changes: 10 additions & 12 deletions src/Psl/Async/concurrently.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@
*/
function concurrently(iterable $callables): Awaitable
{
return run(static function () use ($callables): array {
$awaitables = Dict\map(
$callables,
/**
* @param callable(): Tv $callable
*
* @return Awaitable<Tv>
*/
static fn(callable $callable): Awaitable => run($callable),
);
$awaitables = Dict\map(
$callables,
/**
* @param callable(): Tv $callable
*
* @return Awaitable<Tv>
*/
static fn(callable $callable): Awaitable => run($callable),
);

return namespace\all($awaitables);
});
return run(static fn(): array => namespace\all($awaitables));
}
32 changes: 21 additions & 11 deletions src/Psl/Async/run.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,33 @@ function run(callable $callable, ?int $timeout_ms = null): Awaitable
{
$state = new Internal\State();

Scheduler::defer(static function () use ($callable, $state): void {
$timeout_watcher = null;
if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static function () use ($state): void {
$state->error(new TimeoutException());
});

Scheduler::unreference($timeout_watcher);
}

Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
try {
$state->complete($callable());
$result = $callable();
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

if ($state->isComplete()) {
// timed-out.
return;
}

$state->complete($result);
} catch (Throwable $throwable) {
$state->error($throwable);
}
});

if (null !== $timeout_ms) {
$id = Scheduler::delay($timeout_ms, static function () use ($state): void {
$state->error(new TimeoutException());
});

$state->subscribe(static function () use ($id): void {
Scheduler::cancel($id);
});
}

return new Awaitable($state);
}
9 changes: 9 additions & 0 deletions src/Psl/IO/Exception/TimeoutException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

declare(strict_types=1);

namespace Psl\IO\Exception;

final class TimeoutException extends RuntimeException
{
}
50 changes: 50 additions & 0 deletions src/Psl/IO/Internal/OptionalIncrementalTimeout.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

declare(strict_types=1);

namespace Psl\IO\Internal;

use function microtime;

/**
* @internal
*
* @codeCoverageIgnore
*/
final class OptionalIncrementalTimeout
{
private ?float $end;
/**
* @var (callable(): ?int)
*/
private $handler;

/**
* @param (callable(): ?int) $handler
*/
public function __construct(?int $timeout_ms, callable $handler)
{
$this->handler = $handler;
if ($timeout_ms === null) {
$this->end = null;
return;
}

$this->end = microtime(true) + (float) $timeout_ms;
}

public function getRemaining(): ?int
{
if ($this->end === null) {
return null;
}

$remaining = $this->end - microtime(true);
if ($remaining <= 0) {
$th = $this->handler;
return $th();
}

return $remaining < 1.0 ? 1 : (int) $remaining;
}
}
32 changes: 0 additions & 32 deletions src/Psl/IO/Internal/ReadOnlyHandleDecorator.php

This file was deleted.

Loading

0 comments on commit 513a6ae

Please sign in to comment.