Skip to content

Commit

Permalink
Improved MessageBus to properly handle concurrent messages (when spaw…
Browse files Browse the repository at this point in the history
…ning hundreds of processes).
  • Loading branch information
luzrain committed Sep 10, 2024
1 parent e76aa73 commit eb8f498
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 26 deletions.
19 changes: 12 additions & 7 deletions src/Internal/MessageBus/SocketFileMessageBus.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,36 @@
namespace Luzrain\PHPStreamServer\Internal\MessageBus;

use Amp\Future;
use Amp\Socket\ConnectException;
use Amp\Socket\DnsSocketConnector;
use Amp\Socket\RetrySocketConnector;
use Amp\Socket\SocketConnector;
use Amp\Socket\StaticSocketConnector;
use function Amp\async;
use function Amp\delay;

final class SocketFileMessageBus implements MessageBus
{
private SocketConnector $connector;

public function __construct(string $socketFile)
{
$this->connector = new RetrySocketConnector(
delegate: new StaticSocketConnector("unix://{$socketFile}", new DnsSocketConnector()),
maxAttempts: 3,
exponentialBackoffBase: 1,
);
$this->connector = new StaticSocketConnector("unix://{$socketFile}", new DnsSocketConnector());
}

public function dispatch(Message $message): Future
{
$connector = &$this->connector;

return async(static function () use (&$connector, &$message): mixed {
$socket = $connector->connect('');
while (true) {
try {
$socket = $connector->connect('');
break;
} catch (ConnectException) {
delay(0.01);
}
}

$socket->write(\serialize($message));
$buffer = $socket->read(limit: PHP_INT_MAX);

Expand Down
15 changes: 14 additions & 1 deletion src/Internal/MessageBus/SocketFileMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Luzrain\PHPStreamServer\Internal\MessageBus;

use Amp\ByteStream\StreamException;
use Amp\Socket\ResourceServerSocket;
use Amp\Socket\ResourceServerSocketFactory;
use Revolt\EventLoop;
Expand All @@ -26,6 +27,12 @@ public function __construct(string $socketFile)
EventLoop::queue(static function () use (&$server, &$subscribers) {
while ($socket = $server->accept()) {
$data = $socket->read(limit: PHP_INT_MAX);

// if socket is not readable anymore
if ($data === null) {
continue;
}

$message = \unserialize($data);
\assert($message instanceof Message);
$return = null;
Expand All @@ -37,7 +44,13 @@ public function __construct(string $socketFile)
}
}

$socket->write(\serialize($return));
try {
$socket->write(\serialize($return));
} catch (StreamException) {
// if socket is not writable anymore
continue;
}

$socket->end();
}
});
Expand Down
28 changes: 18 additions & 10 deletions src/Internal/ServerStatus/ServerStatus.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public function __construct()

public function subscribeToWorkerMessages(MessageHandler $handler): void
{
$handler->subscribe(Spawn::class, weakClosure(function (Spawn $message) {
$handler->subscribe(Spawn::class, weakClosure(function (Spawn $message): void {
$this->processes[$message->pid] = new Process(
pid: $message->pid,
user: $message->user,
Expand All @@ -79,7 +79,11 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
);
}));

$handler->subscribe(Heartbeat::class, weakClosure(function (Heartbeat $message) {
$handler->subscribe(Heartbeat::class, weakClosure(function (Heartbeat $message): void {
if (!isset($this->processes[$message->pid]) || $this->processes[$message->pid]?->detached === true) {
return;
}

$this->processes[$message->pid]->memory = $message->memory;
$this->processes[$message->pid]->time = $message->time;

Expand All @@ -88,7 +92,11 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
}
}));

$handler->subscribe(Detach::class, weakClosure(function (Detach $message) {
$handler->subscribe(Detach::class, weakClosure(function (Detach $message): void {
if (!isset($this->processes[$message->pid])) {
return;
}

$this->processes[$message->pid]->detached = true;
$this->processes[$message->pid]->memory = 0;
$this->processes[$message->pid]->requests = 0;
Expand All @@ -99,25 +107,25 @@ public function subscribeToWorkerMessages(MessageHandler $handler): void
$this->processes[$message->pid]->connections = [];
}));

$handler->subscribe(RxtInc::class, weakClosure(function (RxtInc $message) {
$handler->subscribe(RxtInc::class, weakClosure(function (RxtInc $message): void {
$this->processes[$message->pid]->connections[$message->connectionId]->rx += $message->rx;
$this->processes[$message->pid]->rx += $message->rx;
}));

$handler->subscribe(TxtInc::class, weakClosure(function (TxtInc $message) {
$handler->subscribe(TxtInc::class, weakClosure(function (TxtInc $message): void {
$this->processes[$message->pid]->connections[$message->connectionId]->tx += $message->tx;
$this->processes[$message->pid]->tx += $message->tx;
}));

$handler->subscribe(RequestInc::class, weakClosure(function (RequestInc $message) {
$handler->subscribe(RequestInc::class, weakClosure(function (RequestInc $message): void {
$this->processes[$message->pid]->requests += $message->requests;
}));

$handler->subscribe(Connect::class, weakClosure(function (Connect $message) {
$handler->subscribe(Connect::class, weakClosure(function (Connect $message): void {
$this->processes[$message->pid]->connections[$message->connectionId] = $message->connection;
}));

$handler->subscribe(Disconnect::class, weakClosure(function (Disconnect $message) {
$handler->subscribe(Disconnect::class, weakClosure(function (Disconnect $message): void {
unset($this->processes[$message->pid]->connections[$message->connectionId]);
}));
}
Expand Down Expand Up @@ -200,12 +208,12 @@ public function getProcesses(): array

public function getTotalMemory(): int
{
return (int) \array_sum(\array_map(static fn(Process $p) => $p->memory, $this->processes));
return (int) \array_sum(\array_map(static fn(Process $p): int => $p->memory, $this->processes));
}

public function getTotalConnections(): int
{
return (int) \array_sum(\array_map(static fn(Process $p) => $p->connections, $this->processes));
return (int) \array_sum(\array_map(static fn(Process $p): array => $p->connections, $this->processes));
}

public function isDetached(int $pid): bool
Expand Down
24 changes: 16 additions & 8 deletions src/WorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Luzrain\PHPStreamServer;

use Amp\DeferredFuture;
use Luzrain\PHPStreamServer\Internal\ErrorHandler;
use Luzrain\PHPStreamServer\Internal\MessageBus\MessageBus;
use Luzrain\PHPStreamServer\Internal\MessageBus\SocketFileMessageBus;
Expand Down Expand Up @@ -31,6 +32,7 @@ final class WorkerProcess implements WorkerProcessInterface, ReloadStrategyAware
private TrafficStatus $trafficStatus;
private ReloadStrategyTrigger $reloadStrategyTrigger;
private MessageBus $messageBus;
private DeferredFuture|null $startFuture = null;

/**
* @param null|\Closure(self):void $onStart
Expand Down Expand Up @@ -61,6 +63,8 @@ private function initWorker(): void
\cli_set_process_title(\sprintf('%s: worker process %s', Server::NAME, $this->name));
}

$this->startFuture = new DeferredFuture();

/** @psalm-suppress InaccessibleProperty */
$this->pid = \posix_getpid();

Expand All @@ -83,14 +87,12 @@ private function initWorker(): void
EventLoop::onSignal(SIGTERM, fn() => $this->stop());
EventLoop::onSignal(SIGUSR1, fn() => $this->reload());

EventLoop::queue(function () {
$this->messageBus->dispatch(new Spawn(
pid: $this->pid,
user: $this->getUser(),
name: $this->name,
startedAt: new \DateTimeImmutable('now'),
));
});
$this->messageBus->dispatch(new Spawn(
pid: $this->pid,
user: $this->getUser(),
name: $this->name,
startedAt: new \DateTimeImmutable('now'),
))->await();

EventLoop::queue($heartbeat = function (): void {
$this->messageBus->dispatch(new Heartbeat(
Expand All @@ -107,10 +109,14 @@ private function initWorker(): void
\gc_collect_cycles();
\gc_mem_caches();
});

$this->startFuture->complete();
$this->startFuture = null;
}

public function stop(int $code = 0): void
{
$this->startFuture?->getFuture()->await();
$this->exitCode = $code;
try {
$this->onStop !== null && ($this->onStop)($this);
Expand All @@ -125,6 +131,7 @@ public function reload(): void
return;
}

$this->startFuture?->getFuture()->await();
$this->exitCode = self::RELOAD_EXIT_CODE;
try {
$this->onReload !== null && ($this->onReload)($this);
Expand All @@ -135,6 +142,7 @@ public function reload(): void

public function detach(): void
{
$this->startFuture?->getFuture()->await();
$this->messageBus->dispatch(new Detach($this->pid))->await();
$this->detachByTrait();
unset($this->trafficStatus);
Expand Down

0 comments on commit eb8f498

Please sign in to comment.