Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use checker.name to format periodic lock to distribute active check task to multiple workers #44

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 45 additions & 50 deletions lib/resty/healthcheck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -1668,7 +1668,7 @@ function _M.new(opts)
self.TARGET_LIST = SHM_PREFIX .. self.name .. ":target_list"
self.TARGET_LIST_LOCK = SHM_PREFIX .. self.name .. ":target_list_lock"
self.TARGET_LOCK = SHM_PREFIX .. self.name .. ":target_lock"
self.PERIODIC_LOCK = SHM_PREFIX .. ":period_lock:"
self.PERIODIC_LOCK = SHM_PREFIX .. self.name .. ":period_lock"
-- prepare constants
self.EVENT_SOURCE = EVENT_SOURCE_PREFIX .. " [" .. self.name .. "]"
self.LOG_PREFIX = LOG_PREFIX .. "(" .. self.name .. ") "
Expand Down Expand Up @@ -1723,7 +1723,6 @@ function _M.new(opts)

self:log(DEBUG, "worker ", ngx_worker_id(), " (pid: ", ngx_worker_pid(), ") ",
"starting active check timer")
local shm, key = self.shm, self.PERIODIC_LOCK
last_cleanup_check = ngx_now()
active_check_timer, err = resty_timer({
recurring = true,
Expand All @@ -1732,64 +1731,60 @@ function _M.new(opts)
detached = false,
expire = function()

if get_periodic_lock(shm, key) then
active_check_timer.interval = CHECK_INTERVAL
renew_periodic_lock(shm, key)
else
active_check_timer.interval = CHECK_INTERVAL * 10
return
end

local cur_time = ngx_now()
for _, checker_obj in pairs(hcs) do

if (last_cleanup_check + CLEANUP_INTERVAL) < cur_time then
-- clear targets marked for delayed removal
locking_target_list(checker_obj, function(target_list)
local removed_targets = {}
local index = 1
while index <= #target_list do
local target = target_list[index]
if target.purge_time and target.purge_time <= cur_time then
table_insert(removed_targets, target)
table_remove(target_list, index)
else
index = index + 1
local shm, key = checker_obj.shm, checker_obj.PERIODIC_LOCK
if get_periodic_lock(shm, key) then
renew_periodic_lock(shm, key)

if (last_cleanup_check + CLEANUP_INTERVAL) < cur_time then
-- clear targets marked for delayed removal
locking_target_list(checker_obj, function(target_list)
local removed_targets = {}
local index = 1
while index <= #target_list do
local target = target_list[index]
if target.purge_time and target.purge_time <= cur_time then
table_insert(removed_targets, target)
table_remove(target_list, index)
else
index = index + 1
end
end
end

if #removed_targets > 0 then
target_list = serialize(target_list)
if #removed_targets > 0 then
target_list = serialize(target_list)

local ok, err = shm:set(checker_obj.TARGET_LIST, target_list)
if not ok then
return nil, "failed to store target_list in shm: " .. err
end
local ok, err = shm:set(checker_obj.TARGET_LIST, target_list)
if not ok then
return nil, "failed to store target_list in shm: " .. err
end

for _, target in ipairs(removed_targets) do
clear_target_data_from_shm(checker_obj, target.ip, target.port, target.hostname)
checker_obj:raise_event(checker_obj.events.remove, target.ip, target.port, target.hostname)
for _, target in ipairs(removed_targets) do
clear_target_data_from_shm(checker_obj, target.ip, target.port, target.hostname)
checker_obj:raise_event(checker_obj.events.remove, target.ip, target.port, target.hostname)
end
end
end
end)
end)

last_cleanup_check = cur_time
end
last_cleanup_check = cur_time
end

if checker_obj.checks.active.healthy.active and
(checker_obj.checks.active.healthy.last_run +
checker_obj.checks.active.healthy.interval <= cur_time)
then
checker_obj.checks.active.healthy.last_run = cur_time
checker_callback(checker_obj, "healthy")
end
if checker_obj.checks.active.healthy.active and
(checker_obj.checks.active.healthy.last_run +
checker_obj.checks.active.healthy.interval <= cur_time)
then
checker_obj.checks.active.healthy.last_run = cur_time
checker_callback(checker_obj, "healthy")
end

if checker_obj.checks.active.unhealthy.active and
(checker_obj.checks.active.unhealthy.last_run +
checker_obj.checks.active.unhealthy.interval <= cur_time)
then
checker_obj.checks.active.unhealthy.last_run = cur_time
checker_callback(checker_obj, "unhealthy")
if checker_obj.checks.active.unhealthy.active and
(checker_obj.checks.active.unhealthy.last_run +
checker_obj.checks.active.unhealthy.interval <= cur_time)
then
checker_obj.checks.active.unhealthy.last_run = cur_time
checker_callback(checker_obj, "unhealthy")
end
end
end
end,
Expand Down
Loading