Skip to content

Commit

Permalink
Process monitoring logic relies only on information in WorkerPool and…
Browse files Browse the repository at this point in the history
… does not use ServerStatus. Master process can now emit events (not just consume)
  • Loading branch information
luzrain committed Sep 21, 2024
1 parent 89d513d commit 67e9124
Show file tree
Hide file tree
Showing 14 changed files with 207 additions and 108 deletions.
11 changes: 6 additions & 5 deletions src/Internal/MasterProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Luzrain\PHPStreamServer\Exception\AlreadyRunningException;
use Luzrain\PHPStreamServer\Exception\NotRunningException;
use Luzrain\PHPStreamServer\Exception\PHPStreamServerException;
use Luzrain\PHPStreamServer\Internal\MessageBus\MessageBus;
use Luzrain\PHPStreamServer\Internal\MessageBus\MessageHandler;
use Luzrain\PHPStreamServer\Internal\MessageBus\SocketFileMessageBus;
use Luzrain\PHPStreamServer\Internal\MessageBus\SocketFileMessageHandler;
Expand Down Expand Up @@ -39,7 +40,7 @@ final class MasterProcess
private Suspension $suspension;
private Status $status = Status::STARTING;
private ServerStatus $serverStatus;
private MessageHandler $messageHandler;
private MessageHandler&MessageBus $messageHandler;
private Supervisor $supervisor;
private Scheduler $scheduler;

Expand Down Expand Up @@ -76,7 +77,7 @@ public function __construct(
$this->pidFile = $pidFile ?? \sprintf('%s/phpss%s.pid', $runDirectory, \hash('xxh32', $this->startFile));
$this->socketFile = \sprintf('%s/phpss%s.socket', $runDirectory, \hash('xxh32', $this->startFile . 'rx'));

$this->supervisor = new Supervisor($stopTimeout);
$this->supervisor = new Supervisor($this, $stopTimeout);
$this->scheduler = new Scheduler();
$this->serverStatus = new ServerStatus();
}
Expand Down Expand Up @@ -121,7 +122,7 @@ public function run(bool $daemonize = false): int
$this->saveMasterPid();
$this->start();
$this->status = Status::RUNNING;
$this->supervisor->start($this->logger, $this->suspension, $this->getStatus(...), $this->serverStatus);
$this->supervisor->start($this->suspension, );
$this->scheduler->start($this->logger, $this->suspension, $this->getStatus(...));
$this->logger->info(Server::NAME . ' has started');

Expand Down Expand Up @@ -300,7 +301,7 @@ private function isRunning(): bool
return $this->getPid() !== 0 && \posix_kill($this->getPid(), 0);
}

private function getStatus(): Status
public function getStatus(): Status
{
return $this->status;
}
Expand Down Expand Up @@ -344,7 +345,7 @@ public function getLogger(): LoggerInterface
return $this->logger;
}

public function getMessageHandler(): MessageHandler
public function getMessageHandler(): MessageHandler&MessageBus
{
return $this->messageHandler;
}
Expand Down
19 changes: 18 additions & 1 deletion src/Internal/MessageBus/SocketFileMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
namespace Luzrain\PHPStreamServer\Internal\MessageBus;

use Amp\ByteStream\StreamException;
use Amp\Future;
use Amp\Socket\ResourceServerSocket;
use Amp\Socket\ResourceServerSocketFactory;
use Revolt\EventLoop;
use function Amp\async;

final class SocketFileMessageHandler implements MessageHandler
final class SocketFileMessageHandler implements MessageHandler, MessageBus
{
private ResourceServerSocket $socket;

Expand Down Expand Up @@ -82,4 +84,19 @@ public function unsubscribe(string $class, \Closure $closure): void
{
unset($this->subscribers[$class][\spl_object_id($closure)]);
}

public function dispatch(Message $message): Future
{
$subscribers = &$this->subscribers;

return async(static function () use (&$subscribers, &$message): mixed {
foreach ($subscribers[$message::class] ?? [] as $subscriber) {
if (null !== $subscriberReturn = $subscriber($message)) {
return $subscriberReturn;
}
}

return null;
});
}
}
18 changes: 18 additions & 0 deletions src/Internal/ServerStatus/Message/Blocked.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Internal\ServerStatus\Message;

use Luzrain\PHPStreamServer\Internal\MessageBus\Message;

/**
* Process blocked by IO operations
*/
final readonly class Blocked implements Message
{
public function __construct(
public int $pid,
) {
}
}
3 changes: 3 additions & 0 deletions src/Internal/ServerStatus/Message/Detach.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Luzrain\PHPStreamServer\Internal\MessageBus\Message;

