Feature/dynamic bind: Add a DynamicBind worker-pool mode. Like the Bind mode, it binds one worker to each connection, but unlike Bind it does not leave large numbers of workers sitting idle. #339

Merged
merged 2 commits on Oct 11, 2024
89 changes: 89 additions & 0 deletions examples/zinx_dynamic_bind/client/client.go
@@ -0,0 +1,89 @@
package main

import (
"os"
"os/signal"
"time"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

const (
PingType = 1
PongType = 2
)

// pong response router
type PongRouter struct {
znet.BaseRouter
client string
}

// In Hash mode, the pongs for client2 and client3 arrive only after client1's pong has been received.
// In DynamicBind mode, client2 and client3 receive their pong immediately, while client1's pong is delayed for ten seconds by the blocking handler.
func (p *PongRouter) Handle(request ziface.IRequest) {
//read server pong data
zlog.Infof("---------client:%s, recv from server:%s, msgId=%d, data=%s ----------\n",
p.client, request.GetConnection().RemoteAddr(), request.GetMsgID(), string(request.GetData()))
}

func onClient1Start(conn ziface.IConnection) {
zlog.Infof("client1 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client1"))
if err != nil {
zlog.Error(err)
}
}

func onClient2Start(conn ziface.IConnection) {
zlog.Infof("client2 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client2"))
if err != nil {
zlog.Error(err)
}
}

func onClient3Start(conn ziface.IConnection) {
zlog.Infof("client3 connection start, %s->%s\n", conn.LocalAddrString(), conn.RemoteAddrString())
//send ping
err := conn.SendMsg(PingType, []byte("Ping From Client3"))
if err != nil {
zlog.Error(err)
}
}

func main() {
//Create client1
client1 := znet.NewClient("127.0.0.1", 8999)
client1.SetOnConnStart(onClient1Start)
client1.AddRouter(PongType, &PongRouter{client: "client1"})
client1.Start()

time.Sleep(time.Second)

client2 := znet.NewClient("127.0.0.1", 8999)
client2.SetOnConnStart(onClient2Start)
client2.AddRouter(PongType, &PongRouter{client: "client2"})
client2.Start()

time.Sleep(time.Second)

client3 := znet.NewClient("127.0.0.1", 8999)
client3.SetOnConnStart(onClient3Start)
client3.AddRouter(PongType, &PongRouter{client: "client3"})
client3.Start()

//Prevent the process from exiting; wait for an interrupt signal
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt)
<-signalChan
client1.Stop()
client2.Stop()
client3.Stop()

time.Sleep(time.Second)
}
9 changes: 9 additions & 0 deletions examples/zinx_dynamic_bind/server/conf/zinx.json
@@ -0,0 +1,9 @@
{
"Name":"zinx server DynamicBind Mode Demo",
"Host":"127.0.0.1",
"TCPPort":8999,
"MaxConn":12000,
"WorkerPoolSize":1,
"MaxWorkerTaskLen":50,
"WorkerMode":"DynamicBind"
}
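
The config above drives the demo. For reference, here is a minimal sketch of setting the same options programmatically, assuming the exported zconf.GlobalObject fields mirror the JSON keys and may be mutated before the server starts (the demo itself just reads conf/zinx.json):

	package main

	import (
		"github.com/aceld/zinx/zconf"
		"github.com/aceld/zinx/znet"
	)

	func main() {
		// Hypothetical programmatic equivalent of conf/zinx.json above;
		// assumes zconf.GlobalObject is mutable before the server starts.
		zconf.GlobalObject.Name = "zinx server DynamicBind Mode Demo"
		zconf.GlobalObject.Host = "127.0.0.1"
		zconf.GlobalObject.TCPPort = 8999
		zconf.GlobalObject.MaxConn = 12000
		zconf.GlobalObject.WorkerPoolSize = 1
		zconf.GlobalObject.MaxWorkerTaskLen = 50
		zconf.GlobalObject.WorkerMode = zconf.WorkerModeDynamicBind

		s := znet.NewServer()
		s.Serve()
	}

With WorkerPoolSize set to 1 and a handler that blocks, Hash mode would serialize all three demo clients behind the blocked task, which is exactly the contrast this example is built to show.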
58 changes: 58 additions & 0 deletions examples/zinx_dynamic_bind/server/server.go
@@ -0,0 +1,58 @@
package main

import (
"sync/atomic"
"time"

"github.com/aceld/zinx/ziface"
"github.com/aceld/zinx/zlog"
"github.com/aceld/zinx/znet"
)

func OnConnectionAdd(conn ziface.IConnection) {
zlog.Debug("OnConnectionAdd:", conn.GetConnection().RemoteAddr())
}

func OnConnectionLost(conn ziface.IConnection) {
zlog.Debug("OnConnectionLost:", conn.GetConnection().RemoteAddr())
}

type blockRouter struct {
znet.BaseRouter
}

var Block = int32(1)

// Simulate a blocking operation
func (r *blockRouter) Handle(request ziface.IRequest) {
//read client data
zlog.Infof("recv from client:%s, msgId=%d, data=%s\n", request.GetConnection().RemoteAddr(), request.GetMsgID(), string(request.GetData()))

// On the first request, simulate a blocking task. In Hash mode, tasks from the later connections would not get processed;
// in DynamicBind mode, the later connections' tasks are handled promptly and are not starved by the earlier connection's blocked task.
// The blocking operation is simulated only once.
if atomic.CompareAndSwapInt32(&Block, 1, 0) {
zlog.Infof("blockRouter handle start, msgId=%d, remote:%v\n", request.GetMsgID(), request.GetConnection().RemoteAddr())
time.Sleep(time.Second * 10)
//blocking operation finished
zlog.Infof("blockRouter handle end, msgId=%d, remote:%v\n", request.GetMsgID(), request.GetConnection().RemoteAddr())
}

err := request.GetConnection().SendMsg(2, []byte("pong from server"))
if err != nil {
zlog.Error(err)
return
}
zlog.Infof("send pong over, client:%s\n", request.GetConnection().RemoteAddr())
}

func main() {
s := znet.NewServer()

s.SetOnConnStart(OnConnectionAdd)
s.SetOnConnStop(OnConnectionLost)

s.AddRouter(1, &blockRouter{})

s.Serve()
}
8 changes: 8 additions & 0 deletions zconf/zconf.go
@@ -25,6 +25,14 @@ const (
const (
WorkerModeHash = "Hash" // By default, the round-robin average allocation rule is used.(默认使用取余的方式)
WorkerModeBind = "Bind" // Bind a worker to each connection.(为每个连接分配一个worker)

//Hash mode can suffer from blocking (although asynchronous handling can work around it).
//In Bind mode, if MaxConn is large, many workers are started up front and sit idle while few connections are active; if MaxConn is small, the number of connections the server can accept is limited.

//WorkerModeDynamicBind also binds one worker per connection. When assigning a worker to a connection, if the workers initialized in the pool are exhausted, a worker is created dynamically and bound to the connection; this temporary worker is destroyed when the connection closes.
//Unlike WorkerModeHash, a blocking operation in one connection's business callback does not delay the handling of other connections.
//Unlike WorkerModeBind, there is no need to create a large number of workers up front; workers are created dynamically with the connection count, avoiding the resource waste of many idle workers.
WorkerModeDynamicBind = "DynamicBind" // Dynamically create and bind a temporary worker to a connection when the worker pool is exhausted.
)
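
To make the allocation policy above concrete, here is a standalone, simplified sketch (not the zinx implementation in this diff; names like dynamicPool are invented for illustration): pooled workerIDs are handed out first, then temporary IDs from [WorkerPoolSize, MaxConn), and releasing a temporary ID is where the real code would destroy the worker:

	package main

	import (
		"fmt"
		"sync"
	)

	// dynamicPool is a toy model of the DynamicBind workerID bookkeeping.
	type dynamicPool struct {
		mu       sync.Mutex
		poolSize uint32
		free     map[uint32]struct{} // IDs of pre-started pool workers
		extra    map[uint32]struct{} // IDs available for temporary workers
	}

	func newDynamicPool(poolSize, maxConn uint32) *dynamicPool {
		p := &dynamicPool{
			poolSize: poolSize,
			free:     make(map[uint32]struct{}, poolSize),
			extra:    make(map[uint32]struct{}, maxConn-poolSize),
		}
		for i := uint32(0); i < poolSize; i++ {
			p.free[i] = struct{}{}
		}
		for i := poolSize; i < maxConn; i++ {
			p.extra[i] = struct{}{}
		}
		return p
	}

	// acquire prefers a pooled workerID and falls back to a temporary one.
	func (p *dynamicPool) acquire() (uint32, bool) {
		p.mu.Lock()
		defer p.mu.Unlock()
		for id := range p.free {
			delete(p.free, id)
			return id, true
		}
		for id := range p.extra {
			delete(p.extra, id) // the real code also starts a worker goroutine here
			return id, true
		}
		return 0, false // MaxConn reached
	}

	// release recycles pooled IDs; temporary IDs go back to the extra set
	// (the real code additionally stops the temporary worker).
	func (p *dynamicPool) release(id uint32) {
		p.mu.Lock()
		defer p.mu.Unlock()
		if id < p.poolSize {
			p.free[id] = struct{}{}
		} else {
			p.extra[id] = struct{}{}
		}
	}

	func main() {
		p := newDynamicPool(1, 4)
		a, _ := p.acquire() // pooled ID 0
		b, _ := p.acquire() // temporary ID from [1,4)
		fmt.Println(a, b)
		p.release(b)
		p.release(a)
	}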

82 changes: 79 additions & 3 deletions znet/msghandler.go
Owner:
	handle := &MsgHandle{
		Apis:           make(map[uint32]ziface.IRouter),
		RouterSlices:   NewRouterSlices(),
		WorkerPoolSize: zconf.GlobalObject.WorkerPoolSize,
		// One worker corresponds to one queue (一个worker对应一个queue)
		TaskQueue:   make([]chan ziface.IRequest, TaskQueueLen),
		freeWorkers: freeWorkers,
		builder:     newChainBuilder(),
		// 可额外临时分配的workerID集合
		extraFreeWorkers: extraFreeWorkers,
	}

In the snippet above, could TaskQueue: make([]chan ziface.IRequest, TaskQueueLen) be made conditional on zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind, so that the existing Bind worker mode is not affected?


Author:

The TaskQueue length, TaskQueueLen, already distinguishes the DynamicBind mode.

Owner:

In StartOneWorker(), should the !ok handling also distinguish the mode? Otherwise, won't every failed channel read be treated as a worker shutdown in the current mode? I'm not sure about this. @jursonmo

Author:

Once the queue is closed, the worker should exit regardless of the mode; otherwise it would keep reading empty requests from the closed queue.
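
The !ok branch rests on standard Go channel semantics rather than anything zinx-specific; here is a minimal standalone demo of why a worker that ignored ok would spin on zero-value requests after close:

	package main

	import "fmt"

	func main() {
		ch := make(chan int, 1)
		ch <- 42
		close(ch)

		v, ok := <-ch // drains the buffered value: 42 true
		fmt.Println(v, ok)

		v, ok = <-ch // closed and empty: 0 false, returned immediately
		fmt.Println(v, ok)

		v, ok = <-ch // stays 0 false forever; a receive loop would never block again
		fmt.Println(v, ok)
	}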

@@ -38,6 +38,11 @@ type MsgHandle struct {
// (Worker负责取任务的消息队列)
TaskQueue []chan ziface.IRequest

// A collection of extra workerIDs that can be temporarily allocated when the pool's workers are exhausted; used for zconf.WorkerModeDynamicBind
extraFreeWorkers map[uint32]struct{}
extraFreeWorkerMu sync.Mutex

// Chain builder for the responsibility chain
// (责任链构造器)
builder *chainBuilder
@@ -48,6 +53,8 @@ type MsgHandle struct {
// zinxRole: IServer/IClient
func newMsgHandle() *MsgHandle {
var freeWorkers map[uint32]struct{}
var extraFreeWorkers map[uint32]struct{}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeBind {
// Assign a worker to each connection, avoiding interference when multiple connections are processed by the same worker
// MaxWorkerTaskLen can also be reduced, for example, 50
Expand All @@ -60,14 +67,32 @@ func newMsgHandle() *MsgHandle {
}
}

TaskQueueLen := zconf.GlobalObject.WorkerPoolSize

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
zlog.Ins().DebugF("WorkerMode = %s", zconf.WorkerModeDynamicBind)
freeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.WorkerPoolSize)
for i := uint32(0); i < zconf.GlobalObject.WorkerPoolSize; i++ {
freeWorkers[i] = struct{}{}
}

extraFreeWorkers = make(map[uint32]struct{}, zconf.GlobalObject.MaxConn-int(zconf.GlobalObject.WorkerPoolSize))
for i := zconf.GlobalObject.WorkerPoolSize; i < uint32(zconf.GlobalObject.MaxConn); i++ {
extraFreeWorkers[i] = struct{}{}
}
TaskQueueLen = uint32(zconf.GlobalObject.MaxConn)
}

handle := &MsgHandle{
Apis: make(map[uint32]ziface.IRouter),
RouterSlices: NewRouterSlices(),
WorkerPoolSize: zconf.GlobalObject.WorkerPoolSize,
// One worker corresponds to one queue (一个worker对应一个queue)
TaskQueue: make([]chan ziface.IRequest, zconf.GlobalObject.WorkerPoolSize),
TaskQueue: make([]chan ziface.IRequest, TaskQueueLen),
freeWorkers: freeWorkers,
builder: newChainBuilder(),
// 可额外临时分配的workerID集合
extraFreeWorkers: extraFreeWorkers,
}

// It is necessary to add the MsgHandle to the responsibility chain here, and it is the last link in the responsibility chain. After decoding in the MsgHandle, data distribution is done by router
Expand Down Expand Up @@ -97,6 +122,28 @@ func useWorker(conn ziface.IConnection) uint32 {
}
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
mh.freeWorkerMu.Lock()
// try to get a free workerID from the worker pool first
for workerID := range mh.freeWorkers {
delete(mh.freeWorkers, workerID)
mh.freeWorkerMu.Unlock()
return workerID
}
mh.freeWorkerMu.Unlock()

// The pool's workers are exhausted; take an extra workerID from extraFreeWorkers and start a temporary worker for it
mh.extraFreeWorkerMu.Lock()
defer mh.extraFreeWorkerMu.Unlock()
for workerID := range mh.extraFreeWorkers {
zlog.Ins().DebugF("start extra worker, workerID=%d", workerID)
mh.TaskQueue[workerID] = make(chan ziface.IRequest, zconf.GlobalObject.MaxWorkerTaskLen)
go mh.StartOneWorker(int(workerID), mh.TaskQueue[workerID])
return workerID
}
}

//Compatible with the case where the client has no worker, avoiding division by zero
if mh.WorkerPoolSize == 0 {
@@ -128,6 +175,23 @@ func freeWorker(conn ziface.IConnection) {

mh.freeWorkers[conn.GetWorkerID()] = struct{}{}
}

if zconf.GlobalObject.WorkerMode == zconf.WorkerModeDynamicBind {
workerID := conn.GetWorkerID()
if workerID < mh.WorkerPoolSize {
// This workerID belongs to the worker pool: recycle the workerID; the corresponding worker does not need to be destroyed
mh.freeWorkerMu.Lock()
mh.freeWorkers[workerID] = struct{}{}
mh.freeWorkerMu.Unlock()
} else {
// This is a temporary worker; it needs to be destroyed
mh.StopOneWorker(int(workerID))
// Recycle the workerID back into the extra workerID pool
mh.extraFreeWorkerMu.Lock()
mh.extraFreeWorkers[workerID] = struct{}{}
mh.extraFreeWorkerMu.Unlock()
}
}
}

// Data processing interceptor that is necessary by default in Zinx
@@ -280,6 +344,13 @@ func (mh *MsgHandle) doMsgHandlerSlices(request ziface.IRequest, workerID int) {
PutRequest(request)
}

func (mh *MsgHandle) StopOneWorker(workerID int) {
zlog.Ins().DebugF("stop Worker ID = %d ", workerID)
// Stop the worker by closing the corresponding taskQueue
close(mh.TaskQueue[workerID])
}

// StartOneWorker starts a worker workflow
// (启动一个Worker工作流程)
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
@@ -290,8 +361,13 @@ func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest
select {
// If there is a message, take out the Request from the queue and execute the bound business method
case request := <-taskQueue:

case request, ok := <-taskQueue:
if !ok {
// In DynamicBind mode, a temporarily created worker is destroyed by closing its taskQueue
zlog.Ins().ErrorF("taskQueue is closed, Worker ID = %d quit", workerID)
return
}
switch req := request.(type) {

case ziface.IFuncRequest:
Expand Down