Skip to content

Commit

Permalink
Add DelegatingWorkerPool
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Sep 1, 2024
1 parent b4f3978 commit 7f8ca54
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 67 deletions.
128 changes: 128 additions & 0 deletions src/Worker/DelegatingWorkerPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php declare(strict_types=1);

namespace Amp\Parallel\Worker;

use Amp\Cancellation;
use Amp\DeferredFuture;
use Amp\Parallel\Worker\Internal\PooledWorker;

final class DelegatingWorkerPool implements WorkerPool
{
/** @var array<int, Worker> */
private array $workerStorage = [];

private int $pendingWorkerCount = 0;

/** @var \SplQueue<DeferredFuture<Worker|null>> */
private readonly \SplQueue $waiting;

/**
* @param int $limit Maximum number of workers to use from the delegate pool.
*/
public function __construct(private readonly WorkerPool $pool, private readonly int $limit)
{
$this->waiting = new \SplQueue();
}

public function isRunning(): bool
{
return $this->pool->isRunning();
}

public function isIdle(): bool
{
return $this->pool->isIdle();
}

public function submit(Task $task, ?Cancellation $cancellation = null): Execution
{
$worker = $this->selectWorker();

$execution = $worker->submit($task, $cancellation);

$execution->getFuture()->finally(fn () => $this->push($worker))->ignore();

return $execution;
}

private function selectWorker(): Worker
{
do {
if (\count($this->workerStorage) + $this->pendingWorkerCount < $this->limit) {
$this->pendingWorkerCount++;

try {
$worker = $this->pool->getWorker();
} finally {
$this->pendingWorkerCount--;
}
} else {
/** @var DeferredFuture<Worker|null> $waiting */
$waiting = new DeferredFuture();
$this->waiting->push($waiting);

$worker = $waiting->getFuture()->await();
if (!$worker?->isRunning()) {
continue;
}
}

$this->workerStorage[\spl_object_id($worker)] = $worker;

return $worker;
} while (true);
}

private function push(Worker $worker): void
{
unset($this->workerStorage[\spl_object_id($worker)]);

if (!$this->waiting->isEmpty()) {
$deferredFuture = $this->waiting->dequeue();
$deferredFuture->complete($worker->isRunning() ? $worker : null);
}
}

public function shutdown(): void
{
if (!$this->waiting->isEmpty()) {
$exception = new WorkerException('The pool was shutdown before a worker could be obtained');
$this->clearWaiting($exception);
}

$this->pool->shutdown();
}

public function kill(): void
{
if (!$this->waiting->isEmpty()) {
$exception = new WorkerException('The pool was killed before a worker could be obtained');
$this->clearWaiting($exception);
}

$this->pool->kill();
}

private function clearWaiting(\Throwable $exception): void
{
while (!$this->waiting->isEmpty()) {
$deferredFuture = $this->waiting->dequeue();
$deferredFuture->error($exception);
}
}

public function getWorker(): Worker
{
return new PooledWorker($this->selectWorker(), $this->push(...));
}

public function getWorkerCount(): int
{
return \min($this->limit, $this->pool->getWorkerCount());
}

public function getIdleWorkerCount(): int
{
return \min($this->limit, $this->pool->getIdleWorkerCount());
}
}
15 changes: 3 additions & 12 deletions test/Worker/AbstractPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
use Amp\Future;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Test\Worker\Fixtures\TestTask;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\Execution;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\Worker;
Expand Down Expand Up @@ -180,15 +178,8 @@ protected function createWorker(?string $autoloadPath = null): Worker
return $this->createPool(autoloadPath: $autoloadPath);
}

protected function createPool(
abstract protected function createPool(
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
?string $autoloadPath = null
): WorkerPool {
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: $this->createContextFactory(),
);

return new ContextWorkerPool($max, $factory);
}
?string $autoloadPath = null,
): WorkerPool;
}
14 changes: 1 addition & 13 deletions test/Worker/AbstractWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\Future;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\StatusError;
use Amp\Parallel\Test\Worker\Fixtures\CommunicatingTask;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\TaskCancelledException;
use Amp\Parallel\Worker\TaskFailureError;
Expand Down Expand Up @@ -382,15 +380,5 @@ public function testCommunicatingJob(): void
self::assertSame('out', $execution->await($cancellation));
}

