Skip to content

Commit

Permalink
Add support for amp v3
Browse files Browse the repository at this point in the history
  • Loading branch information
veewee committed Apr 28, 2023
1 parent bf8785b commit d64e8fa
Show file tree
Hide file tree
Showing 14 changed files with 136 additions and 292 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/grumphp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
strategy:
matrix:
operating-system: [ubuntu-latest, macos-latest] #windows-latest currently not working
php-versions: ['8.0', '8.1', '8.2']
php-versions: ['8.1', '8.2']
composer-options: ['', '--prefer-lowest']
composer-versions: ['composer:v2']
fail-fast: false
Expand Down
8 changes: 3 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@
}
],
"require": {
"php": "^8.0",
"php": "^8.1",
"ext-json": "*",
"composer-plugin-api": "~2.0",
"amphp/amp": "^2.6",
"amphp/parallel": "^1.4",
"amphp/parallel-functions": "^1.1",
"amphp/amp": "^3.0",
"amphp/parallel": "^2.1",
"doctrine/collections": "^1.6.8 || ^2.0",
"gitonomy/gitlib": "^1.3",
"laravel/serializable-closure": "^1.1",
Expand All @@ -39,7 +38,6 @@
"symfony/yaml": "~5.4 || ~6.0"
},
"require-dev": {
"amphp/sync": "^v1.4",
"brianium/paratest": "^6.4",
"composer/composer": "^2.2.6",
"nikic/php-parser": "~4.13",
Expand Down
63 changes: 11 additions & 52 deletions src/Runner/Middleware/HandleRunnerMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@

namespace GrumPHP\Runner\Middleware;

use Amp\Future;
use GrumPHP\Configuration\Model\RunnerConfig;
use function Amp\call;
use Amp\CancelledException;
use Amp\LazyPromise;
use Amp\MultiReasonException;
use function Amp\Promise\wait;
use function Amp\async;
use function Amp\Future\await;
use GrumPHP\Collection\TaskResultCollection;
use GrumPHP\Runner\Promise\MultiPromise;
use GrumPHP\Runner\TaskHandler\TaskHandler;
Expand Down Expand Up @@ -37,55 +35,16 @@ public function __construct(TaskHandler $taskHandler, RunnerConfig $config)

public function handle(TaskRunnerContext $context, callable $next): TaskResultCollection
{
return new TaskResultCollection(
wait(
/**
* @return \Generator<mixed, mixed, mixed, TaskResultInterface[]>
*/
call(function () use ($context): \Generator {
/**
* @var \Throwable[] $errors
* @var TaskResultInterface[] $results
* @psalm-suppress InvalidArrayAccess
* @psalm-suppress InvalidArrayOffset
*/
[$errors, $results] = yield MultiPromise::cancelable(
$this->handleTasks($context),
function (TaskResultInterface $result) {
return $this->config->stopOnFailure() && $result->isBlocking();
}
);

// Filter out canceled items:
$errors = array_filter($errors, function (\Throwable $error): bool {
return !$error instanceof CancelledException;
});

if ($errors) {
throw new MultiReasonException($errors);
}
// TODO : CANCELLATION based on return $this->config->stopOnFailure() && $result->isBlocking();

return $results;
})
return new TaskResultCollection(
await(
array_map(
/** @return Future<TaskResultInterface> */
fn (TaskInterface $task): Future => $this->taskHandler->handle($task, $context),
$context->getTasks()->toArray()
)
)
);
}

/**
* @return array<int, LazyPromise<TaskResultInterface>>
*/
private function handleTasks(TaskRunnerContext $context): array
{
return array_map(
/**
* @return LazyPromise<TaskResultInterface>
*/
function (TaskInterface $task) use ($context) : LazyPromise {
return new LazyPromise(function () use ($task, $context) {
return $this->taskHandler->handle($task, $context);
});
},
$context->getTasks()->toArray()
);
}
}
22 changes: 12 additions & 10 deletions src/Runner/Parallel/PoolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,28 @@

namespace GrumPHP\Runner\Parallel;

use Amp\Parallel\Worker\DefaultPool;
use Amp\Parallel\Worker\Pool;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\WorkerPool;
use GrumPHP\Configuration\Model\ParallelConfig;

class PoolFactory
{
/**
* @var ParallelConfig
*/
private $config;
private ParallelConfig $config;
private ?WorkerPool $pool = null;

public function __construct(ParallelConfig $config)
{
$this->config = $config;
}

public function create(): Pool
public function createShared(): WorkerPool
{
return new DefaultPool(
$this->config->getMaxWorkers()
);
if (!$this->pool) {
$this->pool = new ContextWorkerPool(
$this->config->getMaxWorkers()
);
}

return $this->pool;
}
}
51 changes: 51 additions & 0 deletions src/Runner/Parallel/SerializedClosureTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php
declare(strict_types=1);

namespace GrumPHP\Runner\Parallel;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use Laravel\SerializableClosure\SerializableClosure;

/**
* @template T
*/
class SerializedClosureTask implements Task
{
/**
* @param (\Closure(): T) $closure
*/
public function __construct(
private string $serializedClosure
) {
}

/**
* @template O
* @param \Closure(): O $closure
* @return self<O>
*/
public static function fromClosure(\Closure $closure): self
{
return new self(serialize(SerializableClosure::unsigned($closure)));
}

/**
* @return T
*/
public function run(Channel $channel, Cancellation $cancellation): mixed
{
$callable = \unserialize($this->serializedClosure, ['allowed_classes' => true]);

if ($callable instanceof \__PHP_Incomplete_Class) {
throw new \Error('When using a class instance as a callable, the class must be autoloadable');
}

if (!$callable instanceof \Closure) {
throw new \Error('This task can only deal with serialized Closures. You passed '.get_debug_type($callable));
}

return $callable();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@

namespace GrumPHP\Runner\TaskHandler\Middleware;

use Amp\Parallel\Worker\TaskFailureException;
use function Amp\call;
use Amp\Promise;
use function Amp\async;
use Amp\Future;
use GrumPHP\Exception\PlatformException;
use GrumPHP\Runner\TaskResult;
use GrumPHP\Runner\TaskResultInterface;
Expand All @@ -19,8 +18,8 @@ public function handle(
TaskInterface $task,
TaskRunnerContext $runnerContext,
callable $next
): Promise {
return call(
): Future {
return async(
static function () use ($task, $runnerContext): TaskResultInterface {
$taskContext = $runnerContext->getTaskContext();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace GrumPHP\Runner\TaskHandler\Middleware;

use function Amp\call;
use Amp\Promise;
use function Amp\async;
use Amp\Future;
use GrumPHP\Event\Dispatcher\EventDispatcherInterface;
use GrumPHP\Event\TaskEvent;
use GrumPHP\Event\TaskEvents;
Expand All @@ -27,18 +27,14 @@ public function __construct(EventDispatcherInterface $eventDispatcher)
$this->eventDispatcher = $eventDispatcher;
}

public function handle(TaskInterface $task, TaskRunnerContext $runnerContext, callable $next): Promise
public function handle(TaskInterface $task, TaskRunnerContext $runnerContext, callable $next): Future
{
return call(
/**
* @return \Generator<mixed, Promise<TaskResultInterface>, mixed, TaskResultInterface>
*/
function () use ($task, $runnerContext, $next): \Generator {
return async(
function () use ($task, $runnerContext, $next): TaskResultInterface {
$taskContext = $runnerContext->getTaskContext();
$this->eventDispatcher->dispatch(new TaskEvent($task, $taskContext), TaskEvents::TASK_RUN);

/** @var TaskResultInterface $result */
$result = yield $next($task, $runnerContext);
$result = $next($task, $runnerContext)->await();

if ($result->isSkipped()) {
$this->eventDispatcher->dispatch(new TaskEvent($task, $taskContext), TaskEvents::TASK_SKIPPED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace GrumPHP\Runner\TaskHandler\Middleware;

use function Amp\call;
use Amp\Promise;
use function Amp\async;
use Amp\Future;
use GrumPHP\Runner\MemoizedTaskResultMap;
use GrumPHP\Runner\TaskResult;
use GrumPHP\Runner\TaskResultInterface;
Expand All @@ -24,16 +24,12 @@ public function __construct(MemoizedTaskResultMap $resultMap)
$this->resultMap = $resultMap;
}

public function handle(TaskInterface $task, TaskRunnerContext $runnerContext, callable $next): Promise
public function handle(TaskInterface $task, TaskRunnerContext $runnerContext, callable $next): Future
{
return call(
/**
* @return \Generator<mixed, Promise<TaskResultInterface>, mixed, TaskResultInterface>
*/
function () use ($task, $runnerContext, $next) : \Generator {
return async(
function () use ($task, $runnerContext, $next) : TaskResultInterface {
try {
/** @var TaskResultInterface $result */
$result = yield $next($task, $runnerContext);
$result = $next($task, $runnerContext)->await();
} catch (\Throwable $error) {
$result = TaskResult::createFailed($task, $runnerContext->getTaskContext(), $error->getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

namespace GrumPHP\Runner\TaskHandler\Middleware;

use function Amp\call;
use Amp\Promise;
use function Amp\async;
use Amp\Future;
use GrumPHP\Runner\TaskResult;
use GrumPHP\Runner\TaskResultInterface;
use GrumPHP\Runner\TaskRunnerContext;
Expand All @@ -17,14 +17,10 @@ public function handle(
TaskInterface $task,
TaskRunnerContext $runnerContext,
callable $next
): Promise {
return call(
/**
* @return \Generator<mixed, Promise<TaskResultInterface>, mixed, TaskResultInterface>
*/
static function () use ($task, $runnerContext, $next): \Generator {
/** @var TaskResultInterface $result */
$result = yield $next($task, $runnerContext);
): Future {
return async(
static function () use ($task, $runnerContext, $next): TaskResultInterface {
$result = $next($task, $runnerContext)->await();

if ($result->isPassed() || $result->isSkipped() || $task->getConfig()->getMetadata()->isBlocking()) {
return $result;
Expand Down
Loading

0 comments on commit d64e8fa

Please sign in to comment.