/**
* Process detached
*/
final readonly class Detach implements Message
{
public function __construct(
Expand Down
3 changes: 3 additions & 0 deletions src/Internal/ServerStatus/Message/Heartbeat.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Luzrain\PHPStreamServer\Internal\MessageBus\Message;

/**
* Process sends this message periodically
*/
final readonly class Heartbeat implements Message
{
public function __construct(
Expand Down
18 changes: 18 additions & 0 deletions src/Internal/ServerStatus/Message/Killed.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Internal\ServerStatus\Message;

use Luzrain\PHPStreamServer\Internal\MessageBus\Message;

/**
* Process killed
*/
final readonly class Killed implements Message
{
public function __construct(
public int $pid,
) {
}
}
3 changes: 3 additions & 0 deletions src/Internal/ServerStatus/Message/Spawn.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

use Luzrain\PHPStreamServer\Internal\MessageBus\Message;

/**
* Process spawned
*/
final readonly class Spawn implements Message
{
public function __construct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Luzrain\PHPStreamServer\Internal\ServerStatus;

final class Process
final class RunningProcess
{
/**
* @param array<int, Connection> $connections
Expand All @@ -14,7 +14,6 @@ public function __construct(
public string $user,
public string $name,
public \DateTimeImmutable $startedAt,
public int $time = 0,
public int $memory = 0,
public bool $detached = false,
public int $requests = 0,
Expand Down
56 changes: 18 additions & 38 deletions src/Internal/ServerStatus/ServerStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

use Luzrain\PHPStreamServer\Internal\Functions;
use Luzrain\PHPStreamServer\Internal\MessageBus\MessageHandler;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Blocked;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Connect;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Detach;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Disconnect;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Heartbeat;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Killed;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\RequestInc;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\RxtInc;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Message\Spawn;
Expand All @@ -18,7 +20,6 @@
use Luzrain\PHPStreamServer\ProcessInterface;
use Luzrain\PHPStreamServer\Server;
use Luzrain\PHPStreamServer\WorkerProcessInterface;
use Revolt\EventLoop;
use Revolt\EventLoop\DriverFactory;
use function Amp\weakClosure;

Expand All @@ -28,9 +29,6 @@
*/
final class ServerStatus
{
private const BLOCKED_LABEL_PERSISTENCE = 30;
public const BLOCK_WARNING_TRESHOLD = 6;

public readonly string $version;
public readonly string $phpVersion;
public readonly string $eventLoop;
Expand All @@ -49,15 +47,10 @@ final class ServerStatus
private array $periodicProcesses = [];

/**
* @var array<int, Process>
* @var array<int, RunningProcess>
*/
private array $processes = [];

/**
* @var array<int, true>
*/
private array $blockedProcesses = [];

public function __construct()
{
$this->version = Server::VERSION;
Expand All @@ -71,7 +64,7 @@ public function __construct()
public function subscribeToWorkerMessages(MessageHandler $handler): void
{
$handler->subscribe(Spawn::class, weakClosure(function (Spawn $message): void {
$this->processes[$message->pid] = new Process(
$this->processes[$message->pid] = new RunningProcess(
pid: $message->pid,
user: $message->user,
name: $message->name,
Expand All @@ -85,11 +78,19 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
}

$this->processes[$message->pid]->memory = $message->memory;
$this->processes[$message->pid]->time = $message->time;
$this->processes[$message->pid]->blocked = false;
}));

if (!isset($this->blockedProcesses[$message->pid])) {
$this->processes[$message->pid]->blocked = false;
$handler->subscribe(Blocked::class, weakClosure(function (Blocked $message): void {
if (!isset($this->processes[$message->pid]) || $this->processes[$message->pid]?->detached === true) {
return;
}

$this->processes[$message->pid]->blocked = true;
}));

$handler->subscribe(Killed::class, weakClosure(function (Killed $message): void {
unset($this->processes[$message->pid]);
}));

$handler->subscribe(Detach::class, weakClosure(function (Detach $message): void {
Expand All @@ -102,7 +103,6 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
$this->processes[$message->pid]->requests = 0;
$this->processes[$message->pid]->rx = 0;
$this->processes[$message->pid]->tx = 0;
$this->processes[$message->pid]->time = 0;
$this->processes[$message->pid]->blocked = false;
$this->processes[$message->pid]->connections = [];
}));
Expand Down Expand Up @@ -152,21 +152,6 @@ public function setRunning(bool $isRunning = true): void
$this->isRunning = $isRunning;
}

public function deleteProcess(int $pid): void
{
unset($this->processes[$pid]);
}

public function markProcessAsBlocked(int $pid): void
{
$this->blockedProcesses[$pid] = true;
$this->processes[$pid]->blocked = true;

EventLoop::delay(self::BLOCKED_LABEL_PERSISTENCE, function () use ($pid) {
unset($this->blockedProcesses[$pid]);
});
}

public function getWorkersCount(): int
{
return \count($this->workerProcesses);
Expand Down Expand Up @@ -199,7 +184,7 @@ public function getProcessesCount(): int
}

/**
* @return list<Process>
* @return list<RunningProcess>
*/
public function getProcesses(): array
{
Expand All @@ -208,16 +193,11 @@ public function getProcesses(): array

public function getTotalMemory(): int
{
return (int) \array_sum(\array_map(static fn(Process $p): int => $p->memory, $this->processes));
return (int) \array_sum(\array_map(static fn(RunningProcess $p): int => $p->memory, $this->processes));
}

public function getTotalConnections(): int
{
return (int) \array_sum(\array_map(static fn(Process $p): array => $p->connections, $this->processes));
}

public function isDetached(int $pid): bool
{
return $this->processes[$pid]->detached ?? false;
return (int) \array_sum(\array_map(static fn(RunningProcess $p): array => $p->connections, $this->processes));
}
}
22 changes: 22 additions & 0 deletions src/Internal/Supervisor/Process.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Internal\Supervisor;

/**
* @internal
*/
final class Process
{
public int $pid;
public bool $detached = false;
public bool $blocked = false;
public int $time;

public function __construct(int $pid)
{
$this->pid = $pid;
$this->time = \hrtime(true);
}
}
Loading

0 comments on commit 67e9124

Please sign in to comment.