Skip to content

Commit

Permalink
cluster.connect has removed the retry mechanism.
Browse files Browse the repository at this point in the history
It was too complex and increased the usage burden on the caller.
  • Loading branch information
findstr committed Dec 18, 2024
1 parent b1d500f commit bf9ecad
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 66 deletions.
2 changes: 1 addition & 1 deletion lualib/core.lua
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ end,
log_info("[sys.core] SILLY_UDP fd:", fd, "closed")
end
end,
[7] = function(signum) --SILLY_ERROR = 7
[7] = function(signum) --SILLY_SIGNAL = 7
local fn = signal_dispatch[signum]
if fn then
local t = task_create(fn)
Expand Down
79 changes: 18 additions & 61 deletions lualib/core/cluster.lua
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
local core = require "core"
local mutex = require "core.sync.mutex"
local dns = require "core.dns"
local logger = require "core.logger"
local np = require "core.netpacket"
local type = type
local pairs = pairs
local assert = assert
local format = string.format
local tcp_connect = core.tcp_connect
local tcp_send = core.tcp_send
local tcp_close = core.socket_close
Expand All @@ -28,51 +28,24 @@ local mt = {
}

local function connect_wrapper(self)
local lock = self.__lock
local fdaddr = self.__fdaddr
local connecting = self.__connecting
return function(addr)
local fd = fdaddr[addr]
if fd then
return fd, "connected"
end
connecting[addr] = true
local l<close> = lock:lock(addr)
fd = fdaddr[addr]
if fd then
return fd, "connected"
end
while connecting[addr] do
local newaddr = addr
local name, port = addr:match("([^:]+):(%d+)")
if dns.isname(name) then
local ip = dns.lookup(name, dns.A)
if ip then
newaddr = ip .. ":" .. port
else
newaddr = nil
logger.error("[rpc.client] dns lookup fail", name)
end
local newaddr = addr
local name, port = addr:match("([^:]+):(%d+)")
if dns.isname(name) then
local ip = dns.lookup(name, dns.A)
if ip then
newaddr = ip .. ":" .. port
else
return nil, format("dns lookup:%s fail", name)
end
if newaddr then
local fd, errno = tcp_connect(newaddr, self.__event)
if fd then
if connecting[addr] then
connecting[addr] = nil
fdaddr[addr] = fd
fdaddr[fd] = addr
return fd, "ok"
else --already close
tcp_close(fd)
return nil, "active closed"
end
else
logger.error("[rpc.client] connect fail", addr, errno)
end
end
core.sleep(1000)
logger.info("[rpc.client] retry connect:", addr)
end
local fd, errstr = tcp_connect(newaddr, self.__event)
if not fd then
return fd, errstr
end
fdaddr[fd] = addr
return fd, "ok"
end
end

Expand All @@ -82,29 +55,18 @@ local function listen_wrapper(self)
if not fd then
return fd, errno
end
self.__fdaddr[addr] = fd
self.__fdaddr[fd] = addr
return fd, nil
end
end

local function close_wrapper(self)
return function(addr)
local connecting = self.__connecting
if connecting[addr] then
connecting[addr] = nil
return true, "connecting"
end
return function(fd)
local fdaddr = self.__fdaddr
local fd = fdaddr[addr]
if not fd then
if not fdaddr[fd] then
return false, "closed"
end
if type(addr) == "string" then
addr, fd = fd, addr
end
fdaddr[fd] = nil
fdaddr[addr] = nil
core.socket_close(fd)
return true, "connected"
end
Expand All @@ -125,7 +87,6 @@ local function init_event(self, conf)
local EVENT = {}
function EVENT.accept(fd, _, addr)
fdaddr[fd] = addr
fdaddr[addr] = fd
local ok, err = pcall(accept, fd, addr)
if not ok then
logger.error("[rpc.server] EVENT.accept", err)
Expand All @@ -135,9 +96,7 @@ local function init_event(self, conf)
end

function EVENT.close(fd, errno)
local addr = fdaddr[fd]
fdaddr[fd] = nil
fdaddr[addr] = nil
local ok, err = pcall(close, fd, errno)
if not ok then
logger.error("[rpc.server] EVENT.close", err)
Expand Down Expand Up @@ -272,19 +231,17 @@ local M = {}
function M.new(conf)
---@class core.cluster
local obj = {
__lock = mutex:new(),
__ctx = np.create(),
__fdaddr = {},
__waitpool = {},
__waitcmd = {},
__waitfor = nil,
__event = nil,
__connecting = {},
---@type async fun(addr:string):number?,string?
connect = nil,
---@type fun(addr:string, backlog?:number):number?,string?
listen = nil,
---@type fun(addr:string):boolean,string?
---@type fun(addr:string|integer):boolean,string?
close = nil,
---@type fun(fd:number, cmd:string, obj:any):any?,string?
call = nil,
Expand Down
8 changes: 4 additions & 4 deletions test/testrpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ local server = cluster.new {
end,
}

server.listen(":8989")
local listen_fd = server.listen(":8989")
local client_fd
local client = cluster.new {
timeout = 1000,
Expand Down Expand Up @@ -182,8 +182,8 @@ end

client_part()
server_part()
client.close("127.0.0.1:8989")
server.close(":8989")
server.close(accept_addr)
client.close(client_fd)
server.close(listen_fd)
server.close(accept_fd)
testaux.asserteq(next(client.__fdaddr), nil, "client fdaddr empty")
testaux.asserteq(next(server.__fdaddr), nil, "client fdaddr empty")

0 comments on commit bf9ecad

Please sign in to comment.