Skip to content

Commit

Permalink
Merge pull request #116 from twomiao/master
Browse files Browse the repository at this point in the history
增加轮询负载均衡
  • Loading branch information
walkor authored Nov 19, 2023
2 parents 327bbce + 6a0da57 commit 3e86fba
Showing 1 changed file with 102 additions and 10 deletions.
112 changes: 102 additions & 10 deletions src/Gateway.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,28 @@
*/
class Gateway extends Worker
{


/**
* 随机负载均衡
*
*@var string
*/
const ROUTER_RANDOM = 'router_random';

/**
* 轮询负载均衡
*
* @var string
*/
const ROUTER_ROUND_ROBIN = 'router_round_robin';

/**
* 默认负载均衡模式轮询
*
* @var string $selectLoadBalancingMode
*/
public static $selectLoadBalancingMode = self::ROUTER_ROUND_ROBIN;

/**
* 本机 IP
* 单机部署默认 127.0.0.1,如果是分布式部署,需要设置成本机 IP
Expand Down Expand Up @@ -150,6 +171,14 @@ class Gateway extends Worker
*/
public $onBusinessWorkerClose = null;

/**
* 轮询负载均衡记录
* [ ip+businessworker key => 连接记录, ip+businessworker key => 连接记录, .... ]
*
* @var array
*/
protected static $roundRobinRecord = array();

/**
* 保存客户端的所有 connection 对象
*
Expand Down Expand Up @@ -440,17 +469,58 @@ protected function sendToWorker($cmd, $connection, $body = '')
}

/**
* 随机路由,返回 worker connection 对象
* 随机路由,返回 worker connection 标识
*
* @param array $worker_connections
* @param TcpConnection $client_connection
* @param int $cmd
* @param mixed $buffer
* @return TcpConnection
* @return string
*/
public static function routerRand($worker_connections, $client_connection, $cmd, $buffer)
public static function routerRand(array $worker_connections) : string
{
return $worker_connections[array_rand($worker_connections)];
return array_rand($worker_connections);
}

/**
* 轮询路由,返回 worker connection 标识
* 新上线服务器由于客户端连接数过低,会先分配给新服务器
*
* @throws \Exception
* @param array $roundRobinRecord
* @return string
*/
protected static function routerRoundRobin(array $roundRobinRecord) : string
{
if (empty($roundRobinRecord))
{
throw new \Exception("Round-robin record is empty.");
}

// min($roundRobinRecord) 返回连接数最少businessWorker 连接标识
return array_search(min($roundRobinRecord), $roundRobinRecord, true);
}

/**
* @param array $roundRobinRecord
* @param array $worker_connections
* @param string $selectLoadBalancingMode
* @return string
* @throws \Exception
*/
protected static function businessWorkerAddress(array $roundRobinRecord, array $worker_connections, string $selectLoadBalancingMode)
{
switch ($selectLoadBalancingMode)
{
case static::ROUTER_ROUND_ROBIN:
// 选择连接最少的businessWorker 服务器
$businessWorkerAddress = static::routerRoundRobin($roundRobinRecord);
// 增加轮询表记录
static::$roundRobinRecord[$businessWorkerAddress]++;
return $businessWorkerAddress;
case static::ROUTER_RANDOM:
// 随机轮询
return static::routerRand($worker_connections);
default:
throw new \Exception("This type of load balancing mode is not supported.");
}
}

/**
Expand All @@ -461,11 +531,12 @@ public static function routerRand($worker_connections, $client_connection, $cmd,
* @param int $cmd
* @param mixed $buffer
* @return TcpConnection
* @throws \Exception
*/
public static function routerBind($worker_connections, $client_connection, $cmd, $buffer)
{
if (!isset($client_connection->businessworker_address) || !isset($worker_connections[$client_connection->businessworker_address])) {
$client_connection->businessworker_address = array_rand($worker_connections);
$client_connection->businessworker_address = static::businessWorkerAddress(static::$roundRobinRecord, $worker_connections, static::$selectLoadBalancingMode);
}
return $worker_connections[$client_connection->businessworker_address];
}
Expand All @@ -479,6 +550,18 @@ public function onClientClose($connection)
{
// 尝试通知 worker,触发 Event::onClose
$this->sendToWorker(GatewayProtocol::CMD_ON_CLOSE, $connection);

// 轮询记录减少
if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN &&
isset($connection->businessworker_address))
{
// 轮询记录 >0,减少连接数
if((static::$roundRobinRecord[$connection->businessworker_address]) > 0)
{
static::$roundRobinRecord[$connection->businessworker_address]--;
}
}

unset($this->_clientConnections[$connection->id]);
// 清理 uid 数据
if (!empty($connection->uid)) {
Expand Down Expand Up @@ -569,7 +652,7 @@ class_alias('GatewayWorker\Protocols\GatewayProtocol', 'Protocols\GatewayProtoco
public function onWorkerConnect($connection)
{
$connection->maxSendBufferSize = $this->sendToWorkerBufferSize;
$connection->authorized = $this->secretKey ? false : true;
$connection->authorized = !$this->secretKey;
}

/**
Expand Down Expand Up @@ -608,6 +691,10 @@ public function onWorkerMessage($connection, $data)
$connection->key = $key;
$this->_workerConnections[$key] = $connection;
$connection->authorized = true;
// 轮询负载均衡初始化
if(static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN) {
static::$roundRobinRecord[$key] = 0;
}
if ($this->onBusinessWorkerConnected) {
call_user_func($this->onBusinessWorkerConnected, $connection);
}
Expand Down Expand Up @@ -969,6 +1056,11 @@ public function onWorkerMessage($connection, $data)
public function onWorkerClose($connection)
{
if (isset($connection->key)) {
// 删除轮询记录
if (static::$selectLoadBalancingMode === static::ROUTER_ROUND_ROBIN)
{
unset(static::$roundRobinRecord[$connection->key]);
}
unset($this->_workerConnections[$connection->key]);
if ($this->onBusinessWorkerClose) {
call_user_func($this->onBusinessWorkerClose, $connection);
Expand Down

0 comments on commit 3e86fba

Please sign in to comment.