From 8a440ede9e0e8b7d0b8b9c05c696470ef1a31351 Mon Sep 17 00:00:00 2001 From: DerekBum Date: Thu, 4 Apr 2024 16:41:40 +0300 Subject: [PATCH] api: fix `INIT` state stuck Sometimes, instance could enter the queue initialization while still not running (for example, left in the orphan mode). This resulted in "lazy start". But Tarantool does not call `box.cfg {}` after leaving orphan mode, so queue could stuck in the `INIT` state. Now we wait in the background for instances, that are not running. It is similar to lazy init for read-only instances. Note that this fix works only for Tarantool versions >= 2.10.0. This is because of used watchers. Closes #226 --- CHANGELOG.md | 7 +++ queue/init.lua | 33 ++++++++-- t/230-orphan-not-stalling-init.t | 102 +++++++++++++++++++++++++++++++ 3 files changed, 138 insertions(+), 4 deletions(-) create mode 100755 t/230-orphan-not-stalling-init.t diff --git a/CHANGELOG.md b/CHANGELOG.md index ae8bf024..0c39331a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html). +## [Unreleased] + +### Fixed + +- Stuck in `INIT` state if an instance failed to enter the `running` mode + in time (#226). This fix works only for Tarantool versions >= 2.10.0. + ## [1.3.3] - 2023-09-13 ### Fixed diff --git a/queue/init.lua b/queue/init.lua index 40b9268c..8f3f79c9 100644 --- a/queue/init.lua +++ b/queue/init.lua @@ -1,5 +1,8 @@ +local fiber = require('fiber') + local abstract = require('queue.abstract') local queue_state = require('queue.abstract.queue_state') +local qc = require('queue.compat') local queue = nil -- load all core drivers @@ -11,6 +14,10 @@ local core_drivers = { limfifottl = require('queue.abstract.driver.limfifottl') } +-- since: +-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4 +local watchers_supported = qc.check_version({2, 10, 0}) + local function register_driver(driver_name, tube_ctr) if type(tube_ctr.create_space) ~= 'function' or type(tube_ctr.new) ~= 'function' then @@ -62,6 +69,19 @@ local orig_call = nil local wrapper_impl +local function running_waiter() + fiber.name('queue running waiter') + local wait_cond = fiber.cond() + local w = box.watch('box.status', function(_, new_status) + if new_status.status == 'running' then + wait_cond:signal() + end + end) + wait_cond:wait() + w:unregister() + return wrapper_impl() +end + local function cfg_wrapper(...) box.cfg = orig_cfg return wrapper_impl(...) @@ -79,10 +99,15 @@ local function wrap_box_cfg() orig_cfg = box.cfg box.cfg = cfg_wrapper elseif type(box.cfg) == 'table' then - -- box.cfg after the first box.cfg call - local cfg_mt = getmetatable(box.cfg) - orig_call = cfg_mt.__call - cfg_mt.__call = cfg_call_wrapper + if watchers_supported and box.info.status ~= 'running' then + -- Wait for the running state and initialize the queue. + fiber.new(running_waiter) + else + -- box.cfg after the first box.cfg call + local cfg_mt = getmetatable(box.cfg) + orig_call = cfg_mt.__call + cfg_mt.__call = cfg_call_wrapper + end else error('The box.cfg type is unexpected: ' .. type(box.cfg)) end diff --git a/t/230-orphan-not-stalling-init.t b/t/230-orphan-not-stalling-init.t new file mode 100755 index 00000000..48bd7f69 --- /dev/null +++ b/t/230-orphan-not-stalling-init.t @@ -0,0 +1,102 @@ +#!/usr/bin/env tarantool + +local test = require('tap').test('') +local queue = require('queue') +local tnt = require('t.tnt') +local fio = require('fio') +local fiber = require('fiber') + +rawset(_G, 'queue', require('queue')) + +local qc = require('queue.compat') +if not qc.check_version({2, 10, 0}) then + require('log').info('Tests skipped, tarantool version < 2.10.0') + return +end + +local snapdir_optname = qc.snapdir_optname +local logger_optname = qc.logger_optname + +test:plan(1) + +test:test('Check orphan mode not stalling queue', function(test) + test:plan(4) + local engine = os.getenv('ENGINE') or 'memtx' + tnt.cluster.cfg{} + + local dir_replica = fio.tempdir() + local cmd_replica = { + arg[-1], + '-e', + [[ + box.cfg { + replication = { + 'replicator:password@127.0.0.1:3399', + 'replicator:password@127.0.0.1:3398', + }, + listen = '127.0.0.1:3396', + wal_dir = ']] .. dir_replica .. '\'' .. + ',' .. snapdir_optname() .. ' = \'' .. dir_replica .. '\'' .. + ',' .. logger_optname() .. ' = \'' .. + fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' .. + '}' + } + + replica = require('popen').new(cmd_replica, { + stdin = 'devnull', + stdout = 'devnull', + stderr = 'devnull', + }) + + local attempts = 0 + -- Wait for replica to connect. + while box.info.replication[3] == nil or box.info.replication[3].downstream.status ~= 'follow' do + attempts = attempts + 1 + if attempts == 30 then + error('wait for replica connection') + end + fiber.sleep(0.1) + end + + local conn = require('net.box').connect('127.0.0.1:3396') + + conn:eval([[ + box.cfg{ + replication = { + 'replicator:password@127.0.0.1:3399', + 'replicator:password@127.0.0.1:3398', + 'replicator:password@127.0.0.1:3396', + }, + listen = '127.0.0.1:3397', + replication_connect_quorum = 4, + } + ]]) + + conn:eval('rawset(_G, "queue", require("queue"))') + + test:is(conn:call('queue.state'), 'INIT', 'check queue state') + test:is(conn:call('box.info').ro, true, 'check read only') + test:is(conn:call('box.info').ro_reason, 'orphan', 'check ro reason') + + conn:eval('box.cfg{replication_connect_quorum = 2}') + + local attempts = 0 + while true do + if conn:call('queue.state') == 'RUNNING' then + test:is(conn:call('queue.state'), 'RUNNING', + 'check queue state after orphan') + return + end + attempts = attempts + 1 + if attempts == 10 then + break + end + fiber.sleep(0.1) + end + test:is(conn:call('queue.state'), 'RUNNING', 'check queue state after orphan') +end) + +rawset(_G, 'queue', nil) +tnt.finish() +os.exit(test:check() and 0 or 1) +-- vim: set ft=lua :