From b32b7fca67efff30fac2daefd3ea3b56d073214b Mon Sep 17 00:00:00 2001 From: azjezz Date: Wed, 1 Dec 2021 21:36:23 +0100 Subject: [PATCH] feat(async): introduce more async helper functions Signed-off-by: azjezz --- docs/component/async.md | 9 ++-- examples/async/usleep.php | 2 +- examples/io/pipe.php | 2 +- examples/shell/concurrent.php | 2 +- examples/tcp/concurrent.php | 2 +- examples/unix/concurrent.php | 2 +- src/Psl/Async/all.php | 35 ++++++++++---- src/Psl/Async/any.php | 9 +++- src/Psl/Async/concurrent.php | 34 -------------- src/Psl/Async/first.php | 8 +++- src/Psl/Async/parallel.php | 77 +++++++++++++++++++++++++++++++ src/Psl/Async/reflect.php | 28 +++++++++++ src/Psl/Async/series.php | 32 +++++++++++++ src/Psl/Async/wrap.php | 24 ++++++++++ src/Psl/Internal/Loader.php | 5 +- src/Psl/Shell/execute.php | 2 +- tests/unit/Async/AllTest.php | 50 ++++++++++---------- tests/unit/Async/ParallelTest.php | 72 +++++++++++++++++++++++++++++ tests/unit/Async/ReflectTest.php | 33 +++++++++++++ tests/unit/Async/RunTest.php | 6 +-- tests/unit/Async/SeriesTest.php | 67 +++++++++++++++++++++++++++ tests/unit/Async/WrapTest.php | 32 +++++++++++++ tests/unit/IO/PipeTest.php | 8 ++-- tests/unit/TCP/ConnectTest.php | 2 +- tests/unit/TCP/ServerTest.php | 2 +- tests/unit/Unix/ConnectTest.php | 2 +- tests/unit/Unix/ServerTest.php | 2 +- 27 files changed, 459 insertions(+), 90 deletions(-) delete mode 100644 src/Psl/Async/concurrent.php create mode 100644 src/Psl/Async/parallel.php create mode 100644 src/Psl/Async/reflect.php create mode 100644 src/Psl/Async/series.php create mode 100644 src/Psl/Async/wrap.php create mode 100644 tests/unit/Async/ParallelTest.php create mode 100644 tests/unit/Async/ReflectTest.php create mode 100644 tests/unit/Async/SeriesTest.php create mode 100644 tests/unit/Async/WrapTest.php diff --git a/docs/component/async.md b/docs/component/async.md index bfb8adc2..9f625147 100644 --- a/docs/component/async.md +++ b/docs/component/async.md @@ -12,18 +12,21 @@ #### `Functions` -- [all](./../../src/Psl/Async/all.php#L22) +- [all](./../../src/Psl/Async/all.php#L26) - [any](./../../src/Psl/Async/any.php#L25) - [await](./../../src/Psl/Async/await.php#L18) - [await_readable](./../../src/Psl/Async/await_readable.php#L23) - [await_signal](./../../src/Psl/Async/await_signal.php#L18) - [await_writable](./../../src/Psl/Async/await_writable.php#L21) -- [concurrent](./../../src/Psl/Async/concurrent.php#L21) -- [first](./../../src/Psl/Async/first.php#L24) +- [first](./../../src/Psl/Async/first.php#L22) - [later](./../../src/Psl/Async/later.php#L14) - [main](./../../src/Psl/Async/main.php#L16) +- [parallel](./../../src/Psl/Async/parallel.php#L64) +- [reflect](./../../src/Psl/Async/reflect.php#L25) - [run](./../../src/Psl/Async/run.php#L20) +- [series](./../../src/Psl/Async/series.php#L21) - [sleep](./../../src/Psl/Async/sleep.php#L10) +- [wrap](./../../src/Psl/Async/wrap.php#L21) #### `Classes` diff --git a/examples/async/usleep.php b/examples/async/usleep.php index 76ca83d3..23c6eaa6 100644 --- a/examples/async/usleep.php +++ b/examples/async/usleep.php @@ -12,7 +12,7 @@ Async\main(static function (): int { $start = time(); - Async\concurrent([ + Async\parallel([ static fn() => Async\sleep(2.0), static fn() => Async\sleep(2.0), static fn() => Async\sleep(2.0), diff --git a/examples/io/pipe.php b/examples/io/pipe.php index be4fcbfc..55cf365b 100644 --- a/examples/io/pipe.php +++ b/examples/io/pipe.php @@ -14,7 +14,7 @@ $output = IO\output_handle(); - Async\concurrent([ + Async\parallel([ static function() use($read, $output): void { $output->writeAll("< sleeping.\n"); diff --git a/examples/shell/concurrent.php b/examples/shell/concurrent.php index 2eb487c9..175f9dcb 100644 --- a/examples/shell/concurrent.php +++ b/examples/shell/concurrent.php @@ -13,7 +13,7 @@ Async\main(static function (): int { $start = time(); - Async\concurrent([ + Async\parallel([ static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']), static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']), static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']), diff --git a/examples/tcp/concurrent.php b/examples/tcp/concurrent.php index e32a0572..a78ef989 100644 --- a/examples/tcp/concurrent.php +++ b/examples/tcp/concurrent.php @@ -14,7 +14,7 @@ Async\main(static function (): int { $output = IO\output_handle(); - Async\concurrent([ + Async\parallel([ 'server' => static function () use ($output): void { $server = TCP\Server::create('localhost', 91337); $output->writeAll("< server is listening\n"); diff --git a/examples/unix/concurrent.php b/examples/unix/concurrent.php index 64dc96e8..1b869983 100644 --- a/examples/unix/concurrent.php +++ b/examples/unix/concurrent.php @@ -24,7 +24,7 @@ $output = IO\output_handle(); - Async\concurrent([ + Async\parallel([ 'server' => static function () use ($file, $output): void { $server = Unix\Server::create($file); $output->writeAll("< server is listening\n"); diff --git a/src/Psl/Async/all.php b/src/Psl/Async/all.php index b11ce64a..62b53a6f 100644 --- a/src/Psl/Async/all.php +++ b/src/Psl/Async/all.php @@ -10,7 +10,11 @@ /** * Awaits all awaitables to complete concurrently. * - * If one or more awaitables fail, all awaitables will be completed before throwing. + * If one awaitable fails, the exception will be thrown immediately, and the result of the callables will be ignored. + * + * If multiple awaitables failed at once, a {@see Exception\CompositeException} will be thrown. + * + * Once the awaitables have completed, an array containing the results will be returned preserving the original awaitables order. * * @template Tk of array-key * @template Tv @@ -22,20 +26,35 @@ function all(iterable $awaitables): array { $values = []; - $errors = []; // Awaitable::iterate() to throw the first error based on completion order instead of argument order foreach (Awaitable::iterate($awaitables) as $index => $awaitable) { try { $values[$index] = $awaitable->await(); } catch (Throwable $throwable) { - $errors[] = $throwable; - } - } + $errors = []; + foreach ($awaitables as $original) { + if ($original === $awaitable) { + continue; + } - if ($errors !== []) { - /** @psalm-suppress MissingThrowsDocblock */ - throw $errors[0]; + if (!$original->isComplete()) { + $original->ignore(); + } else { + try { + $original->await(); + } catch (Throwable $error) { + $errors[] = $error; + } + } + } + + if ($errors === []) { + throw $throwable; + } + + throw new Exception\CompositeException([$throwable, ...$errors], 'Multiple exceptions thrown while waiting.'); + } } return Dict\map_with_key( diff --git a/src/Psl/Async/any.php b/src/Psl/Async/any.php index a8ab7c10..df6bb57a 100644 --- a/src/Psl/Async/any.php +++ b/src/Psl/Async/any.php @@ -27,7 +27,14 @@ function any(iterable $awaitables): mixed $errors = []; foreach (Awaitable::iterate($awaitables) as $first) { try { - return $first->await(); + $result = $first->await(); + foreach ($awaitables as $awaitable) { + if ($awaitable !== $first) { + $awaitable->ignore(); + } + } + + return $result; } catch (Throwable $throwable) { $errors[] = $throwable; } diff --git a/src/Psl/Async/concurrent.php b/src/Psl/Async/concurrent.php deleted file mode 100644 index 1968e935..00000000 --- a/src/Psl/Async/concurrent.php +++ /dev/null @@ -1,34 +0,0 @@ - $callables - * - * @return array unwrapped values with the order preserved. - */ -function concurrent(iterable $callables): array -{ - $awaitables = Dict\map( - $callables, - /** - * @param callable(): Tv $callable - * - * @return Awaitable - */ - static fn(callable $callable): Awaitable => run($callable), - ); - - return namespace\all($awaitables); -} diff --git a/src/Psl/Async/first.php b/src/Psl/Async/first.php index 878dfd9b..569303e1 100644 --- a/src/Psl/Async/first.php +++ b/src/Psl/Async/first.php @@ -18,12 +18,16 @@ * @throws Psl\Exception\InvariantViolationException If $awaitables is empty. * * @return T - * - * @codeCoverageIgnore */ function first(iterable $awaitables): mixed { foreach (Awaitable::iterate($awaitables) as $first) { + foreach ($awaitables as $awaitable) { + if ($awaitable !== $first) { + $awaitable->ignore(); + } + } + return $first->await(); } diff --git a/src/Psl/Async/parallel.php b/src/Psl/Async/parallel.php new file mode 100644 index 00000000..abeb8a02 --- /dev/null +++ b/src/Psl/Async/parallel.php @@ -0,0 +1,77 @@ + + * use Psl\Async; + * + * // execute `getOne()` and `getTwo()` functions in parallel. + * + * [$one, $two] = Async\parallel([ + * getOne(...), + * getTwo(...), + * ]); + * + * + * @note {@see parallel()} is about kicking-off I/O tasks in parallel, not about parallel execution of code. + * If your tasks do not use any timers or perform any I/O, they will actually be executed in series. + * + * + * use Psl\Async; + * + * // the following runs in series. + * + * [$one, $two] = Async\parallel([ + * fn() => file_get_contents('path/to/file1.txt'), + * fn() => file_get_contents('path/to/file2.txt'), + * ]); + * + * + * @note Use {@see reflect()} to continue the execution of other tasks when a task fails. + * + * + * use Psl\Async; + * + * // execute `getOne()` and `getTwo()` functions in parallel. + * // if either one of the given tasks fails, the other will continue execution. + * + * [$one, $two] = Async\parallel([ + * Async\reflect(getOne(...)), + * Async\reflect(getTwo(...)), + * ]); + * + * + * @template Tk of array-key + * @template Tv + * + * @param iterable $tasks + * + * @return array + */ +function parallel(iterable $tasks): array +{ + $awaitables = Dict\map( + $tasks, + /** + * @param callable(): Tv $callable + * + * @return Awaitable + */ + static fn(callable $callable): Awaitable => run($callable), + ); + + return namespace\all($awaitables); +} diff --git a/src/Psl/Async/reflect.php b/src/Psl/Async/reflect.php new file mode 100644 index 00000000..d2b2f487 --- /dev/null +++ b/src/Psl/Async/reflect.php @@ -0,0 +1,28 @@ +) + * + * @see Result\wrap() + * + * @pure + */ +function reflect(callable $task): Closure +{ + return static fn() => Result\wrap($task); +} diff --git a/src/Psl/Async/series.php b/src/Psl/Async/series.php new file mode 100644 index 00000000..f6e88a8b --- /dev/null +++ b/src/Psl/Async/series.php @@ -0,0 +1,32 @@ + $tasks + * + * @return array + */ +function series(iterable $tasks): array +{ + return Dict\map( + $tasks, + /** + * @param callable(): Tv $callable + * + * @return Tv + */ + static fn(callable $callable): mixed => run($callable)->await(), + ); +} diff --git a/src/Psl/Async/wrap.php b/src/Psl/Async/wrap.php new file mode 100644 index 00000000..412909d0 --- /dev/null +++ b/src/Psl/Async/wrap.php @@ -0,0 +1,24 @@ + + * + * @see reflect() + */ +function wrap(callable $task): Result\ResultInterface +{ + return run(reflect($task))->await(); +} diff --git a/src/Psl/Internal/Loader.php b/src/Psl/Internal/Loader.php index 480520ae..3883bb70 100644 --- a/src/Psl/Internal/Loader.php +++ b/src/Psl/Internal/Loader.php @@ -445,7 +445,10 @@ final class Loader 'Psl\Trait\defined', 'Psl\Async\main', 'Psl\Async\run', - 'Psl\Async\concurrent', + 'Psl\Async\parallel', + 'Psl\Async\reflect', + 'Psl\Async\wrap', + 'Psl\Async\series', 'Psl\Async\await', 'Psl\Async\any', 'Psl\Async\all', diff --git a/src/Psl/Shell/execute.php b/src/Psl/Shell/execute.php index f646c27d..605e61b2 100644 --- a/src/Psl/Shell/execute.php +++ b/src/Psl/Shell/execute.php @@ -164,7 +164,7 @@ static function (array $m) use ( $stderr = new Stream\CloseReadHandle($pipes[2]); try { - [$stdout_content, $stderr_content] = Async\concurrent([ + [$stdout_content, $stderr_content] = Async\parallel([ static fn(): string => $stdout->readAll(timeout: $timeout), static fn(): string => $stderr->readAll(timeout: $timeout), ]); diff --git a/tests/unit/Async/AllTest.php b/tests/unit/Async/AllTest.php index 51d9ee38..e06a486c 100644 --- a/tests/unit/Async/AllTest.php +++ b/tests/unit/Async/AllTest.php @@ -42,34 +42,34 @@ public function testAllForwardsException(): void $this->expectExceptionMessage('a'); Async\all([ - Async\run(static function (): string { - Async\sleep(0.01); - - throw new InvariantViolationException('a'); - }), - Async\run(static function (): string { - Async\sleep(0.02); - - throw new InvariantViolationException('b'); - }), - Async\run(static function (): string { - Async\sleep(0.03); + Async\Awaitable::error(new InvariantViolationException('a')), + Async\Awaitable::complete('b'), + Async\Awaitable::complete('c'), + ]); - return 'c'; - }), - Async\run(static function (): string { - Async\sleep(0.005); + Async\Scheduler::run(); + } - Async\later(); + public function testAllCompositeException(): void + { + try { + Async\all([ + Async\Awaitable::error(new InvariantViolationException('a')), + Async\Awaitable::error(new InvariantViolationException('b')), + Async\Awaitable::complete('c'), + ]); + } catch (Async\Exception\CompositeException $exception) { + $reasons = $exception->getReasons(); - Async\sleep(0.005); + static::assertCount(2, $reasons); + static::assertSame('a', $reasons[0]->getMessage()); + static::assertSame('b', $reasons[1]->getMessage()); + } - return 'c'; - }), - ]); + Async\Scheduler::run(); } - public function testAllAwaitablesAreCompleted(): void + public function testAllAwaitablesAreCompletedAtALaterTime(): void { $ref = new Psl\Ref(''); @@ -102,10 +102,12 @@ public function testAllAwaitablesAreCompleted(): void $ref->value .= 'd'; }), ]); - } catch (InvariantViolationException) { - $this->addToAssertionCount(1); + } catch (InvariantViolationException $exception) { + static::assertSame('a', $exception->getMessage()); } + Async\Scheduler::run(); + static::assertSame('adbc', $ref->value); } } diff --git a/tests/unit/Async/ParallelTest.php b/tests/unit/Async/ParallelTest.php new file mode 100644 index 00000000..e8f5a442 --- /dev/null +++ b/tests/unit/Async/ParallelTest.php @@ -0,0 +1,72 @@ +value .= '1'; + }, + static function () use ($spy): void { + Async\sleep(0.001); + + $spy->value .= '2'; + }, + static function () use ($spy): void { + Async\sleep(0.001); + + $spy->value .= '3'; + }, + ]); + + static::assertSame('231', $spy->value); + } + + public function testParallelThrowsForTheFirstAndDoesNotCallTheRest(): void + { + Async\Scheduler::run(); + + $spy = new Psl\Ref(''); + + try { + Async\parallel([ + static function (): void { + Async\sleep(0.003); + + throw new Exception('foo'); + }, + static function () use ($spy): void { + Async\sleep(0.004); + + $spy->value = 'thrown'; + + throw new Exception('bar'); + }, + ]); + } catch (Exception $exception) { + static::assertSame('foo', $exception->getMessage()); + } + + // wait for all callbacks in the event loop to finish. + // including the second callback given to parallel. + // this ensures that even after it finishes, the exception will be ignored. + Async\Scheduler::run(); + + // the second callback was finished, but it didn't throw the exception ( which is exactly what we want ) + static::assertSame('thrown', $spy->value); + } +} diff --git a/tests/unit/Async/ReflectTest.php b/tests/unit/Async/ReflectTest.php new file mode 100644 index 00000000..c01a14e8 --- /dev/null +++ b/tests/unit/Async/ReflectTest.php @@ -0,0 +1,33 @@ +getException()->getMessage()); + static::assertSame('success', $two->getResult()); + } +} diff --git a/tests/unit/Async/RunTest.php b/tests/unit/Async/RunTest.php index 21c2e0d0..b7795548 100644 --- a/tests/unit/Async/RunTest.php +++ b/tests/unit/Async/RunTest.php @@ -12,7 +12,7 @@ final class RunTest extends TestCase public function testRun(): void { $awaitable = Async\run(static function (): string { - Async\concurrent([ + Async\parallel([ static fn() => Async\sleep(0.001), static fn() => Async\sleep(0.001), static fn() => Async\sleep(0.001), @@ -27,7 +27,7 @@ public function testRun(): void public function testRunWithTimeout(): void { $awaitable = Async\run(static function (): string { - Async\concurrent([ + Async\parallel([ static fn() => Async\sleep(0.01), static fn() => Async\sleep(0.01), static fn() => Async\sleep(0.01), @@ -42,7 +42,7 @@ public function testRunWithTimeout(): void public function testRunTimedOut(): void { $awaitable = Async\run(static function (): string { - Async\concurrent([ + Async\parallel([ static fn() => Async\sleep(1), static fn() => Async\sleep(1), static fn() => Async\sleep(1), diff --git a/tests/unit/Async/SeriesTest.php b/tests/unit/Async/SeriesTest.php new file mode 100644 index 00000000..27049e27 --- /dev/null +++ b/tests/unit/Async/SeriesTest.php @@ -0,0 +1,67 @@ +value .= '1'; + }, + static function () use ($spy): void { + Async\sleep(0.001); + + $spy->value .= '2'; + }, + static function () use ($spy): void { + Async\sleep(0.001); + + $spy->value .= '3'; + }, + ]); + + static::assertSame('123', $spy->value); + } + + public function testSeriesThrowsForTheFirstAndDoesNotCallTheRest(): void + { + Async\Scheduler::run(); + + $spy = new Psl\Ref(''); + + try { + Async\series([ + static function (): void { + Async\sleep(0.003); + + throw new Exception('foo'); + }, + static function () use ($spy): void { + $spy->value = 'thrown'; + + throw new Exception('bar'); + }, + ]); + } catch (Exception $exception) { + static::assertSame('foo', $exception->getMessage()); + } + + // wait for all callbacks in the event loop to finish. + Async\Scheduler::run(); + + static::assertSame('', $spy->value); + } +} diff --git a/tests/unit/Async/WrapTest.php b/tests/unit/Async/WrapTest.php new file mode 100644 index 00000000..59a5e4b8 --- /dev/null +++ b/tests/unit/Async/WrapTest.php @@ -0,0 +1,32 @@ +getException()->getMessage()); + static::assertSame('success', $two->getResult()); + } +} diff --git a/tests/unit/IO/PipeTest.php b/tests/unit/IO/PipeTest.php index 9774e454..54a336ee 100644 --- a/tests/unit/IO/PipeTest.php +++ b/tests/unit/IO/PipeTest.php @@ -31,13 +31,13 @@ public function testReadWrite(): void $read->close(); } - public function testReadWriteConcurrently(): void + public function testReadWriteInParallel(): void { [$read, $write] = IO\pipe(); $spy = new Psl\Ref(''); - $read_result = Async\run(static function () use ($read, $spy): string { + $read_awaitable = Async\run(static function () use ($read, $spy): string { $spy->value .= '[read:sleep]'; Async\sleep(0.003); $spy->value .= '[read:start]'; @@ -58,9 +58,9 @@ public function testReadWriteConcurrently(): void $spy->value .= '[write:close]'; })->await(); - $result = $read_result->await(); + $read_result = $read_awaitable->await(); - static::assertSame('hello', $result); + static::assertSame('hello', $read_result); static::assertSame('[read:sleep][write:sleep][read:start][write:start][write:complete][write:close][read:complete][read:close]', $spy->value); } diff --git a/tests/unit/TCP/ConnectTest.php b/tests/unit/TCP/ConnectTest.php index e81cd2fa..08a9bd9c 100644 --- a/tests/unit/TCP/ConnectTest.php +++ b/tests/unit/TCP/ConnectTest.php @@ -13,7 +13,7 @@ final class ConnectTest extends TestCase { public function testConnect(): void { - Async\concurrent([ + Async\parallel([ 'server' => static function (): void { $server = TCP\Server::create('127.0.0.1', 8081); self::assertSame('tcp://127.0.0.1:8081', $server->getLocalAddress()->toString()); diff --git a/tests/unit/TCP/ServerTest.php b/tests/unit/TCP/ServerTest.php index 9d4b8922..7bb16a20 100644 --- a/tests/unit/TCP/ServerTest.php +++ b/tests/unit/TCP/ServerTest.php @@ -52,7 +52,7 @@ public function testWaitsForPendingOperation(): void $first = Async\run(static fn() => $server->nextConnection()); - [$second_connection, $client_one, $client_two] = Async\concurrent([ + [$second_connection, $client_one, $client_two] = Async\parallel([ static fn() => $server->nextConnection(), static fn() => TCP\connect('127.0.0.1', $server->getLocalAddress()->port), static fn() => TCP\connect('127.0.0.1', $server->getLocalAddress()->port), diff --git a/tests/unit/Unix/ConnectTest.php b/tests/unit/Unix/ConnectTest.php index 149114cb..82e3c759 100644 --- a/tests/unit/Unix/ConnectTest.php +++ b/tests/unit/Unix/ConnectTest.php @@ -22,7 +22,7 @@ public function testConnect(): void $sock = Filesystem\create_temporary_file(prefix: 'psl-examples') . ".sock"; - Async\concurrent([ + Async\parallel([ 'server' => static function () use ($sock): void { $server = Unix\Server::create($sock); self::assertSame("unix://{$sock}", $server->getLocalAddress()->toString()); diff --git a/tests/unit/Unix/ServerTest.php b/tests/unit/Unix/ServerTest.php index 69226f56..aada8149 100644 --- a/tests/unit/Unix/ServerTest.php +++ b/tests/unit/Unix/ServerTest.php @@ -57,7 +57,7 @@ public function testWaitsForPendingOperation(): void $first = Async\run(static fn() => $server->nextConnection()); - [$second_connection, $client_one, $client_two] = Async\concurrent([ + [$second_connection, $client_one, $client_two] = Async\parallel([ static fn() => $server->nextConnection(), static fn() => Unix\connect($sock), static fn() => Unix\connect($sock),