Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Enable WebSockets via Swoole #977

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/createSwooleServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

$sock = filter_var($host, FILTER_VALIDATE_IP, FILTER_FLAG_IPV4) ? SWOOLE_SOCK_TCP : SWOOLE_SOCK_TCP6;

$server = new Swoole\Http\Server(
$serverClass = ($config['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';

$server = new $serverClass(
$host,
$serverState['port'] ?? 8000,
$config['swoole']['mode'] ?? SWOOLE_PROCESS,
Expand Down
63 changes: 59 additions & 4 deletions bin/swoole-server
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use Laravel\Octane\Swoole\ServerStateFile;
use Laravel\Octane\Swoole\SwooleExtension;
use Laravel\Octane\Swoole\WorkerState;
use Swoole\Http\Server;
use Swoole\WebSocket\Server as WServer;
use Swoole\Timer;

ini_set('display_errors', 'stderr');
Expand Down Expand Up @@ -49,7 +50,7 @@ $timerTable = require __DIR__.'/createSwooleTimerTable.php';
|
*/

$server->on('start', fn (Server $server) => $bootstrap($serverState) && (new OnServerStart(
$server->on('start', fn (Server|WServer $server) => $bootstrap($serverState) && (new OnServerStart(
new ServerStateFile($serverStateFile),
new SwooleExtension,
$serverState['appName'],
Expand Down Expand Up @@ -91,7 +92,7 @@ $workerState->cacheTable = require __DIR__.'/createSwooleCacheTable.php';
$workerState->timerTable = $timerTable;
$workerState->tables = require __DIR__.'/createSwooleTables.php';

$server->on('workerstart', fn (Server $server, $workerId) =>
$server->on('workerstart', fn (Server|WServer $server, $workerId) =>
(fn ($basePath) => (new OnWorkerStart(
new SwooleExtension, $basePath, $serverState, $workerState
))($server, $workerId))($bootstrap($serverState))
Expand Down Expand Up @@ -142,13 +143,67 @@ $server->on('request', function ($request, $response) use ($server, $workerState
|
*/

$server->on('task', fn (Server $server, int $taskId, int $fromWorkerId, $data) =>
$server->on('task', fn (Server|WServer $server, int $taskId, int $fromWorkerId, $data) =>
$data === 'octane-tick'
? $workerState->worker->handleTick()
: $workerState->worker->handleTask($data)
);

$server->on('finish', fn (Server $server, int $taskId, $result) => $result);
$server->on('finish', fn (Server|WServer $server, int $taskId, $result) => $result);


if($serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false){

/*
|--------------------------------------------------------------------------
| Handle Incoming WebSocket Connections
|--------------------------------------------------------------------------
*/
$server->on('handshake', function ($request, $response) use ($server, $workerState, $serverState) {
$workerState->lastRequestTime = microtime(true);

if ($workerState->timerTable) {
$workerState->timerTable->set($workerState->workerId, [
'worker_pid' => $workerState->workerPid,
'time' => time(),
'fd' => $request->fd,
]);
}

$workerState->worker->handle(...$workerState->client->marshalRequest(new RequestContext([
'swooleRequest' => $request,
'swooleResponse' => $response,
'publicPath' => $serverState['publicPath'],
'octaneConfig' => $serverState['octaneConfig'],
])));

if ($workerState->timerTable) {
$workerState->timerTable->del($workerState->workerId);
}
});


/*
|--------------------------------------------------------------------------
| Handle Incoming WebSocket Messages
|--------------------------------------------------------------------------
*/

$server->on('message', function (WServer $server, Swoole\WebSocket\Frame $frame) use ($workerState) {
$workerState->worker->handleWebSocketMessage($server, $frame);
});

/*
|--------------------------------------------------------------------------
| Handle Closed WebSocket Connections
|--------------------------------------------------------------------------
*/

$server->on('close', function (WServer $server, int $fd) use ($workerState) {
$workerState->worker->handleWebSocketDisconnect($server, $fd);
});

}

/*
|--------------------------------------------------------------------------
Expand Down
12 changes: 12 additions & 0 deletions config/octane.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Laravel\Octane\Events\TaskTerminated;
use Laravel\Octane\Events\TickReceived;
use Laravel\Octane\Events\TickTerminated;
use Laravel\Octane\Events\WebSocketMessageReceived;
use Laravel\Octane\Events\WebSocketDisconnectReceived;
use Laravel\Octane\Events\WorkerErrorOccurred;
use Laravel\Octane\Events\WorkerStarting;
use Laravel\Octane\Events\WorkerStopping;
Expand Down Expand Up @@ -102,6 +104,16 @@
//
],

WebSocketMessageReceived::class => [
...Octane::prepareApplicationForNextOperation(),
//
],

WebSocketDisconnectReceived::class => [
...Octane::prepareApplicationForNextOperation(),
//
],

OperationTerminated::class => [
FlushOnce::class,
FlushTemporaryContainerInstances::class,
Expand Down
2 changes: 1 addition & 1 deletion src/Commands/Concerns/InteractsWithServers.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ protected function writeServerRunningMessage()
$this->output->writeln([
'',
' Local: <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'https://' : 'http://').$this->getHost().':'.$this->getPort().' </>',
'',
config('octane.swoole.enableWebSockets') ? (' <fg=white;options=bold>'.($this->hasOption('https') && $this->option('https') ? 'wss://' : 'ws://').$this->getHost().':'.$this->getPort()." </>\n") : '',
' <fg=yellow>Press Ctrl+C to stop the server</>',
'',
]);
Expand Down
6 changes: 3 additions & 3 deletions src/Concerns/ProvidesConcurrencySupport.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use Laravel\Octane\Swoole\ServerStateFile;
use Laravel\Octane\Swoole\SwooleHttpTaskDispatcher;
use Laravel\Octane\Swoole\SwooleTaskDispatcher;
use Swoole\Http\Server;

trait ProvidesConcurrencySupport
{
Expand All @@ -33,10 +32,11 @@ public function concurrently(array $tasks, int $waitMilliseconds = 3000)
*/
public function tasks()
{
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
return match (true) {
app()->bound(DispatchesTasks::class) => app(DispatchesTasks::class),
app()->bound(Server::class) => new SwooleTaskDispatcher,
class_exists(Server::class) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
app()->bound($serverClass) => new SwooleTaskDispatcher,
class_exists($serverClass) => (fn (array $serverState) => new SwooleHttpTaskDispatcher(
$serverState['state']['host'] ?? '127.0.0.1',
$serverState['state']['port'] ?? '8000',
new SequentialTaskDispatcher
Expand Down
17 changes: 17 additions & 0 deletions src/Events/WebSocketDisconnectReceived.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

namespace Laravel\Octane\Events;

use Illuminate\Foundation\Application;
use Swoole\WebSocket\Server;

class WebSocketDisconnectReceived
{
public function __construct(
public Application $app,
public Application $sandbox,
public Server $server,
public int $fd
) {
}
}
18 changes: 18 additions & 0 deletions src/Events/WebSocketMessageReceived.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Laravel\Octane\Events;

use Illuminate\Foundation\Application;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;

class WebSocketMessageReceived
{
public function __construct(
public Application $app,
public Application $sandbox,
public Server $server,
public Frame $frame
) {
}
}
4 changes: 2 additions & 2 deletions src/Octane.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Exception;
use Laravel\Octane\Swoole\WorkerState;
use Swoole\Http\Server;
use Swoole\Table;
use Throwable;

Expand All @@ -20,7 +19,8 @@ class Octane
*/
public function table(string $table): Table
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new Exception('Tables may only be accessed when using the Swoole server.');
}

Expand Down
5 changes: 3 additions & 2 deletions src/OctaneServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ public function register()
});

$this->app->bind(DispatchesCoroutines::class, function ($app) {
return class_exists('Swoole\Http\Server')
? new SwooleCoroutineDispatcher($app->bound('Swoole\Http\Server'))
$serverClass = $app['config']->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
return class_exists($serverClass)
? new SwooleCoroutineDispatcher($app->bound($serverClass))
: $app->make(SequentialCoroutineDispatcher::class);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __invoke($swooleRequest, string $phpSapi): Request
$request = new SymfonyRequest(
$swooleRequest->get ?? [],
$swooleRequest->post ?? [],
[],
['fd' => $swooleRequest->fd],
$swooleRequest->cookie ?? [],
$swooleRequest->files ?? [],
$serverVariables,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
use Laravel\Octane\Swoole\SwooleExtension;
use Swoole\Http\Response;
use Swoole\Http\Server;
use Swoole\WebSocket\Server as WServer;

class EnsureRequestsDontExceedMaxExecutionTime
{
public function __construct(
protected SwooleExtension $extension,
protected $timerTable,
protected $maxExecutionTime,
protected ?Server $server = null
protected Server|WServer|null $server = null
) {
}

Expand All @@ -38,13 +39,13 @@ public function __invoke()

$this->timerTable->del($workerId);

if ($this->server instanceof Server && ! $this->server->exists($row['fd'])) {
if (($this->server instanceof Server || $this->server instanceof WServer) && ! $this->server->exists($row['fd'])) {
continue;
}

$this->extension->dispatchProcessSignal($row['worker_pid'], SIGKILL);

if ($this->server instanceof Server) {
if ($this->server instanceof Server || $this->server instanceof WServer) {
$response = Response::create($this->server, $row['fd']);

if ($response) {
Expand Down
2 changes: 1 addition & 1 deletion src/Swoole/Handlers/OnServerStart.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public function __construct(
/**
* Handle the "start" Swoole event.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
public function __invoke($server)
Expand Down
12 changes: 6 additions & 6 deletions src/Swoole/Handlers/OnWorkerStart.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Laravel\Octane\Swoole\SwooleExtension;
use Laravel\Octane\Swoole\WorkerState;
use Laravel\Octane\Worker;
use Swoole\Http\Server;
use Throwable;

class OnWorkerStart
Expand All @@ -25,7 +24,7 @@ public function __construct(
/**
* Handle the "workerstart" Swoole event.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
public function __invoke($server, int $workerId)
Expand Down Expand Up @@ -53,18 +52,19 @@ public function __invoke($server, int $workerId)
/**
* Boot the Octane worker and application.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return \Laravel\Octane\Worker|null
*/
protected function bootWorker($server)
{
try {
$serverClass = ($this->serverState['octaneConfig']['swoole']['enableWebSockets'] ?? false) ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
return tap(new Worker(
new ApplicationFactory($this->basePath),
$this->workerState->client = new SwooleClient
))->boot([
'octane.cacheTable' => $this->workerState->cacheTable,
Server::class => $server,
$serverClass => $server,
WorkerState::class => $this->workerState,
]);
} catch (Throwable $e) {
Expand All @@ -77,7 +77,7 @@ protected function bootWorker($server)
/**
* Start the Octane server tick to dispatch the tick task every second.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
protected function dispatchServerTickTaskEverySecond($server)
Expand All @@ -88,7 +88,7 @@ protected function dispatchServerTickTaskEverySecond($server)
/**
* Register the request handled listener that will output request information per request.
*
* @param \Swoole\Http\Server $server
* @param \Swoole\Http\Server|\Swoole\WebSocket\Server $server
* @return void
*/
protected function streamRequestsToConsole($server)
Expand Down
11 changes: 6 additions & 5 deletions src/Swoole/SwooleTaskDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use Laravel\Octane\Exceptions\TaskExceptionResult;
use Laravel\Octane\Exceptions\TaskTimeoutException;
use Laravel\SerializableClosure\SerializableClosure;
use Swoole\Http\Server;

class SwooleTaskDispatcher implements DispatchesTasks
{
Expand All @@ -23,11 +22,12 @@ class SwooleTaskDispatcher implements DispatchesTasks
*/
public function resolve(array $tasks, int $waitMilliseconds = 3000): array
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new InvalidArgumentException('Tasks can only be resolved within a Swoole server context / web request.');
}

$results = app(Server::class)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
$results = app($serverClass)->taskWaitMulti(collect($tasks)->mapWithKeys(function ($task, $key) {
return [$key => $task instanceof Closure
? new SerializableClosure($task)
: $task, ];
Expand Down Expand Up @@ -61,11 +61,12 @@ public function resolve(array $tasks, int $waitMilliseconds = 3000): array
*/
public function dispatch(array $tasks): void
{
if (! app()->bound(Server::class)) {
$serverClass = app('config')->get('octane.swoole.enableWebSockets') ? 'Swoole\WebSocket\Server' : 'Swoole\Http\Server';
if (! app()->bound($serverClass)) {
throw new InvalidArgumentException('Tasks can only be dispatched within a Swoole server context / web request.');
}

$server = app(Server::class);
$server = app($serverClass);

collect($tasks)->each(function ($task) use ($server) {
$server->task($task instanceof Closure ? new SerializableClosure($task) : $task);
Expand Down
Loading
Loading