Skip to content

Commit

Permalink
Supervisor plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
luzrain committed Sep 10, 2024
1 parent eb8f498 commit 0538358
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/Internal/ServerStatus/ServerStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public function addWorker(ProcessInterface $worker): void
$this->workerProcesses[$worker->getId()] = new WorkerProcessInfo(
user: $worker->getUser(),
name: $worker->getName(),
count: $worker->count,
count: $worker->getProcessCount(),
);
} elseif($worker instanceof PeriodicProcessInterface) {
$this->periodicProcesses[$worker->getId()] = new PeriodicProcessInfo(
Expand Down
8 changes: 6 additions & 2 deletions src/Internal/Supervisor/Supervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private function spawnWorkers(): void
{
EventLoop::queue(function (): void {
foreach ($this->workerPool->getRegisteredWorkers() as $worker) {
while (\iterator_count($this->workerPool->getAliveWorkerPids($worker)) < $worker->count) {
while (\iterator_count($this->workerPool->getAliveWorkerPids($worker)) < $worker->getProcessCount()) {
if ($this->spawnWorker($worker)) {
return;
}
Expand Down Expand Up @@ -122,7 +122,11 @@ private function onChildStop(int $pid, int $exitCode): void
default => $this->logger->warning(\sprintf('Worker %s[pid:%d] exit with code %s', $worker->getName(), $pid, $exitCode)),
};
// Restart worker
$this->spawnWorker($worker);
if (0 < $delay = $worker->getProcessRestartDelay()) {
EventLoop::delay($delay, function () use ($worker) { $this->spawnWorker($worker); });
} else {
$this->spawnWorker($worker);
}
break;
case Status::SHUTDOWN:
if ($this->workerPool->getProcessesCount() === 0) {
Expand Down
1 change: 1 addition & 0 deletions src/Internal/Supervisor/WorkerPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public function getWorkerByPid(int $pid): WorkerProcessInterface|null

/**
* @return \Iterator<WorkerProcessInterface>
* @psalm-return iterable<WorkerProcessInterface>
*/
public function getRegisteredWorkers(): \Iterator
{
Expand Down
77 changes: 77 additions & 0 deletions src/Plugin/Supervisor/Supervisor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Plugin\Supervisor;

use Amp\Future;
use Luzrain\PHPStreamServer\Internal\MasterProcess;
use Luzrain\PHPStreamServer\Plugin\PluginInterface;
use Luzrain\PHPStreamServer\WorkerProcess;
use function Amp\async;

final readonly class Supervisor implements PluginInterface
{
/**
* @param string|\Closure(WorkerProcess): void $command bash command as string or php closure
*/
public function __construct(
private string|\Closure $command,
private string|null $name = null,
private int $count = 1,
private float $restartDelay = 0.5,
private bool $reloadable = true,
private string|null $user = null,
private string|null $group = null,
) {
}

public function start(MasterProcess $masterProcess): void
{
$name = match (true) {
$this->name === null && \is_string($this->command) => $this->command,
$this->name === null => 'closure',
default => $this->name,
};

$masterProcess->addWorker(new WorkerProcess(
name: $name,
count: $this->count,
reloadable: $this->reloadable,
restartDelay: $this->restartDelay,
user: $this->user,
group: $this->group,
onStart: function (WorkerProcess $worker) {
if (\is_string($this->command)) {
$worker->exec(...$this->prepareCommand($this->command));
} else {
($this->command)($worker);
}
},
));
}

/**
* @return array{0: string, 1: list<string>}
*/
private function prepareCommand(string $command): array
{
\preg_match_all('/\'[^\']*\'|"[^"]*"|\S+/', $command, $matches);
$parts = \array_map(static fn (string $part) => \trim($part, '"\''), $matches[0]);
$binary = \array_shift($parts);
$args = $parts;

if (!\str_starts_with($binary, '/')) {
if (\is_string($absoluteBinaryPath = \shell_exec("command -v $binary"))) {
$binary = \trim($absoluteBinaryPath);
}
}

return [$binary, $args];
}

public function stop(): Future
{
return async(static fn() => null);
}
}
13 changes: 12 additions & 1 deletion src/WorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ final class WorkerProcess implements WorkerProcessInterface, ReloadStrategyAware
*/
public function __construct(
string $name = 'none',
public readonly int $count = 1,
private readonly int $count = 1,
private readonly bool $reloadable = true,
private readonly float $restartDelay = 0,
string|null $user = null,
string|null $group = null,
private \Closure|null $onStart = null,
Expand Down Expand Up @@ -174,4 +175,14 @@ public function getTrafficStatus(): TrafficStatus
{
return $this->trafficStatus;
}

public function getProcessCount(): int
{
return $this->count;
}

public function getProcessRestartDelay(): float
{
return $this->restartDelay;
}
}
10 changes: 10 additions & 0 deletions src/WorkerProcessInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ public function reload(): void;
* Start worker module in this worker
*/
public function startWorkerModule(WorkerModule $module): void;

/**
* Count of processes
*/
public function getProcessCount(): int;

/**
* Delay in seconds between processes restart
*/
public function getProcessRestartDelay(): float;
}

0 comments on commit 0538358

Please sign in to comment.