From bb0e205dda157f43c1a41a7a0403867e864a9ead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 13 Dec 2024 12:42:06 +0100 Subject: [PATCH 01/25] More reliable clustering Catch and explicitly reraise IO::Errors in etcd, otherwise when an Etcd method yielded, and that inner call raised IO::Error that was interpreted as an Etcd error. Extra logging related to Following Start etcd lease keepalive after won election Apparently there's no need to update lease TTL until the election is won Refactor Leadership lease keepalive don't log Lost leadership if manually revoked etcd errors are sometimes JSON, sometimes not don't let Launcher know about clustering/leases Let it be a concern for Clustering Controller No need to poll the data dir lock, because it's only required for NFS disks. --- spec/etcd_spec.cr | 1 + src/lavinmq/clustering/client.cr | 1 + src/lavinmq/clustering/controller.cr | 22 ++++- src/lavinmq/data_dir_lock.cr | 4 +- src/lavinmq/etcd.cr | 137 ++++++++++++++++----------- src/lavinmq/launcher.cr | 27 ++---- 6 files changed, 113 insertions(+), 79 deletions(-) diff --git a/spec/etcd_spec.cr b/spec/etcd_spec.cr index a2e73d9359..e0dad4d477 100644 --- a/spec/etcd_spec.cr +++ b/spec/etcd_spec.cr @@ -1,6 +1,7 @@ require "spec" require "../src/lavinmq/etcd" require "file_utils" +require "http/client" describe LavinMQ::Etcd do it "can put and get" do diff --git a/src/lavinmq/clustering/client.cr b/src/lavinmq/clustering/client.cr index 26c97ece42..6e5e88c092 100644 --- a/src/lavinmq/clustering/client.cr +++ b/src/lavinmq/clustering/client.cr @@ -57,6 +57,7 @@ module LavinMQ end def follow(host : String, port : Int32) + Log.info { "Following #{host}:#{port}" } @host = host @port = port if amqp_proxy = @amqp_proxy diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index 19f81b7c4d..d39fd1cd88 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -17,14 +17,27 @@ class 
LavinMQ::Clustering::Controller def run spawn(follow_leader, name: "Follower monitor") wait_to_be_insync - lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + @lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader + # TODO: make sure we still are in the ISR set replicator = Clustering::Server.new(@config, @etcd) - @launcher = l = Launcher.new(@config, replicator, lease) - l.run + @launcher = Launcher.new(@config, replicator).start + loop do + if lease.wait(30.seconds) + break if @stopped + Log.fatal { "Lost cluster leadership" } + exit 3 + else + GC.collect + end + end end + @stopped = false + def stop + @stopped = true @launcher.try &.stop + @lease.try &.release end # Each node in a cluster has an unique id, for tracking ISR @@ -66,6 +79,9 @@ class LavinMQ::Clustering::Controller spawn r.follow(uri), name: "Clustering client #{uri}" SystemD.notify_ready end + rescue ex + Log.fatal(exception: ex) { "Unhandled exception while following leader" } + exit 36 # 36 for CF (Cluster Follower) end def wait_to_be_insync diff --git a/src/lavinmq/data_dir_lock.cr b/src/lavinmq/data_dir_lock.cr index 87d752a4f6..2654b0499f 100644 --- a/src/lavinmq/data_dir_lock.cr +++ b/src/lavinmq/data_dir_lock.cr @@ -21,6 +21,7 @@ module LavinMQ @lock.flock_exclusive(blocking: true) Log.info { "Lock acquired" } end + Log.debug { "Data directory lock aquired" } @lock.truncate @lock.print "PID #{Process.pid} @ #{System.hostname}" @lock.fsync @@ -37,7 +38,8 @@ module LavinMQ def poll @lock.read_at(0, 1, &.read_byte) || raise IO::EOFError.new rescue ex : IO::Error | ArgumentError - abort "ERROR: Lost data directory lock! 
#{ex.inspect}" + Log.fatal(exception: ex) { "Lost data dir lock" } + exit 4 # 4 for D(dataDir) end end end diff --git a/src/lavinmq/etcd.cr b/src/lavinmq/etcd.cr index fab7a3728b..e9acad2135 100644 --- a/src/lavinmq/etcd.cr +++ b/src/lavinmq/etcd.cr @@ -1,4 +1,4 @@ -require "http/client" +require "socket" require "wait_group" require "json" require "./logger" @@ -35,7 +35,7 @@ module LavinMQ def watch(key, &) body = %({"create_request":{"key":"#{Base64.strict_encode key}"}}) - post_stream("/v3/watch", body) do |json| + stream("/v3/watch", body) do |json| next if json.dig?("result", "created") == true # "watch created" is first event if value = json.dig?("result", "events", 0, "kv", "value") @@ -61,10 +61,17 @@ module LavinMQ if ttl = json.dig?("result", "TTL") ttl.as_s.to_i else - raise Error.new("Lost lease #{id}") + raise Error.new("Lease #{id} expired") end end + def lease_ttl(id) : Int32 + json = post("/v3/lease/timetolive", body: %({"ID":"#{id}"})) + ttl = json["TTL"].as_s.to_i + raise Error.new("Lease #{id} expired") if ttl < 0 + ttl + end + def lease_revoke(id) : Nil post("/v3/lease/revoke", body: %({"ID":"#{id}"})) end @@ -81,56 +88,51 @@ module LavinMQ # Returns when elected leader # Returns a `Leadership` instance def elect(name, value, ttl = 10) : Leadership - channel = Channel(Nil).new - lease_id, ttl = lease_grant(ttl) - wg = WaitGroup.new(1) - spawn(name: "Etcd lease keepalive #{lease_id}") do - wg.done - loop do - select - when channel.receive? 
- lease_revoke(lease_id) - channel.close - break - when timeout((ttl * 0.7).seconds) - ttl = lease_keepalive(lease_id) - end - rescue ex - Log.warn { "Lost leadership of #{name}: #{ex}" } - channel.close - break - end - end + lease_id, _ttl = lease_grant(ttl) election_campaign(name, value, lease_id) - wg.wait - Leadership.new(self, lease_id, channel) + Leadership.new(self, lease_id) end - # Represents a holding a Leadership + # Represents holding a Leadership # Can be revoked or wait until lost class Leadership - def initialize(@etcd : Etcd, @lease_id : Int64, @lost_leadership_channel : Channel(Nil)) + def initialize(@etcd : Etcd, @lease_id : Int64) + @lost_leadership = Channel(Nil).new + spawn(keepalive_loop, name: "Etcd lease keepalive #{@lease_id}") end # Force release leadership def release @etcd.lease_revoke(@lease_id) + @lost_leadership.close end # Wait until looses leadership # Returns true when lost leadership, false when timeout occured def wait(timeout : Time::Span) : Bool select - when @lost_leadership_channel.receive? - return true + when @lost_leadership.receive? + true when timeout(timeout) - return false + false end end + + private def keepalive_loop + ttl = @etcd.lease_ttl(@lease_id) + loop do + sleep (ttl * 0.7).seconds + ttl = @etcd.lease_keepalive(@lease_id) + end + rescue ex + Log.error(exception: ex) { "Lost leadership" } unless @lost_leadership.closed? + ensure + @lost_leadership.close + end end def elect_listen(name, &) - post_stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| + stream("/v3/election/observe", %({"name":"#{Base64.strict_encode name}"})) do |json| if value = json.dig?("result", "kv", "value") yield Base64.decode_string(value.as_s) else @@ -153,31 +155,37 @@ module LavinMQ chunks = read_chunks(tcp) parse_json! chunks else - body = tcp.read_string(content_length) + body = read_string(tcp, content_length) parse_json! 
body end end - private def post_stream(path, body, & : JSON::Any -> _) + private def stream(path, body, & : JSON::Any -> _) with_tcp do |tcp, address| - send_request(tcp, address, path, body) - content_length = read_headers(tcp) - if content_length == -1 # Chunked response - read_chunks(tcp) do |chunk| - yield parse_json! chunk - end - else - body = tcp.read_string(content_length) - yield parse_json! body + post_stream(tcp, address, path, body) do |chunk| + yield chunk end end end + private def post_stream(tcp, address, path, body, & : JSON::Any -> _) + send_request(tcp, address, path, body) + content_length = read_headers(tcp) + if content_length == -1 # Chunked response + read_chunks(tcp) do |chunk| + yield parse_json! chunk + end + else + body = read_string(tcp, content_length) + yield parse_json! body + end + end + private def read_chunks(tcp, & : String -> _) : Nil response_finished = false loop do - bytesize = tcp.read_line.to_i(16) - chunk = tcp.read_string(bytesize) + bytesize = read_chunk_size(tcp) + chunk = read_string(tcp, bytesize) tcp.skip(2) # each chunk ends with \r\n break if bytesize.zero? yield chunk @@ -196,6 +204,20 @@ module LavinMQ break if bytesize.zero? 
end end + rescue ex : IO::Error + raise Error.new("Read chunked response error", cause: ex) + end + + def read_string(tcp, content_length) : String + tcp.read_string(content_length) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) + end + + def read_chunk_size(tcp) : Int32 + tcp.read_line.to_i(16) + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end private def send_request(tcp : IO, address : String, path : String, body : String) @@ -205,6 +227,8 @@ module LavinMQ tcp << "\r\n" tcp << body tcp.flush + rescue ex : IO::Error + raise Error.new("Send request error", cause: ex) end # Parse response headers, return Content-Length (-1 implies chunked response) @@ -233,6 +257,8 @@ module LavinMQ end end content_length + rescue ex : IO::Error + raise Error.new("Read response error", cause: ex) end @connections = Deque(Tuple(TCPSocket, String)).new @@ -248,10 +274,6 @@ module LavinMQ Log.warn { "Service Unavailable at #{address}, #{ex.message}, retrying" } socket.close rescue nil sleep 0.1.seconds - rescue IO::Error - Log.warn { "Lost connection to #{address}, retrying" } - socket.close rescue nil - sleep 0.1.seconds ensure @connections.push({socket, address}) unless socket.closed? end @@ -313,18 +335,19 @@ module LavinMQ private def raise_if_error(json) if error = json["error"]? Log.debug { "etcd error: #{error}" } - if errorh = error.as_h? - error_msg = errorh["message"].as_s - case error_msg - when "error reading from server: EOF" - raise IO::EOFError.new(error_msg) - when "etcdserver: no leader" - raise NoLeader.new(error_msg) + error_msg = + if errorh = error.as_h? 
+ errorh["message"].as_s else - raise Error.new error_msg + error.as_s end + case error_msg + when "error reading from server: EOF" + raise IO::EOFError.new(error_msg) + when "etcdserver: no leader" + raise NoLeader.new(error_msg) else - raise Error.new error.as_s + raise Error.new error_msg end end end diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index ec1ada68e3..64c6b101f9 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -6,7 +6,6 @@ require "./server" require "./http/http_server" require "./in_memory_backend" require "./data_dir_lock" -require "./etcd" module LavinMQ class Launcher @@ -15,9 +14,8 @@ module LavinMQ @first_shutdown_attempt = true @data_dir_lock : DataDirLock? @closed = false - @leadership : Etcd::Leadership? - def initialize(@config : Config, replicator = Clustering::NoopServer.new, @leadership = nil) + def initialize(@config : Config, replicator = Clustering::NoopServer.new) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -38,23 +36,17 @@ module LavinMQ setup_log_exchange end - def run + def start : self listen SystemD.notify_ready + self + end + + def run + start loop do - if leadership = @leadership - if leadership.wait(30.seconds) - Log.fatal { "Lost cluster leadership" } - exit 3 # 3rd character in the alphabet is C(lustering) - else - @data_dir_lock.try &.poll - GC.collect - end - else - sleep 30.seconds - @data_dir_lock.try &.poll - GC.collect - end + sleep 30.seconds + GC.collect end end @@ -66,7 +58,6 @@ module LavinMQ @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release - @leadership.try &.release end private def print_environment_info From 863f0b5badd15a12a6b20c08f520b92fac93cc13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 13 Dec 2024 21:18:20 +0100 Subject: [PATCH 02/25] always have a read timeout on Follower sockets we want to timeout when waiting for acks, if the follower is unresponsive --- 
src/lavinmq/clustering/follower.cr | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index 7e72ea6722..1b47a4568d 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -26,7 +26,6 @@ module LavinMQ validate_header! authenticate!(password) @id = @socket.read_bytes Int32, IO::ByteFormat::LittleEndian - @socket.read_timeout = nil @socket.tcp_nodelay = true @socket.read_buffering = false @socket.sync = true # Use buffering in lz4 From ce062096052d8997558db08036f5e74cdfc8d435 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 25 Dec 2024 00:49:18 +0100 Subject: [PATCH 03/25] make it possible to run a local etcd while running specs use custom ports for the specs --- spec/clustering_spec.cr | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index d1859ee6bb..480cb8dd09 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -13,9 +13,12 @@ describe LavinMQ::Clustering::Client do "--log-level=error", "--unsafe-no-fsync=true", "--force-new-cluster=true", + "--listen-peer-urls=http://127.0.0.1:12380", + "--listen-client-urls=http://127.0.0.1:12379", + "--advertise-client-urls=http://127.0.0.1:12379", }, output: STDOUT, error: STDERR) - client = HTTP::Client.new("127.0.0.1", 2379) + client = HTTP::Client.new("127.0.0.1", 12379) i = 0 loop do sleep 0.02.seconds @@ -26,7 +29,7 @@ describe LavinMQ::Clustering::Client do end rescue e : Socket::ConnectError i += 1 - raise "Cant connect to etcd on port 2379. Giving up after 100 tries. (#{e.message})" if i >= 100 + raise "Cant connect to etcd on port 12379. Giving up after 100 tries. 
(#{e.message})" if i >= 100 next end client.close @@ -40,7 +43,7 @@ describe LavinMQ::Clustering::Client do end it "can stream changes" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -73,7 +76,7 @@ describe LavinMQ::Clustering::Client do end it "can stream full file" do - replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0) + replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0) tcp_server = TCPServer.new("localhost", 0) spawn(replicator.listen(tcp_server), name: "repli server spec") config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir @@ -102,6 +105,7 @@ describe LavinMQ::Clustering::Client do it "will failover" do config1 = LavinMQ::Config.new config1.data_dir = "/tmp/failover1" + config1.clustering_etcd_endpoints = "localhost:12379" config1.clustering_advertised_uri = "tcp://localhost:5681" config1.clustering_port = 5681 config1.amqp_port = 5671 @@ -110,6 +114,7 @@ describe LavinMQ::Clustering::Client do config2 = LavinMQ::Config.new config2.data_dir = "/tmp/failover2" + config2.clustering_etcd_endpoints = "localhost:12379" config2.clustering_advertised_uri = "tcp://localhost:5682" config2.clustering_port = 5682 config2.amqp_port = 5672 @@ -118,7 +123,7 @@ describe LavinMQ::Clustering::Client do listen = Channel(String).new spawn(name: "etcd elect leader spec") do - etcd = LavinMQ::Etcd.new + etcd = LavinMQ::Etcd.new("localhost:12379") etcd.elect_listen("lavinmq/leader") do |value| listen.send value end From 4cfec8fa30f7debabe115860af9a6abfc27eceae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 
27 Dec 2024 02:11:49 +0100 Subject: [PATCH 04/25] don't fsync in DataDirLock can't see the need --- src/lavinmq/data_dir_lock.cr | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lavinmq/data_dir_lock.cr b/src/lavinmq/data_dir_lock.cr index 2654b0499f..cbd63db05a 100644 --- a/src/lavinmq/data_dir_lock.cr +++ b/src/lavinmq/data_dir_lock.cr @@ -24,12 +24,10 @@ module LavinMQ Log.debug { "Data directory lock aquired" } @lock.truncate @lock.print "PID #{Process.pid} @ #{System.hostname}" - @lock.fsync end def release @lock.truncate - @lock.fsync @lock.flock_unlock end From a33ea1543b0bbc0455566969b004b0d7cb3c747e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Fri, 27 Dec 2024 21:52:45 +0100 Subject: [PATCH 05/25] make federation upstream spec more stable in slow CI --- spec/upstream_spec.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/upstream_spec.cr b/spec/upstream_spec.cr index 35a1b50cb2..9f7e53efa8 100644 --- a/spec/upstream_spec.cr +++ b/spec/upstream_spec.cr @@ -392,7 +392,7 @@ describe LavinMQ::Federation::Upstream do select when ch.receive? - when timeout 100.milliseconds + when timeout 3.seconds fail "federation didn't resume? 
timeout waiting for message on downstream queue" end From 9cbd5e942dfcd47aaabf901b133a6ddffc748fb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:05:07 +0100 Subject: [PATCH 06/25] Pass Config to Server, not just data_dir Config.instance was used heavily in Server --- spec/clustering_spec.cr | 4 ++-- spec/spec_helper.cr | 2 +- spec/storage_spec.cr | 3 ++- spec/vhost_spec.cr | 6 +++--- src/lavinmq/launcher.cr | 2 +- src/lavinmq/server.cr | 33 +++++++++++++++++---------------- 6 files changed, 26 insertions(+), 24 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index 480cb8dd09..35380dc4fa 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -63,7 +63,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["repli"].as(LavinMQ::AMQP::DurableQueue) q.message_count.should eq 1 @@ -94,7 +94,7 @@ describe LavinMQ::Clustering::Client do done.receive end - server = LavinMQ::Server.new(follower_data_dir) + server = LavinMQ::Server.new(config) begin server.users["u1"].should_not be_nil ensure diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index f4bf0652e1..f89124c64d 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -74,7 +74,7 @@ end def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.new, & : LavinMQ::Server -> Nil) tcp_server = TCPServer.new("localhost", 0) - s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator) + s = LavinMQ::Server.new(LavinMQ::Config.instance, replicator) begin if tls ctx = OpenSSL::SSL::Context::Server.new diff --git a/spec/storage_spec.cr b/spec/storage_spec.cr index fd0ed74e87..89946d9a28 100644 --- a/spec/storage_spec.cr +++ b/spec/storage_spec.cr @@ -11,7 +11,8 @@ describe LavinMQ::AMQP::DurableQueue do end it "should succefully convert queue index" do - server = 
LavinMQ::Server.new("/tmp/lavinmq-spec-index-v2") + config = LavinMQ::Config.new.tap &.data_dir = "/tmp/lavinmq-spec-index-v2" + server = LavinMQ::Server.new(config) begin q = server.vhosts["/"].queues["queue"].as(LavinMQ::AMQP::DurableQueue) q.basic_get(true) do |env| diff --git a/spec/vhost_spec.cr b/spec/vhost_spec.cr index 81272856e8..03bca2d000 100644 --- a/spec/vhost_spec.cr +++ b/spec/vhost_spec.cr @@ -35,7 +35,7 @@ describe LavinMQ::VHost do end it "should be able to persist durable delayed exchanges when type = x-delayed-message" do - data_dir = "" + config = LavinMQ::Config.new with_amqp_server do |s| # This spec is to verify a fix where a server couldn't start again after a crash if # an delayed exchange had been declared by specifiying the type as "x-delayed-message". @@ -46,11 +46,11 @@ describe LavinMQ::VHost do # Start a new server with the same data dir as `Server` without stopping # `Server` first, because stopping would compact definitions and therefore "rewrite" - data_dir = s.data_dir + config.data_dir = s.data_dir end # the definitions file. This is to simulate a start after a "crash". # If this succeeds we assume it worked...? - LavinMQ::Server.new(data_dir) + LavinMQ::Server.new(config) end it "should be able to persist durable queues" do diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 64c6b101f9..2fb2ea4bd0 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -28,7 +28,7 @@ module LavinMQ if @config.data_dir_lock? @data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire end - @amqp_server = LavinMQ::Server.new(@config.data_dir, replicator) + @amqp_server = LavinMQ::Server.new(@config, replicator) @http_server = LavinMQ::HTTP::Server.new(@amqp_server) @tls_context = create_tls_context if @config.tls_configured? 
reload_tls_context diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 69b729079d..f381fd0577 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -30,7 +30,8 @@ module LavinMQ @replicator : Clustering::Replicator Log = LavinMQ::Log.for "server" - def initialize(@data_dir : String, @replicator = Clustering::NoopServer.new) + def initialize(@config : Config, @replicator = Clustering::NoopServer.new) + @data_dir = @config.data_dir Dir.mkdir_p @data_dir Schema.migrate(@data_dir, @replicator) @users = UserStore.new(@data_dir, @replicator) @@ -99,17 +100,17 @@ module LavinMQ private def extract_conn_info(client) : ConnectionInfo remote_address = client.remote_address - case Config.instance.tcp_proxy_protocol + case @config.tcp_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else # Allow proxy connection from followers - if Config.instance.clustering? && + if @config.clustering? && client.peek[0, 5]? == "PROXY".to_slice && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower ProxyProtocol::V1.parse(client) - elsif Config.instance.clustering? && + elsif @config.clustering? && client.peek[0, 8]? == ProxyProtocol::V2::Signature.to_slice[0, 8] && followers.any? { |f| f.remote_address.address == remote_address.address } # Expect PROXY protocol header if remote address is a follower @@ -130,7 +131,7 @@ module LavinMQ remote_address = client.remote_address set_buffer_size(client) conn_info = - case Config.instance.unix_proxy_protocol + case @config.unix_proxy_protocol when 1 then ProxyProtocol::V1.parse(client) when 2 then ProxyProtocol::V2.parse(client) else ConnectionInfo.local # TODO: use unix socket address, don't fake local @@ -252,21 +253,21 @@ module LavinMQ private def set_socket_options(socket) unless socket.remote_address.loopback? 
- if keepalive = Config.instance.tcp_keepalive + if keepalive = @config.tcp_keepalive socket.keepalive = true socket.tcp_keepalive_idle = keepalive[0] socket.tcp_keepalive_interval = keepalive[1] socket.tcp_keepalive_count = keepalive[2] end end - socket.tcp_nodelay = true if Config.instance.tcp_nodelay? - Config.instance.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } - Config.instance.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } + socket.tcp_nodelay = true if @config.tcp_nodelay? + @config.tcp_recv_buffer_size.try { |v| socket.recv_buffer_size = v } + @config.tcp_send_buffer_size.try { |v| socket.send_buffer_size = v } end private def set_buffer_size(socket) - if Config.instance.socket_buffer_size.positive? - socket.buffer_size = Config.instance.socket_buffer_size + if @config.socket_buffer_size.positive? + socket.buffer_size = @config.socket_buffer_size socket.sync = false socket.read_buffering = true else @@ -288,8 +289,8 @@ module LavinMQ end def update_system_metrics(statm) - interval = Config.instance.stats_interval.milliseconds.to_i - log_size = Config.instance.stats_log_size + interval = @config.stats_interval.milliseconds.to_i + log_size = @config.stats_log_size rusage = System.resource_usage {% for m in METRICS %} @@ -353,7 +354,7 @@ module LavinMQ end control_flow! - sleep Config.instance.stats_interval.milliseconds + sleep @config.stats_interval.milliseconds end ensure statm.try &.close @@ -435,11 +436,11 @@ module LavinMQ end def disk_full? - @disk_free < 3_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_min + @disk_free < 3_i64 * @config.segment_size || @disk_free < @config.free_disk_min end def disk_usage_over_warning_level? 
- @disk_free < 6_i64 * Config.instance.segment_size || @disk_free < Config.instance.free_disk_warn + @disk_free < 6_i64 * @config.segment_size || @disk_free < @config.free_disk_warn end def flow(active : Bool) From e7ec5a241c03a810652ce8fea3ec49e48a30a504 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:06:52 +0100 Subject: [PATCH 07/25] Move responsibility of Clustering to Launcher If the Launcher receives an Etcd, Launcher creates, and later closes, the ClusteringServer instance. --- spec/clustering_spec.cr | 2 ++ src/lavinmq/clustering/controller.cr | 3 +-- src/lavinmq/launcher.cr | 5 ++++- src/lavinmq/server.cr | 1 - 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/spec/clustering_spec.cr b/spec/clustering_spec.cr index 35380dc4fa..9aff334e84 100644 --- a/spec/clustering_spec.cr +++ b/spec/clustering_spec.cr @@ -73,6 +73,8 @@ describe LavinMQ::Clustering::Client do ensure server.close end + ensure + replicator.try &.close end it "can stream full file" do diff --git a/src/lavinmq/clustering/controller.cr b/src/lavinmq/clustering/controller.cr index d39fd1cd88..1988d33c5c 100644 --- a/src/lavinmq/clustering/controller.cr +++ b/src/lavinmq/clustering/controller.cr @@ -19,8 +19,7 @@ class LavinMQ::Clustering::Controller wait_to_be_insync @lease = lease = @etcd.elect("#{@config.clustering_etcd_prefix}/leader", @advertised_uri) # blocks until becoming leader # TODO: make sure we still are in the ISR set - replicator = Clustering::Server.new(@config, @etcd) - @launcher = Launcher.new(@config, replicator).start + @launcher = Launcher.new(@config, @etcd).start loop do if lease.wait(30.seconds) break if @stopped diff --git a/src/lavinmq/launcher.cr b/src/lavinmq/launcher.cr index 2fb2ea4bd0..51e4671524 100644 --- a/src/lavinmq/launcher.cr +++ b/src/lavinmq/launcher.cr @@ -6,6 +6,7 @@ require "./server" require "./http/http_server" require "./in_memory_backend" require "./data_dir_lock" +require "./etcd" module LavinMQ 
class Launcher @@ -15,7 +16,7 @@ module LavinMQ @data_dir_lock : DataDirLock? @closed = false - def initialize(@config : Config, replicator = Clustering::NoopServer.new) + def initialize(@config : Config, etcd : Etcd? = nil) print_environment_info print_max_map_count fd_limit = System.maximize_fd_limit @@ -28,6 +29,7 @@ module LavinMQ if @config.data_dir_lock? @data_dir_lock = DataDirLock.new(@config.data_dir).tap &.acquire end + @replicator = replicator = etcd ? Clustering::Server.new(@config, etcd) : Clustering::NoopServer.new @amqp_server = LavinMQ::Server.new(@config, replicator) @http_server = LavinMQ::HTTP::Server.new(@amqp_server) @tls_context = create_tls_context if @config.tls_configured? @@ -58,6 +60,7 @@ module LavinMQ @http_server.close rescue nil @amqp_server.close rescue nil @data_dir_lock.try &.release + @replicator.close end private def print_environment_info diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index f381fd0577..4d9e39cfb4 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -206,7 +206,6 @@ module LavinMQ @listeners.each_key &.close Log.debug { "Closing vhosts" } @vhosts.close - @replicator.close end def add_parameter(parameter : Parameter) From 3c0813cecf501ee6b188ecfadfba306090dcd6d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Sun, 29 Dec 2024 23:10:29 +0100 Subject: [PATCH 08/25] In Actions, get filename via instance variable No need to use the getter --- src/lavinmq/clustering/actions.cr | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/lavinmq/clustering/actions.cr b/src/lavinmq/clustering/actions.cr index 04277fbe2c..2062573343 100644 --- a/src/lavinmq/clustering/actions.cr +++ b/src/lavinmq/clustering/actions.cr @@ -29,11 +29,11 @@ module LavinMQ def lag_size : Int64 if mfile = @mfile - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + mfile.size.to_i64 else - 0i64 + sizeof(Int32) + 
filename.bytesize + - sizeof(Int64) + File.size(File.join(@data_dir, filename)).to_i64 + 0i64 + sizeof(Int32) + @filename.bytesize + + sizeof(Int64) + File.size(File.join(@data_dir, @filename)).to_i64 end end @@ -69,7 +69,7 @@ module LavinMQ in UInt32, Int32 4i64 end - 0i64 + sizeof(Int32) + filename.bytesize + + 0i64 + sizeof(Int32) + @filename.bytesize + sizeof(Int64) + datasize end @@ -100,7 +100,7 @@ module LavinMQ # Maybe it would be ok to not include delete action in lag, because # the follower should have all info necessary to GC the file during # startup? - (sizeof(Int32) + filename.bytesize + sizeof(Int64)).to_i64 + (sizeof(Int32) + @filename.bytesize + sizeof(Int64)).to_i64 end def send(socket, log = Log) : Int64 From 205184a2990431306d25a660b19716ba0b5b0caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Carl=20H=C3=B6rberg?= Date: Wed, 1 Jan 2025 23:16:51 +0100 Subject: [PATCH 09/25] MFile#read should not raise IO::EOFError IO#read reads at most slice.size bytes from the IO into slice. Returns the number of bytes read, which is 0 if and only if there is no more data to read. 
--- spec/mfile_spec.cr | 17 +++++++++++++++++ src/lavinmq/mfile.cr | 9 ++++----- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/spec/mfile_spec.cr b/spec/mfile_spec.cr index 6bab024658..0fddbecd3b 100644 --- a/spec/mfile_spec.cr +++ b/spec/mfile_spec.cr @@ -12,4 +12,21 @@ describe MFile do file.delete end end + + it "can be read" do + file = File.tempfile "mfile_spec" + file.print "hello world" + file.flush + begin + MFile.open(file.path) do |mfile| + buf = Bytes.new(6) + cnt = mfile.read(buf) + String.new(buf[0, cnt]).should eq "hello " + cnt = mfile.read(buf) + String.new(buf[0, cnt]).should eq "world" + end + ensure + file.delete + end + end end diff --git a/src/lavinmq/mfile.cr b/src/lavinmq/mfile.cr index 98dc429d21..bc2ae90af3 100644 --- a/src/lavinmq/mfile.cr +++ b/src/lavinmq/mfile.cr @@ -185,11 +185,10 @@ class MFile < IO def read(slice : Bytes) pos = @pos - new_pos = pos + slice.size - raise IO::EOFError.new if new_pos > @size - slice.copy_from(buffer + pos, slice.size) - @pos = new_pos - slice.size + len = Math.min(slice.size, @size - pos) + slice.copy_from(buffer + pos, len) + @pos = pos + len + len end def rewind From c7b7e188c6baf6316bcae5db11859482af1d3797 Mon Sep 17 00:00:00 2001 From: Johan Rhodin Date: Thu, 2 Jan 2025 04:23:33 -0600 Subject: [PATCH 10/25] Update README.md (#894) - Remove outdated contributors list (a better version of it is listed by Github) - Add way to contact the people behind LavinMQ --- README.md | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 28cd2ef228..0ab38adc4b 100644 --- a/README.md +++ b/README.md @@ -269,21 +269,6 @@ Stream queues are like append-only logs and can be consumed multiple times. Each Messages are only deleted when `max-length`, `max-length-bytes` or `max-age` are applied, either as queue arguments or as policies. 
The limits are checked only when new messages are published to the queue, and only act on whole segments (which by default are 8MiB), so the limits aren't necessarily exact. So even if a `max-age` limit is set, but no messages are published to the queue, messages might still be available in the stream queue that is way older that the limit specified. -## Contributors - -- [Carl Hörberg](mailto:carl@84codes.com) -- [Anders Bälter](mailto:anders@84codes.com) -- [Magnus Landerblom](mailto:mange@cloudamqp.com) -- [Magnus Hörberg](mailto:magnus@cloudamqp.com) -- [Johan Eckerström](mailto:johan.e@cloudamqp.com) -- [Anton Dalgren](mailto:anton@cloudamqp.com) -- [Patrik Ragnarsson](mailto:patrik@84codes.com) -- [Oskar Gustafsson](mailto:oskar@84codes.com) -- [Tobias Brodén](mailto:tobias@84codes.com) -- [Christina Dahlén](mailto:christina@84codes.com) -- [Erica Weistrand](mailto:erica@84codes.com) -- [Viktor Erlingsson](mailto:viktor@84codes.com) - ## License The software is licensed under the [Apache License 2.0](LICENSE). @@ -291,3 +276,6 @@ The software is licensed under the [Apache License 2.0](LICENSE). Copyright 2018-2024 84codes AB LavinMQ is a trademark of 84codes AB + +## Contact Us +Do you want to learn more? [Talk with our product experts](https://webforms.pipedrive.com/f/64JnLsqIMAdF2BDQ06ioKLhC2NuNmkwNplNhRxtIqlm0nFnuIeX97eb7fZKej0vFHZ). 
From 4b56d4009780cc9c3235701ee0bdc05ed5bbdcb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christina=20Dahl=C3=A9n?= <85930202+kickster97@users.noreply.github.com> Date: Thu, 2 Jan 2025 11:27:52 +0100 Subject: [PATCH 11/25] Escape all non-printable characters in invalid protocol log (#896) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ``` Dec 19 06:07:33Z lmq.amqp.connection_factory: "Unexpected protocol '# 003 Dec 19 06:07:33Z *%� Dec 19 06:07:33Z ', closing socket ``` is now printed like: ``` Dec 19 06:07:33Z lmq.amqp.connection_factory: "Unexpected protocol "\x{003}*%\x{000}", closing socket ``` --------- Co-authored-by: Carl Hörberg --- src/lavinmq/amqp/connection_factory.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/connection_factory.cr b/src/lavinmq/amqp/connection_factory.cr index a427ba5eee..a9774da4ae 100644 --- a/src/lavinmq/amqp/connection_factory.cr +++ b/src/lavinmq/amqp/connection_factory.cr @@ -47,7 +47,7 @@ module LavinMQ elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9 socket.write AMQP::PROTOCOL_START_0_9_1.to_slice socket.flush - log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" } + log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" } false else true From b43929793afd6ead9576777ebdcb19fc4f0338a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Eckerstr=C3=B6m?= Date: Thu, 2 Jan 2025 23:17:33 +0100 Subject: [PATCH 12/25] UI: Remove reset vhost button (feature gone) (#899) Reset vhost was removed in 33147bf59d4fe4db33001923282209d1789db434 #822 --- views/vhost.ecr | 6 ------ 1 file changed, 6 deletions(-) diff --git a/views/vhost.ecr b/views/vhost.ecr index 9abc448391..0533d8c4f1 100644 --- a/views/vhost.ecr +++ b/views/vhost.ecr @@ -100,12 +100,6 @@

Danger zone

-
- -
-