protected function createWorker(?string $autoloadPath = null): Worker
{
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: $this->createContextFactory(),
);

return $factory->create();
}

abstract protected function createContextFactory(): ContextFactory;
abstract protected function createWorker(?string $autoloadPath = null): Worker;
}
24 changes: 24 additions & 0 deletions test/Worker/DelegatingWorkerPoolTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php declare(strict_types=1);

namespace Amp\Parallel\Test\Worker;

use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\DelegatingWorkerPool;
use Amp\Parallel\Worker\WorkerPool;

class DelegatingWorkerPoolTest extends AbstractPoolTest
{
protected function createPool(
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
?string $autoloadPath = null,
): WorkerPool {
$pool = new ContextWorkerPool(
limit: $max * 2,
factory: new ContextWorkerFactory($autoloadPath, contextFactory: new ProcessContextFactory()),
);

return new DelegatingWorkerPool($pool, $max);
}
}
24 changes: 13 additions & 11 deletions test/Worker/ProcessPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@

namespace Amp\Parallel\Test\Worker;

use Amp\Cancellation;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\WorkerPool;

class ProcessPoolTest extends AbstractPoolTest
{
public function createContextFactory(): ContextFactory
{
return new class implements ContextFactory {
public function start(array|string $script, ?Cancellation $cancellation = null): Context
{
return (new ProcessContextFactory())->start($script, cancellation: $cancellation);
}
};
protected function createPool(
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
?string $autoloadPath = null,
): WorkerPool {
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: new ProcessContextFactory(),
);

return new ContextWorkerPool($max, $factory);
}
}
19 changes: 9 additions & 10 deletions test/Worker/ProcessWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@

namespace Amp\Parallel\Test\Worker;

use Amp\Cancellation;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\ProcessContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\Worker;

class ProcessWorkerTest extends AbstractWorkerTest
{
public function createContextFactory(): ContextFactory
protected function createWorker(?string $autoloadPath = null): Worker
{
return new class implements ContextFactory {
public function start(array|string $script, ?Cancellation $cancellation = null): Context
{
return (new ProcessContextFactory())->start($script, cancellation: $cancellation);
}
};
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: new ProcessContextFactory(),
);

return $factory->create();
}
}
24 changes: 13 additions & 11 deletions test/Worker/ThreadPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,27 @@

namespace Amp\Parallel\Test\Worker;

use Amp\Cancellation;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\ThreadContext;
use Amp\Parallel\Context\ThreadContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\ContextWorkerPool;
use Amp\Parallel\Worker\WorkerPool;

class ThreadPoolTest extends AbstractPoolTest
{
public function createContextFactory(): ContextFactory
{
protected function createPool(
int $max = WorkerPool::DEFAULT_WORKER_LIMIT,
?string $autoloadPath = null,
): WorkerPool {
if (!ThreadContext::isSupported()) {
$this->markTestSkipped('ext-parallel required');
}

return new class implements ContextFactory {
public function start(array|string $script, ?Cancellation $cancellation = null): Context
{
return (new ThreadContextFactory())->start($script, cancellation: $cancellation);
}
};
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: new ThreadContextFactory(),
);

return new ContextWorkerPool($max, $factory);
}
}
19 changes: 9 additions & 10 deletions test/Worker/ThreadWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,24 @@

namespace Amp\Parallel\Test\Worker;

use Amp\Cancellation;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextFactory;
use Amp\Parallel\Context\ThreadContext;
use Amp\Parallel\Context\ThreadContextFactory;
use Amp\Parallel\Worker\ContextWorkerFactory;
use Amp\Parallel\Worker\Worker;

class ThreadWorkerTest extends AbstractWorkerTest
{
public function createContextFactory(): ContextFactory
protected function createWorker(?string $autoloadPath = null): Worker
{
if (!ThreadContext::isSupported()) {
$this->markTestSkipped('ext-parallel required');
}

return new class implements ContextFactory {
public function start(array|string $script, ?Cancellation $cancellation = null): Context
{
return (new ThreadContextFactory())->start($script, cancellation: $cancellation);
}
};
$factory = new ContextWorkerFactory(
bootstrapPath: $autoloadPath,
contextFactory: new ThreadContextFactory(),
);

return $factory->create();
}
}

0 comments on commit 7f8ca54

Please sign in to comment.