Skip to content

Commit

Permalink
up: update pipe message logic for handle clean ws,tcp connection
Browse files Browse the repository at this point in the history
  • Loading branch information
inhere committed Nov 2, 2019
1 parent e2f6d02 commit 1c51a8c
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 2 deletions.
5 changes: 5 additions & 0 deletions src/server/src/ServerEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ final class ServerEvent
*/
public const USER_PROCESS_START = 'swoft.process.user.start';

/**
* Pipe-message. please {@see \Swoft\Server\Swoole\PipeMessageListener}
*/
public const PIPE_MESSAGE = 'pipeMessage';

/**
* Before after event
*/
Expand Down
33 changes: 33 additions & 0 deletions src/server/src/Swoole/PipeMessageListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php declare(strict_types=1);

namespace Swoft\Server\Swoole;

use Swoft;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Log\Helper\CLog;
use Swoft\Server\Contract\PipeMessageInterface;
use Swoft\Server\ServerEvent;
use Swoole\Server;

/**
* Class PipeMessageListener
*
* @since 2.0.7
* @Bean()
*/
class PipeMessageListener implements PipeMessageInterface
{
/**
* Pipe message event
*
* @param Server $server
* @param int $srcWorkerId
* @param mixed $message
*/
public function onPipeMessage(Server $server, int $srcWorkerId, $message): void
{
CLog::debug("PipeMessage: received pipe-message fromWID=$srcWorkerId message=$message");

Swoft::trigger(ServerEvent::PIPE_MESSAGE, $message, $srcWorkerId);
}
}
23 changes: 23 additions & 0 deletions src/stdlib/src/Helper/PhpHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
use function ob_get_clean;
use function ob_start;
use function preg_replace;
use function serialize;
use function sprintf;
use function strpos;
use function unserialize;
use function var_dump;
use function var_export;
use const PHP_EOL;
Expand Down Expand Up @@ -70,6 +72,27 @@ public static function callByArray($cb, array $args = [])
return self::call($cb, ...$args);
}

/**
* @param $data
*
* @return string
*/
public static function serialize($data): string
{
return serialize($data);
}

/**
* @param string $data
* @param array $opts
*
* @return array|mixed
*/
public static function unserialize(string $data, array $opts = ['allowed_classes' => false])
{
return unserialize($data, $opts);
}

/**
* dump vars
*
Expand Down
63 changes: 63 additions & 0 deletions src/tcp-server/src/Listener/PipeMessageListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php declare(strict_types=1);

namespace Swoft\Tcp\Server\Listener;

use Swoft\Event\Annotation\Mapping\Listener;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\Log\Helper\CLog;
use Swoft\Server\ServerEvent;
use Swoft\Session\Session;

/**
* Class PipeMessageListener
*
* @Listener(ServerEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param EventInterface $event
*/
public function handle(EventInterface $event): void
{
if (!$message = $event->getTarget()) {
return;
}

// Don't handle on data is invalid
$data = (array)json_decode($message, true);
if (JSON_ERROR_NONE !== json_last_error()) {
return;
}

// Ensure is tcp notify message
if (!isset($data['from']) || $data['from'] !== 'tcpServer') {
return;
}

// Handle
if (isset($data['event'])) {
$eventName = (string)$data['event'];

/** @see CloseListener::onClose() */
if ($eventName === 'onClose') {
$this->handleOnClose($data, $event->getParam(0));
}
}
}

/**
* @param array $data
* @param int $srcWID
*/
protected function handleOnClose(array $data, int $srcWID): void
{
$sid = $data['sid'];

if (Session::has($sid)) {
CLog::info("PipeMessage: destroy tcp connection for fd=$sid fromWID=$srcWID");
Session::destroy($sid);
}
}
}
2 changes: 1 addition & 1 deletion src/websocket-server/src/AutoLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
namespace Swoft\WebSocket\Server;

use Swoft\Helper\ComposerJSON;
use Swoft\Server\Swoole\PipeMessageListener;
use Swoft\Server\SwooleEvent;
use Swoft\SwoftComponent;
use Swoft\WebSocket\Server\Router\Router;
use Swoft\WebSocket\Server\Swoole\CloseListener;
use Swoft\WebSocket\Server\Swoole\HandshakeListener;
use Swoft\WebSocket\Server\Swoole\MessageListener;
use Swoft\WebSocket\Server\Swoole\PipeMessageListener;
use function bean;
use function dirname;
use function env;
Expand Down
63 changes: 63 additions & 0 deletions src/websocket-server/src/Listener/PipeMessageListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php declare(strict_types=1);

namespace Swoft\WebSocket\Server\Listener;

use Swoft\Event\Annotation\Mapping\Listener;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\Log\Helper\CLog;
use Swoft\Server\ServerEvent;
use Swoft\Session\Session;

/**
* Class PipeMessageListener
*
* @Listener(ServerEvent::PIPE_MESSAGE)
*/
class PipeMessageListener implements EventHandlerInterface
{
/**
* @param EventInterface $event
*/
public function handle(EventInterface $event): void
{
if (!$message = $event->getTarget()) {
return;
}

$data = (array)json_decode($message, true);

// Don't handle on data is invalid
if (JSON_ERROR_NONE !== json_last_error()) {
return;
}

// Ensure is websocket notify message
if (!isset($data['from']) || $data['from'] !== 'wsServer') {
return;
}

// Handle
if (isset($data['event'])) {
$eventName = (string)$data['event'];

/** @see \Swoft\WebSocket\Server\Swoole\CloseListener::onClose() */
if ($eventName === 'onClose') {
$this->handleClose($data, $event->getParam(0));
}
}
}

/**
* @param array $data
* @param int $srcWID
*/
protected function handleClose(array $data, int $srcWID): void
{
$sid = $data['sid'];
if (Session::has($sid)) {
CLog::info("PipeMessage: destroy ws connection for fd=$sid fromWID=$srcWID");
Session::destroy($sid);
}
}
}
1 change: 0 additions & 1 deletion src/websocket-server/src/Swoole/CloseListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
* Class CloseListener
*
* @since 2.0
*
* @Bean()
*/
class CloseListener implements CloseInterface
Expand Down
1 change: 1 addition & 0 deletions src/websocket-server/src/Swoole/PipeMessageListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* Class PipeMessageListener
*
* @Bean()
* @deprecated will use Listener/PipeMessageListener instead.
*/
class PipeMessageListener implements PipeMessageInterface
{
Expand Down

0 comments on commit 1c51a8c

Please sign in to comment.