Skip to content

Commit

Permalink
Processes statistic collection fully rewrited. Now all the statistics…
Browse files Browse the repository at this point in the history
… about the all processes collected in master process now. Workers send it asyncroniuosly each time that some information updated.
  • Loading branch information
luzrain committed Jun 18, 2024
1 parent 1322a39 commit 2ab9450
Show file tree
Hide file tree
Showing 36 changed files with 679 additions and 617 deletions.
20 changes: 4 additions & 16 deletions src/Command/ConnectionsCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
use Luzrain\PHPStreamServer\Console\Table;
use Luzrain\PHPStreamServer\Internal\Functions;
use Luzrain\PHPStreamServer\Internal\MasterProcess;
use Luzrain\PHPStreamServer\Server\Connection;
use Luzrain\PHPStreamServer\Server\Connection\ActiveConnection;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Connection;

final class ConnectionsCommand implements Command
{
Expand All @@ -30,32 +29,21 @@ public function getHelp(): string

public function run(array $arguments): int
{
$status = $this->masterProcess->getStatus();
$connections = [];
$pidMap = new \WeakMap();

foreach ($status->processes as $process) {
foreach ($process->connections ?? [] as $connection) {
$connections[] = $connection;
$pidMap[$connection] = $process->pid;
}
}
$connections = $this->masterProcess->getServerStatus()->connections;

echo "❯ Connections\n";

if (\count($connections) > 0) {
echo (new Table(indent: 1))
->setHeaderRow([
'Pid',
'Transport',
'Local address',
'Remote address',
'Bytes (RX / TX)',
])
->addRows(\array_map(array: $connections, callback: function (Connection $c) use ($pidMap) {
->addRows(\array_map(array: $connections, callback: function (Connection $c) {
return [
(string) $pidMap[$c],
'tcp',
$c->pid,
$c->localIp . ':' . $c->localPort,
$c->remoteIp . ':' . $c->remotePort,
\sprintf('(%s / %s)', Functions::humanFileSize($c->rx), Functions::humanFileSize($c->tx)),
Expand Down
23 changes: 9 additions & 14 deletions src/Command/ProcessesCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Luzrain\PHPStreamServer\Console\Table;
use Luzrain\PHPStreamServer\Internal\Functions;
use Luzrain\PHPStreamServer\Internal\MasterProcess;
use Luzrain\PHPStreamServer\Internal\Status\WorkerProcessStatus;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Process;

final class ProcessesCommand implements Command
{
Expand All @@ -29,11 +29,11 @@ public function getHelp(): string

public function run(array $arguments): int
{
$status = $this->masterProcess->getStatus();
$status = $this->masterProcess->getServerStatus();

echo "❯ Processes\n";

if ($status->processesCount > 0) {
if ($status->getProcessesCount() > 0) {
echo (new Table(indent: 1))
->setHeaderRow([
'Pid',
Expand All @@ -45,23 +45,18 @@ public function run(array $arguments): int
'Requests',
'Bytes (RX / TX)',
])
->addRows(\array_map(array: $status->processes, callback: function (WorkerProcessStatus $w) {
$connections = \count($w->connections ?? []);
$packages = $w->connectionStatistics?->getPackages() ?? 0;
$rx = $w->connectionStatistics?->getRx() ?? 0;
$tx = $w->connectionStatistics?->getTx() ?? 0;

->addRows(\array_map(array: $status->processes, callback: function (Process $w) {
return [
$w->pid,
$w->user === 'root' ? $w->user : "<color;fg=gray>{$w->user}</>",
$w->memory > 0 ? Functions::humanFileSize($w->memory) : '<color;fg=gray>??</>',
$w->name,
$w->listen ?? '<color;fg=gray>-</>',
$connections === 0 ? '<color;fg=gray>0</>' : $connections,
$packages === 0 ? '<color;fg=gray>0</>' : $packages,
$rx === 0 && $tx === 0
? \sprintf('<color;fg=gray>(%s / %s)</>', Functions::humanFileSize($rx), Functions::humanFileSize($tx))
: \sprintf('(%s / %s)', Functions::humanFileSize($rx), Functions::humanFileSize($tx)),
$w->connections === 0 ? '<color;fg=gray>0</>' : $w->connections,
$w->requests === 0 ? '<color;fg=gray>0</>' : $w->requests,
$w->rx === 0 && $w->tx === 0
? \sprintf('<color;fg=gray>(%s / %s)</>', Functions::humanFileSize($w->rx), Functions::humanFileSize($w->tx))
: \sprintf('(%s / %s)', Functions::humanFileSize($w->rx), Functions::humanFileSize($w->tx)),
];
}));
} else {
Expand Down
10 changes: 5 additions & 5 deletions src/Command/StartCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Luzrain\PHPStreamServer\Console\Command;
use Luzrain\PHPStreamServer\Console\Table;
use Luzrain\PHPStreamServer\Internal\MasterProcess;
use Luzrain\PHPStreamServer\Internal\Status\WorkerStatus;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Worker;
use Luzrain\PHPStreamServer\Server;

final class StartCommand implements Command
Expand All @@ -30,28 +30,28 @@ public function getHelp(): string
public function run(array $arguments): int
{
$isDaemon = \in_array('-d', $arguments, true) || \in_array('--daemon', $arguments, true);
$status = $this->masterProcess->getStatus();
$status = $this->masterProcess->getServerStatus();

echo "" . Server::TITLE . "\n";
echo (new Table(indent: 1))
->addRows([
['PHP version:', $status->phpVersion],
[Server::NAME . ' version:', $status->version],
['Event loop driver:', $status->eventLoop],
['Workers count:', $status->workersCount],
['Workers count:', $status->getWorkersCount()],
])
;

echo "❯ Workers\n";

if ($status->workersCount > 0) {
if ($status->getWorkersCount() > 0) {
echo (new Table(indent: 1))
->setHeaderRow([
'User',
'Worker',
'Count',
])
->addRows(\array_map(function (WorkerStatus $w) {
->addRows(\array_map(function (Worker $w) {
return [
$w->user,
$w->name,
Expand Down
10 changes: 6 additions & 4 deletions src/Command/StatusCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ public function getHelp(): string

public function run(array $arguments): int
{
$status = $this->masterProcess->getStatus();
$status = $this->masterProcess->getServerStatus();
$processesCount = $status->getProcessesCount();
$totalMemory = $status->getTotalMemory();

echo ($status->isRunning ? '<color;fg=green>●</> ' : '') . Server::TITLE . "\n";

Expand All @@ -43,9 +45,9 @@ public function run(array $arguments): int
? '<color;fg=green>active</> since ' . ($status->startedAt?->format(\DateTimeInterface::RFC7231) ?? '?')
: 'inactive',
],
['Workers count:', $status->workersCount],
['Processes count:', $status->processesCount > 0 || $status->isRunning ? $status->processesCount : '<color;fg=gray>0</>'],
['Memory usage:', $status->totalMemory > 0 || $status->isRunning ? Functions::humanFileSize($status->totalMemory) : '<color;fg=gray>0</>'],
['Workers count:', $status->getWorkersCount()],
['Processes count:', $processesCount > 0 || $status->isRunning ? $processesCount : '<color;fg=gray>0</>'],
['Memory usage:', $totalMemory > 0 || $status->isRunning ? Functions::humanFileSize($totalMemory) : '<color;fg=gray>0</>'],
]);

return 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Command/StatusJsonCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public function getHelp(): string

public function run(array $arguments): int
{
$status = $this->masterProcess->getStatus();
$status = $this->masterProcess->getServerStatus();
echo \json_encode($status);

return 0;
Expand Down
8 changes: 4 additions & 4 deletions src/Command/WorkersCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Luzrain\PHPStreamServer\Console\Command;
use Luzrain\PHPStreamServer\Console\Table;
use Luzrain\PHPStreamServer\Internal\MasterProcess;
use Luzrain\PHPStreamServer\Internal\Status\WorkerStatus;
use Luzrain\PHPStreamServer\Internal\ServerStatus\Worker;

final class WorkersCommand implements Command
{
Expand All @@ -28,18 +28,18 @@ public function getHelp(): string

public function run(array $arguments): int
{
$status = $this->masterProcess->getStatus();
$status = $this->masterProcess->getServerStatus();

echo "❯ Workers\n";

if ($status->workersCount > 0) {
if ($status->getWorkersCount() > 0) {
echo (new Table(indent: 1))
->setHeaderRow([
'User',
'Worker',
'Count',
])
->addRows(\array_map(array: $status->workers, callback: fn(WorkerStatus $w) => [
->addRows(\array_map(array: $status->workers, callback: fn(Worker $w) => [
$w->user,
$w->name,
$w->count,
Expand Down
12 changes: 6 additions & 6 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use Amp\Socket\InternetAddress;
use Amp\Socket\ServerTlsContext;
use Luzrain\PHPStreamServer\Internal\ReloadStrategyTrigger;
use Luzrain\PHPStreamServer\Server\Http\ClientExceptionHandleMiddleware;
use Luzrain\PHPStreamServer\Server\Http\ReloadStrategyTriggerMiddleware;
use Luzrain\PHPStreamServer\Server\Http\RequestsCounterMiddleware;
use Luzrain\PHPStreamServer\Server\TrafficStatisticStore;
use Luzrain\PHPStreamServer\Internal\ServerStatus\TrafficStatus;
use Luzrain\PHPStreamServer\Server\Http\AddServerHeadersMiddleware;
use Luzrain\PHPStreamServer\Server\Http\HttpErrorHandler;
use Luzrain\PHPStreamServer\Server\Http\ClientExceptionHandleMiddleware;
use Luzrain\PHPStreamServer\Server\Http\HttpClientFactory;
use Luzrain\PHPStreamServer\Server\Http\HttpErrorHandler;
use Luzrain\PHPStreamServer\Server\Http\HttpServerSocketFactory;
use Luzrain\PHPStreamServer\Server\Http\ReloadStrategyTriggerMiddleware;
use Luzrain\PHPStreamServer\Server\Http\RequestsCounterMiddleware;
use Psr\Log\LoggerInterface;

final readonly class HttpServer
Expand Down Expand Up @@ -67,7 +67,7 @@ public function __construct(

public function start(
LoggerInterface $logger,
TrafficStatisticStore $trafficStatisticStore,
TrafficStatus $trafficStatisticStore,
ReloadStrategyTrigger $reloadStrategyTrigger,
): void {
$serverSocketFactory = new HttpServerSocketFactory($this->connectionLimit, $trafficStatisticStore);
Expand Down
126 changes: 126 additions & 0 deletions src/Internal/InterprocessPipe/Interprocess.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php

declare(strict_types=1);

namespace Luzrain\PHPStreamServer\Internal\InterprocessPipe;

use Revolt\EventLoop;

final class Interprocess
{
private const READ_BUFFER = 65536;

/**
* @var resource
*/
private mixed $readerResource;

/**
* @var resource
*/
private mixed $writerResource;

/**
* @var array<\Closure>
*/
private array $subscribers = [];

private string $readBuffer = '';

private string $writeBuffer = '';

private string $callbackId;

public function __construct()
{
[$this->readerResource, $this->writerResource] = \stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

\stream_set_blocking($this->readerResource, false);
\stream_set_blocking($this->writerResource, false);
\stream_set_read_buffer($this->readerResource, 0);
\stream_set_write_buffer($this->writerResource, 0);

EventLoop::onReadable($this->readerResource, function () {
foreach ($this->read() as $payload) {
/** @var object $message */
$message = \unserialize($payload);
foreach ($this->subscribers[$message::class] ?? [] as $subscriber) {
$subscriber($message);
}
}
});
}

/**
* @return \Generator<object>
*/
private function read(): \Generator
{
while (false !== $chunk = \stream_get_line($this->readerResource, self::READ_BUFFER, "\r\n")) {
if (\str_ends_with($chunk, 'END')) {
yield \substr($this->readBuffer . $chunk, 0, -3);
$this->readBuffer = '';
} else {
$this->readBuffer .= $chunk;
}
}
}

/**
* @param non-empty-string $bytes
*/
private function write(string $bytes): void
{
if ($this->writeBuffer !== '') {
$this->writeBuffer .= $bytes;

return;
}

$length = \strlen($bytes);
$written = (int) \fwrite($this->writerResource, $bytes);

if ($length === $written) {
return;
}

if (!isset($this->callbackId)) {
$writeBuffer = &$this->writeBuffer;
$this->callbackId = EventLoop::disable(EventLoop::onWritable(
$this->writerResource,
static function ($callbackId, $writeResource) use (&$writeBuffer) {
$written = (int) \fwrite($writeResource, $writeBuffer);
$writeBuffer = \substr($writeBuffer, $written);
if ($writeBuffer === '') {
EventLoop::disable($callbackId);
}
},
));
}

$this->writeBuffer = \substr($bytes, $written);
EventLoop::enable($this->callbackId);
}

/**
* @return \Closure(object): void
*/
public function createPublisherForWorkerProcess(): \Closure
{
$this->subscribers = [];

return function (object $message): void {
$this->write(\serialize($message) . "END\r\n");
};
}

/**
* @template T of object
* @param class-string<T> $class
* @param \Closure(T): void $closure
*/
public function subscribe(string $class, \Closure $closure): void
{
$this->subscribers[$class][] = $closure;
}
}
Loading

0 comments on commit 2ab9450

Please sign in to comment.