
Commit

Merge pull request #339 from jursonmo/feature/DynamicBind
Feature/dynamic bind: Add a DynamicBind worker-pool mode. Like the Bind worker-pool mode, it binds one worker to each connection, but unlike Bind it does not leave large numbers of workers idle.
aceld authored Oct 11, 2024
2 parents 573502b + ea8a8c8 commit e442c38
Showing 5 changed files with 243 additions and 3 deletions.
89 changes: 89 additions & 0 deletions examples/zinx_dynamic_bind/client/client.go
@@ -0,0 +1,89 @@
package main

import (
"os"
"os/signal"
"time"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

const (
PingType = 1
PongType = 2
)

// pong response router
type PongRouter struct {
znet.BaseRouter
client string
}

// In Hash mode, the pongs for client2 and client3 arrive only after client1's pong has been received.
// In DynamicBind mode, client2 and client3 receive their pongs immediately, while client1's pong is blocked for ten seconds before it arrives.
func (p *PongRouter) Handle(request ziface.IRequest) {
//read server pong data
zlog.Infof("---------client:%s, recv from server:%s, msgId=%d, data=%s ----------\n",
p.client, request.GetConnection().RemoteAddr(), request.GetMsgID(), string(request.GetData()))
}

func onClient1Start(conn ziface.IConnection) {
zlog.Infof("client1 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client1"))
if err != nil {
zlog.Error(err)
}
}

func onClient2Start(conn ziface.IConnection) {
zlog.Infof("client2 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client2"))
if err != nil {
zlog.Error(err)
}
}

func onClient3Start(conn ziface.IConnection) {
zlog.Infof("client3 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client3"))
if err != nil {
zlog.Error(err)
}
}

func main() {
//Create client1
client1 := znet.NewClient("127.0.0.1", 8999)
client1.SetOnConnStart(onClient1Start)
client1.AddRouter(PongType, &PongRouter{client: "client1"})
client1.Start()

time.Sleep(time.Second)

client2 := znet.NewClient("127.0.0.1", 8999)
client2.SetOnConnStart(onClient2Start)
client2.AddRouter(PongType, &PongRouter{client: "client2"})
client2.Start()

time.Sleep(time.Second)

client3 := znet.NewClient("127.0.0.1", 8999)
client3.SetOnConnStart(onClient3Start)
client3.AddRouter(PongType, &PongRouter{client: "client3"})
client3.Start()

//Prevent the process from exiting; wait for an interrupt signal
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
<-signalChan
client1.Stop()
client2.Stop()
client3.Stop()

time.Sleep(time.Second)
}
9 changes: 9 additions & 0 deletions examples/zinx_dynamic_bind/server/conf/zinx.json
@@ -0,0 +1,9 @@
{
"Name":"zinx server DynamicBind Mode Demo",
"Host":"127.0.0.1",
"TCPPort":8999,
"MaxConn":12000,
"WorkerPoolSize":1,
"MaxWorkerTaskLen":50,
"WorkerMode":"DynamicBind"
}
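Note: the same settings can presumably be applied in code instead of conf/zinx.json by assigning to zconf.GlobalObject before the server is created. The field names below mirror the JSON keys above and the zconf usage in this diff, but this programmatic flow is a sketch under that assumption, not part of this commit.

package main

import (
	"github.com/aceld/zinx/zconf"
	"github.com/aceld/zinx/znet"
)

func main() {
	// Configure before NewServer(), since the worker pool is sized at construction.
	zconf.GlobalObject.Name = "zinx server DynamicBind Mode Demo"
	zconf.GlobalObject.Host = "127.0.0.1"
	zconf.GlobalObject.TCPPort = 8999
	zconf.GlobalObject.MaxConn = 12000
	zconf.GlobalObject.WorkerPoolSize = 1
	zconf.GlobalObject.MaxWorkerTaskLen = 50
	zconf.GlobalObject.WorkerMode = zconf.WorkerModeDynamicBind

	znet.NewServer().Serve()
}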
58 changes: 58 additions & 0 deletions examples/zinx_dynamic_bind/server/server.go
@@ -0,0 +1,58 @@
package main

import (
"sync/atomic"
"time"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

func OnConnectionAdd(conn ziface.IConnection) {
zlog.Debug("OnConnectionAdd:", conn.GetConnection().RemoteAddr())
}

func OnConnectionLost(conn ziface.IConnection) {
zlog.Debug("OnConnectionLost:", conn.GetConnection().RemoteAddr())
}

type blockRouter struct {
znet.BaseRouter
}

var Block = int32(1)

// Handle simulates a blocking operation
func (r *blockRouter) Handle(request ziface.IRequest) {
//read client data
zlog.Infof("recv from client:%s, msgId=%d, data=%s\n", request.GetConnection().RemoteAddr(), request.GetMsgID(), string(request.GetData()))

// On the first request, simulate a blocking task. In Hash mode, tasks from subsequent connections are stuck behind it and get no processing.
// In DynamicBind mode, tasks from subsequent connections are still handled promptly; they are not starved by the blocked task of an earlier connection.
// The blocking operation is simulated only once.
if atomic.CompareAndSwapInt32(&Block, 1, 0) {
zlog.Infof("blockRouter handle start, msgId=%d, remote:%v\n", request.GetMsgID(), request.GetConnection().RemoteAddr())
time.Sleep(time.Second * 10)
//blocking operation finished
zlog.Infof("blockRouter handle end, msgId=%d, remote:%v\n", request.GetMsgID(), request.GetConnection().RemoteAddr())
}

err := request.GetConnection().SendMsg(2, []byte("pong from server"))
if err != nil {
zlog.Error(err)
return
}
zlog.Infof("send pong over, client:%s\n", request.GetConnection().RemoteAddr())
}

func main() {
s := znet.NewServer()

s.SetOnConnStart(OnConnectionAdd)
s.SetOnConnStop(OnConnectionLost)

s.AddRouter(1, &blockRouter{})

s.Serve()
}
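The zconf comments in the next file note that Hash mode's blocking problem can be worked around asynchronously. For contrast, here is a hedged sketch of that workaround (an assumption, not code from this PR): the router hands the slow work to its own goroutine so the shared Hash-mode worker is released immediately, at the cost of weaker per-connection ordering, which is exactly the trade-off DynamicBind avoids.

package main

import (
	"time"

	"github.com/aceld/zinx/ziface"
	"github.com/aceld/zinx/zlog"
	"github.com/aceld/zinx/znet"
)

type asyncBlockRouter struct {
	znet.BaseRouter
}

func (r *asyncBlockRouter) Handle(request ziface.IRequest) {
	// Copy what the goroutine needs before Handle returns; do not retain request.
	conn := request.GetConnection()
	go func() {
		time.Sleep(time.Second * 10) // the slow, blocking part now runs off the worker
		if err := conn.SendMsg(2, []byte("pong from server")); err != nil {
			zlog.Error(err)
		}
	}()
	// Handle returns immediately, so the shared worker can serve other connections.
}

func main() {
	s := znet.NewServer()
	s.AddRouter(1, &asyncBlockRouter{})
	s.Serve()
}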
8 changes: 8 additions & 0 deletions zconf/zconf.go
@@ -25,6 +25,14 @@ const (
const (
WorkerModeHash = "Hash" // By default, workers are assigned by taking the ID modulo the pool size.(默认使用取余的方式)
WorkerModeBind = "Bind" // Bind a worker to each connection.(为每个连接分配一个worker)

//Hash mode has a blocking problem (although it can be worked around with asynchronous handling).
//In Bind mode, if MaxConn is large, many workers are started in the background and sit idle when the server has few connections; if MaxConn is small, the number of connections the server can accept is limited.

//WorkerModeDynamicBind also binds one worker per connection. When assigning a worker to a connection, if the workers pre-created in the pool are used up, a worker is created dynamically and bound to the connection. Such a temporarily created worker is destroyed after its connection closes.
//Unlike WorkerModeHash, a blocking operation in one connection's business callback does not stall the business handling of other connections.
//Unlike WorkerModeBind, there is no need to create a large number of workers up front; workers are created on demand as connections arrive, avoiding the resource waste of too many idle workers.
WorkerModeDynamicBind = "DynamicBind" // Dynamic binding of a worker to each connection when there is no worker in worker pool.(临时动态创建一个worker绑定到每个连接)
)

/*
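To make the sizing logic in newMsgHandle (next file) concrete, here is a worked sketch of the worker-ID partitioning under the demo config above (WorkerPoolSize=1, MaxConn=12000). The variable names are illustrative, not zinx API.

package main

import "fmt"

func main() {
	workerPoolSize := uint32(1) // pre-started pool workers own IDs [0, workerPoolSize)
	maxConn := 12000            // cap on connections, and therefore on worker IDs

	pooledIDs := workerPoolSize                  // freeWorkers: {0}
	extraIDs := uint32(maxConn) - workerPoolSize // extraFreeWorkers: {1 .. 11999}, started on demand
	taskQueueLen := maxConn                      // TaskQueue holds one slot per possible worker

	fmt.Println(pooledIDs, extraIDs, taskQueueLen) // 1 11999 12000
}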
82 changes: 79 additions & 3 deletions znet/msghandler.go
@@ -38,6 +38,11 @@ type MsgHandle struct {
// (Worker负责取任务的消息队列)
TaskQueue []chan ziface.IRequest

// A set of extra worker IDs that can be allocated temporarily when the pool's workers run out, used for zconf.WorkerModeDynamicBind
// (池里的工作线程不够用的时候, 可临时额外分配workerID集合, 用于zconf.WorkerModeDynamicBind)
extraFreeWorkers map[uint32]struct{}
extraFreeWorkerMu sync.Mutex

// Chain builder for the responsibility chain
// (责任链构造器)
builder *chainBuilder
@@ -48,6 +53,8 @@ func newMsgHandle() *MsgHandle {
// zinxRole: IServer/IClient
func newMsgHandle() *MsgHandle {
var freeWorkers map[uint32]struct{}
var extraFreeWorkers map[uint32]struct{}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind {
// Assign a worker to each connection, to avoid interference when multiple connections are processed by the same worker
// MaxWorkerTaskLen can also be reduced, for example, 50
@@ -60,14 +67,32 @@ func newMsgHandle() *MsgHandle {
}
}

TaskQueueLen := zconf.GlobalObject.WorkerPoolSize

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
zlog.Ins().DebugF("WorkerMode = %s", zconf.WorkerModeDynamicBind)
freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize)
for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ {
freeWorkers[i] = struct{}{}
}

extraFreeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.MaxConn-int(zconf.GlobalObject.WorkerPoolSize))
for i := zconf.GlobalObject.WorkerPoolSize; i < uint32(zconf.GlobalObject.MaxConn); i++ {
extraFreeWorkers[i] = struct{}{}
}
TaskQueueLen = uint32(zconf.GlobalObject.MaxConn)
}

handle := &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
RouterSlices: NewRouterSlices(),
WorkerPoolSize: zconf.GlobalObject.WorkerPoolSize,
// One worker corresponds to one queue (一个worker对应一个queue)
TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize),
TaskQueue: make([]chan ziface.IRequest, TaskQueueLen),
freeWorkers: freeWorkers,
builder: newChainBuilder(),
// Extra worker IDs available for temporary allocation (可额外临时分配的workerID集合)
extraFreeWorkers: extraFreeWorkers,
}

// It is necessary to add the MsgHandle to the responsibility chain here, and it is the last link in the responsibility chain. After decoding in the MsgHandle, data distribution is done by router
@@ -97,6 +122,28 @@ func useWorker(conn ziface.IConnection) uint32 {
}
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
mh.freeWorkerMu.Lock()
// try to get workerID from workerPool first
// 首先尝试从工作线程池里获取一个空闲的workerID
for workerID := range mh.freeWorkers {
delete(mh.freeWorkers, workerID)
mh.freeWorkerMu.Unlock()
return workerID
}
mh.freeWorkerMu.Unlock()

// The pool's workers are all in use: take an extra workerID from extraFreeWorkers and start a temporary worker for it
mh.extraFreeWorkerMu.Lock()
defer mh.extraFreeWorkerMu.Unlock()
for workerID := range mh.extraFreeWorkers {
zlog.Ins().DebugF("start extra worker, workerID=%d", workerID)
mh.TaskQueue[workerID] = make(chan ziface.IRequest, zconf.GlobalObject.MaxWorkerTaskLen)
go mh.StartOneWorker(int(workerID), mh.TaskQueue[workerID])
return workerID
}
}

//Compatible with the situation where the client has no worker, and solve the situation divide 0
//(兼容client没有worker情况,解决除0的情况)
if mh.WorkerPoolSize == 0 {
@@ -128,6 +175,23 @@ func freeWorker(conn ziface.IConnection) {

mh.freeWorkers[conn.GetWorkerID()] = struct{}{}
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
workerID := conn.GetWorkerID()
if workerID < mh.WorkerPoolSize {
// This workerID belongs to the pre-created pool: recycle the ID; its worker does not need to be destroyed
mh.freeWorkerMu.Lock()
mh.freeWorkers[workerID] = struct{}{}
mh.freeWorkerMu.Unlock()
} else {
// This is a temporarily created worker, so the worker itself must be destroyed
mh.StopOneWorker(int(workerID))
// Recycle the workerID back into the extra-workerID pool
mh.extraFreeWorkerMu.Lock()
mh.extraFreeWorkers[workerID] = struct{}{}
mh.extraFreeWorkerMu.Unlock()
}
}
}

// Data processing interceptor that is necessary by default in Zinx
@@ -280,6 +344,13 @@ func (mh *MsgHandle) doMsgHandlerSlices(request ziface.IRequest, workerID int) {
PutRequest(request)
}

func (mh *MsgHandle) StopOneWorker(workerID int) {
zlog.Ins().DebugF("stop Worker ID = %d ", workerID)
// Stop the worker by closing the corresponding taskQueue
// (停止一个Worker,通过关闭对应的taskQueue)
close(mh.TaskQueue[workerID])
}

// StartOneWorker starts a worker workflow
// (启动一个Worker工作流程)
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
@@ -290,8 +361,13 @@ func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest
select {
// If there is a message, take out the Request from the queue and execute the bound business method
// (有消息则取出队列的Request,并执行绑定的业务方法)
case request := <-taskQueue:

case request, ok := <-taskQueue:
if !ok {
// In DynamicBind mode, a temporarily created worker is destroyed by closing its taskQueue
// (DynamicBind模式下,临时创建的worker, 是通过关闭taskQueue 来销毁当前worker)
zlog.Ins().ErrorF("taskQueue is closed, Worker ID = %d quit", workerID)
return
}
switch req := request.(type) {

case ziface.IFuncRequest:
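The StopOneWorker/StartOneWorker pair above relies on a standard Go idiom: closing a worker's task channel terminates its receive loop, which is how a temporary DynamicBind worker is torn down once freeWorker runs. A self-contained sketch of the idiom (illustrative names, not zinx API):

package main

import "fmt"

// startWorker mimics StartOneWorker: it consumes tasks until the queue is closed.
func startWorker(id int, tasks <-chan string, done chan<- struct{}) {
	go func() {
		for task := range tasks { // the loop ends when tasks is closed
			fmt.Printf("worker %d handled %q\n", id, task)
		}
		fmt.Printf("worker %d quit: task queue closed\n", id)
		close(done)
	}()
}

func main() {
	tasks := make(chan string, 8)
	done := make(chan struct{})
	startWorker(1, tasks, done)

	tasks <- "ping"
	close(tasks) // mimics StopOneWorker: tells the worker to drain and exit
	<-done
}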
