Skip to content

Commit

Permalink
Make Launcher the starting point in all cases (#909)
Browse files Browse the repository at this point in the history
Fixes leadership release on graceful shutdown
  • Loading branch information
spuun authored Jan 16, 2025
1 parent fd150d0 commit d46c3b8
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Rescue more exceptions while reading msg store on startup [#865](https://github.com/cloudamqp/lavinmq/pull/865)
- Crystal 1.15 support [#905](https://github.com/cloudamqp/lavinmq/pull/905)
- lavinmqctl now handles pagination of large result sets [#904](https://github.com/cloudamqp/lavinmq/pull/904)
- Make clustering more reliable [#879](https://github.com/cloudamqp/lavinmq/pull/879)
- Make clustering more reliable [#879](https://github.com/cloudamqp/lavinmq/pull/879), [#909](https://github.com/cloudamqp/lavinmq/pull/909), [#906](https://github.com/cloudamqp/lavinmq/pull/906)
- Strip newlines from logs [#896](https://github.com/cloudamqp/lavinmq/pull/896)

### Added
Expand Down
47 changes: 44 additions & 3 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./spec_helper"
require "../src/lavinmq/launcher"
require "../src/lavinmq/clustering/client"
require "../src/lavinmq/clustering/controller"

Expand Down Expand Up @@ -36,7 +37,7 @@ describe LavinMQ::Clustering::Client do
begin
spec.run
ensure
p.terminate(graceful: false)
p.terminate(graceful: false) rescue nil
FileUtils.rm_rf "/tmp/clustering-spec.etcd"
FileUtils.rm_rf follower_data_dir
end
Expand Down Expand Up @@ -134,10 +135,10 @@ describe LavinMQ::Clustering::Client do
end
sleep 0.5.seconds
spawn(name: "failover1") do
controller1.run
controller1.run { }
end
spawn(name: "failover2") do
controller2.run
controller2.run { }
end
sleep 0.1.seconds
leader = listen.receive
Expand All @@ -155,4 +156,44 @@ describe LavinMQ::Clustering::Client do
else fail("no leader elected")
end
end

# Verifies that a gracefully stopped Launcher gives up cluster leadership:
# once launcher.stop has been requested, a competing campaign on the same
# election key ("lavinmq/leader") must succeed within one second.
it "will release lease on shutdown" do
# Clustering-enabled config with its own data dir.
# NOTE(review): assumes an etcd instance is reachable on localhost:12379 — confirm
# it is provided by the surrounding spec setup.
config = LavinMQ::Config.new
config.data_dir = "/tmp/release-lease"
config.clustering = true
config.clustering_etcd_endpoints = "localhost:12379"
config.clustering_advertised_uri = "tcp://localhost:5681"
launcher = LavinMQ::Launcher.new(config)

# election_done is closed from the elect_listen callback; closing (rather than
# sending) lets the `receive?` below return without raising.
# NOTE(review): presumably the callback fires when a leader is observed on the key — confirm.
election_done = Channel(Nil).new
etcd = LavinMQ::Etcd.new(config.clustering_etcd_endpoints)
spawn do
etcd.elect_listen("lavinmq/leader") { election_done.close }
end

# Launcher#run is blocking, so it is run in its own fiber.
spawn { launcher.run }

# Wait until our "launcher" is leader
election_done.receive?

# The spec gets a lease to use in an election campaign
lease_id, _ttl = etcd.lease_grant(5)

# graceful stop...
spawn { launcher.stop }

# Let the spec campaign for leadership...
# `elected` is closed once election_campaign returns.
elected = Channel(Nil).new
spawn do
etcd.election_campaign("lavinmq/leader", "spec", lease_id)
elected.close
end

# ... and verify spec is elected
# Whichever happens first wins: the campaign finishing, or the 1 second timeout
# which fails the spec.
select
when elected.receive?
when timeout(1.seconds)
fail("election campaign did not finish in time, leadership not released on launcher stop?")
end
end
end
8 changes: 1 addition & 7 deletions src/lavinmq.cr
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,4 @@ config.parse # both ARGV and config file

# config has to be loaded before we require vhost/queue, byte_format is a constant
require "./lavinmq/launcher"
require "./lavinmq/clustering/controller"

if config.clustering?
LavinMQ::Clustering::Controller.new(config).run
else
LavinMQ::Launcher.new(config).run
end
LavinMQ::Launcher.new(config).run
9 changes: 0 additions & 9 deletions src/lavinmq/clustering/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ module LavinMQ
@unix_http_proxy = Proxy.new(@config.http_unix_path) unless @config.http_unix_path.empty?
end
HTTP::Server.follower_internal_socket_http_server

Signal::INT.trap { close_and_exit }
Signal::TERM.trap { close_and_exit }
end

private def close_and_exit
Log.info { "Received termination signal, shutting down..." }
close
exit 0
end

def follow(uri : String)
Expand Down
26 changes: 18 additions & 8 deletions src/lavinmq/clustering/controller.cr
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
require "../launcher"
require "../etcd"
require "./client"

class LavinMQ::Clustering::Controller
Log = LavinMQ::Log.for "clustering.controller"

@id : Int32
getter id : Int32

def initialize(@config = Config.instance)
@etcd = Etcd.new(@config.clustering_etcd_endpoints)
@repli_client : Client? = nil

def self.new(config : Config)
etcd = Etcd.new(config.clustering_etcd_endpoints)
new(config, etcd)
end

def initialize(@config : Config, @etcd : Etcd)
@id = clustering_id
@advertised_uri = @config.clustering_advertised_uri ||
"tcp://#{System.hostname}:#{@config.clustering_port}"
end

def run
# This method is called by Launcher#run.
# It yields to the block once the controller's prerequisites for a leader
# to start are met, i.e. when the current node has been elected leader.
# The method is blocking.
def run(&)
spawn(follow_leader, name: "Follower monitor")
wait_to_be_insync
@lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader
@repli_client.try &.close
# TODO: make sure we still are in the ISR set
@launcher = Launcher.new(@config, @etcd).start
yield
loop do
if lease.wait(30.seconds)
break if @stopped
Expand All @@ -35,7 +45,7 @@ class LavinMQ::Clustering::Controller

def stop
@stopped = true
@launcher.try &.stop
@repli_client.try &.close
@lease.try &.release
end

Expand Down Expand Up @@ -74,7 +84,7 @@ class LavinMQ::Clustering::Controller
break
end
end
repli_client = r = Clustering::Client.new(@config, @id, secret)
@repli_client = repli_client = r = Clustering::Client.new(@config, @id, secret)
spawn r.follow(uri), name: "Clustering client #{uri}"
SystemD.notify_ready
end
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/clustering/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module LavinMQ
@id : Int32
@config : Config

def initialize(config : Config, @etcd : Etcd, @id = File.read(File.join(config.data_dir, ".clustering_id")).to_i(36))
def initialize(config : Config, @etcd : Etcd, @id : Int32)
Log.info { "ID: #{@id.to_s(36)}" }
@config = config
@data_dir = @config.data_dir
Expand Down
80 changes: 53 additions & 27 deletions src/lavinmq/launcher.cr
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,33 @@ require "./http/http_server"
require "./in_memory_backend"
require "./data_dir_lock"
require "./etcd"
require "./clustering/controller"

module LavinMQ
# Runner used by Launcher when clustering is disabled (the non-clustering
# branch of Launcher#initialize assigns it to @runner). It exposes the same
# `run(&)` / `stop` pair that Launcher calls on its runner.
struct StandaloneRunner
# Yields to the block as soon as the runner's prerequisites for a leader
# to start are met. For the standalone runner, this is immediately.
# The method is blocking: after the block returns it loops forever,
# sleeping 30 seconds and then calling GC.collect on each iteration.
def run(&)
yield
loop do
sleep 30.seconds
GC.collect
end
end

# Intentionally empty: the standalone runner holds nothing to release.
# It exists so the Launcher can call `@runner.stop` regardless of runner type.
def stop
end
end

class Launcher
Log = LavinMQ::Log.for "launcher"
@tls_context : OpenSSL::SSL::Context::Server?
@first_shutdown_attempt = true
@data_dir_lock : DataDirLock?
@closed = false

def initialize(@config : Config, etcd : Etcd? = nil)
def initialize(@config : Config)
print_environment_info
print_max_map_count
fd_limit = System.maximize_fd_limit
Expand All @@ -27,28 +44,36 @@ module LavinMQ
end
Dir.mkdir_p @config.data_dir
if @config.data_dir_lock?
@data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire
@data_dir_lock = DataDirLock.new(@config.data_dir)
end
@replicator = replicator = etcd ? Clustering::Server.new(@config, etcd) : Clustering::NoopServer.new
@amqp_server = LavinMQ::Server.new(@config, replicator)
@http_server = LavinMQ::HTTP::Server.new(@amqp_server)

if @config.clustering?
etcd = Etcd.new(@config.clustering_etcd_endpoints)
@runner = controller = Clustering::Controller.new(@config, etcd)
@replicator = Clustering::Server.new(@config, etcd, controller.id)
else
@replicator = Clustering::NoopServer.new
@runner = StandaloneRunner.new
end

@tls_context = create_tls_context if @config.tls_configured?
reload_tls_context
setup_signal_traps
setup_log_exchange
end

def start : self
listen
private def start : self
@data_dir_lock.try &.acquire
@amqp_server = amqp_server = LavinMQ::Server.new(@config, @replicator)
@http_server = http_server = LavinMQ::HTTP::Server.new(amqp_server)
setup_log_exchange(amqp_server)
start_listeners(amqp_server, http_server)
SystemD.notify_ready
self
end

def run
start
loop do
sleep 30.seconds
GC.collect
@runner.run do
start
end
end

Expand All @@ -57,10 +82,11 @@ module LavinMQ
@closed = true
Log.warn { "Stopping" }
SystemD.notify_stopping
@http_server.close rescue nil
@amqp_server.close rescue nil
@data_dir_lock.try &.release
@http_server.try &.close rescue nil
@amqp_server.try &.close rescue nil
@runner.stop
@replicator.close
@data_dir_lock.try &.release
end

private def print_environment_info
Expand Down Expand Up @@ -90,10 +116,10 @@ module LavinMQ
{% end %}
end

private def setup_log_exchange
private def setup_log_exchange(amqp_server)
return unless @config.log_exchange?
exchange_name = "amq.lavinmq.log"
vhost = @amqp_server.vhosts["/"]
vhost = amqp_server.vhosts["/"]
vhost.declare_exchange(exchange_name, "topic", true, false, true)
spawn(name: "Log Exchange") do
log_channel = ::Log::InMemoryBackend.instance.add_channel
Expand All @@ -108,42 +134,42 @@ module LavinMQ
end
end

private def listen
private def start_listeners(amqp_server, http_server)
if @config.amqp_port > 0
spawn @amqp_server.listen(@config.amqp_bind, @config.amqp_port),
spawn amqp_server.listen(@config.amqp_bind, @config.amqp_port),
name: "AMQP listening on #{@config.amqp_port}"
end

if @config.amqps_port > 0
if ctx = @tls_context
spawn @amqp_server.listen_tls(@config.amqp_bind, @config.amqps_port, ctx),
spawn amqp_server.listen_tls(@config.amqp_bind, @config.amqps_port, ctx),
name: "AMQPS listening on #{@config.amqps_port}"
end
end

if clustering_bind = @config.clustering_bind
spawn @amqp_server.listen_clustering(clustering_bind, @config.clustering_port), name: "Clustering listener"
spawn amqp_server.listen_clustering(clustering_bind, @config.clustering_port), name: "Clustering listener"
end

unless @config.unix_path.empty?
spawn @amqp_server.listen_unix(@config.unix_path), name: "AMQP listening at #{@config.unix_path}"
spawn amqp_server.listen_unix(@config.unix_path), name: "AMQP listening at #{@config.unix_path}"
end

if @config.http_port > 0
@http_server.bind_tcp(@config.http_bind, @config.http_port)
http_server.bind_tcp(@config.http_bind, @config.http_port)
end
if @config.https_port > 0
if ctx = @tls_context
@http_server.bind_tls(@config.http_bind, @config.https_port, ctx)
http_server.bind_tls(@config.http_bind, @config.https_port, ctx)
end
end
unless @config.http_unix_path.empty?
@http_server.bind_unix(@config.http_unix_path)
http_server.bind_unix(@config.http_unix_path)
end

@http_server.bind_internal_unix
http_server.bind_internal_unix
spawn(name: "HTTP listener") do
@http_server.not_nil!.listen
http_server.listen
end
end

Expand Down
4 changes: 4 additions & 0 deletions src/lavinmq/reporter.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
module LavinMQ
class Reporter
def self.report(s)
if s.nil?
puts "No server instance to report"
return
end
puts_size_capacity s.@users
s.users.each do |name, user|
puts "User #{name}"
Expand Down

0 comments on commit d46c3b8

Please sign in to comment.