Skip to content

Commit

Permalink
Merge branch 'main' into crc32
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg authored Jan 14, 2025
2 parents e3d3e34 + 543c1a6 commit 999c3a4
Show file tree
Hide file tree
Showing 29 changed files with 347 additions and 311 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ jobs:
steps:
- name: Install dependencies
run: |
brew update
brew install crystal etcd
echo "/opt/homebrew/opt/etcd/bin" >> $GITHUB_PATH
env:
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- New UI for management interface [#821](https://github.com/cloudamqp/lavinmq/pull/821)
- Use sent/received bytes instead of messages to trigger when other tasks can run [#863](https://github.com/cloudamqp/lavinmq/pull/863)
- Spread out stream queues GC-loop over time [#876](https://github.com/cloudamqp/lavinmq/pull/876)
- Don't unmap files on USR2 or when last consumer disconnects [#876](https://github.com/cloudamqp/lavinmq/pull/876)
- Unmap files when they are no longer in use [#876](https://github.com/cloudamqp/lavinmq/pull/876)

### Fixed

Expand All @@ -19,6 +22,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Removed duplicate metric rss_bytes [#881](https://github.com/cloudamqp/lavinmq/pull/881)
- Release leadership on graceful shutdown [#871](https://github.com/cloudamqp/lavinmq/pull/871)
- Rescue more exceptions while reading msg store on startup [#865](https://github.com/cloudamqp/lavinmq/pull/865)
- Crystal 1.15 support [#905](https://github.com/cloudamqp/lavinmq/pull/905)
- lavinmqctl now handles pagination of large result sets [#904](https://github.com/cloudamqp/lavinmq/pull/904)
- Make clustering more reliable [#879](https://github.com/cloudamqp/lavinmq/pull/879)
- Strip newlines from logs [#896](https://github.com/cloudamqp/lavinmq/pull/896)

### Added

Expand Down
18 changes: 3 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,25 +269,13 @@ Stream queues are like append-only logs and can be consumed multiple times. Each

Messages are only deleted when `max-length`, `max-length-bytes` or `max-age` are applied, either as queue arguments or as policies. The limits are checked only when new messages are published to the queue, and only act on whole segments (which by default are 8MiB), so the limits aren't necessarily exact. So even if a `max-age` limit is set, but no messages are published to the queue, messages might still be available in the stream queue that is way older that the limit specified.

## Contributors

- [Carl Hörberg](mailto:carl@84codes.com)
- [Anders Bälter](mailto:anders@84codes.com)
- [Magnus Landerblom](mailto:mange@cloudamqp.com)
- [Magnus Hörberg](mailto:magnus@cloudamqp.com)
- [Johan Eckerström](mailto:johan.e@cloudamqp.com)
- [Anton Dalgren](mailto:anton@cloudamqp.com)
- [Patrik Ragnarsson](mailto:patrik@84codes.com)
- [Oskar Gustafsson](mailto:oskar@84codes.com)
- [Tobias Brodén](mailto:tobias@84codes.com)
- [Christina Dahlén](mailto:christina@84codes.com)
- [Erica Weistrand](mailto:erica@84codes.com)
- [Viktor Erlingsson](mailto:viktor@84codes.com)

## License

The software is licensed under the [Apache License 2.0](LICENSE).

Copyright 2018-2024 84codes AB

LavinMQ is a trademark of 84codes AB

## Contact Us
Do you want to learn more? [Talk with our product experts](https://webforms.pipedrive.com/f/64JnLsqIMAdF2BDQ06ioKLhC2NuNmkwNplNhRxtIqlm0nFnuIeX97eb7fZKej0vFHZ).
21 changes: 14 additions & 7 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@ describe LavinMQ::Clustering::Client do
"--log-level=error",
"--unsafe-no-fsync=true",
"--force-new-cluster=true",
"--listen-peer-urls=http://127.0.0.1:12380",
"--listen-client-urls=http://127.0.0.1:12379",
"--advertise-client-urls=http://127.0.0.1:12379",
}, output: STDOUT, error: STDERR)

client = HTTP::Client.new("127.0.0.1", 2379)
client = HTTP::Client.new("127.0.0.1", 12379)
i = 0
loop do
sleep 0.02.seconds
Expand All @@ -26,7 +29,7 @@ describe LavinMQ::Clustering::Client do
end
rescue e : Socket::ConnectError
i += 1
raise "Cant connect to etcd on port 2379. Giving up after 100 tries. (#{e.message})" if i >= 100
raise "Cant connect to etcd on port 12379. Giving up after 100 tries. (#{e.message})" if i >= 100
next
end
client.close
Expand All @@ -40,7 +43,7 @@ describe LavinMQ::Clustering::Client do
end

it "can stream changes" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
Expand All @@ -60,7 +63,7 @@ describe LavinMQ::Clustering::Client do
done.receive
end

server = LavinMQ::Server.new(follower_data_dir)
server = LavinMQ::Server.new(config)
begin
q = server.vhosts["/"].queues["repli"].as(LavinMQ::AMQP::DurableQueue)
q.message_count.should eq 1
Expand All @@ -70,10 +73,12 @@ describe LavinMQ::Clustering::Client do
ensure
server.close
end
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new("localhost:12379"), 0)
tcp_server = TCPServer.new("localhost", 0)
spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
Expand All @@ -91,7 +96,7 @@ describe LavinMQ::Clustering::Client do
done.receive
end

server = LavinMQ::Server.new(follower_data_dir)
server = LavinMQ::Server.new(config)
begin
server.users["u1"].should_not be_nil
ensure
Expand All @@ -102,6 +107,7 @@ describe LavinMQ::Clustering::Client do
it "will failover" do
config1 = LavinMQ::Config.new
config1.data_dir = "/tmp/failover1"
config1.clustering_etcd_endpoints = "localhost:12379"
config1.clustering_advertised_uri = "tcp://localhost:5681"
config1.clustering_port = 5681
config1.amqp_port = 5671
Expand All @@ -110,6 +116,7 @@ describe LavinMQ::Clustering::Client do

config2 = LavinMQ::Config.new
config2.data_dir = "/tmp/failover2"
config2.clustering_etcd_endpoints = "localhost:12379"
config2.clustering_advertised_uri = "tcp://localhost:5682"
config2.clustering_port = 5682
config2.amqp_port = 5672
Expand All @@ -118,7 +125,7 @@ describe LavinMQ::Clustering::Client do

listen = Channel(String).new
spawn(name: "etcd elect leader spec") do
etcd = LavinMQ::Etcd.new
etcd = LavinMQ::Etcd.new("localhost:12379")
etcd.elect_listen("lavinmq/leader") do |value|
listen.send value
end
Expand Down
1 change: 1 addition & 0 deletions spec/etcd_spec.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
require "spec"
require "../src/lavinmq/etcd"
require "file_utils"
require "http/client"

describe LavinMQ::Etcd do
it "can put and get" do
Expand Down
17 changes: 17 additions & 0 deletions spec/mfile_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,21 @@ describe MFile do
file.delete
end
end

it "can be read" do
file = File.tempfile "mfile_spec"
file.print "hello world"
file.flush
begin
MFile.open(file.path) do |mfile|
buf = Bytes.new(6)
cnt = mfile.read(buf)
String.new(buf[0, cnt]).should eq "hello "
cnt = mfile.read(buf)
String.new(buf[0, cnt]).should eq "world"
end
ensure
file.delete
end
end
end
24 changes: 14 additions & 10 deletions spec/schema_version_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@ describe LavinMQ::SchemaVersion do
it "Empty file should raise IO::EOFError" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
MFile.open(path, 12) do |file|
expect_raises(IO::EOFError) do
LavinMQ::SchemaVersion.verify(file, :message)
end
end
end
end

it "Should verify schema version" do
with_datadir do |data_dir|
path = File.join(data_dir, "test_schema_version")
file = MFile.new(path, 12)
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
MFile.open(path, 12) do |file|
file.write_bytes LavinMQ::Schema::VERSION
LavinMQ::SchemaVersion.verify(file, :message).should eq LavinMQ::SchemaVersion::VERSIONS[:message]
end
end
end

it "Deletes empty file and creates a new when it is the first file" do
with_datadir do |data_dir|
path = File.join(data_dir, "msgs.0000000001")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
msg_store.@segments.first_value.size.should eq 4
Expand All @@ -39,8 +42,9 @@ describe LavinMQ::SchemaVersion do
v.declare_queue("q", true, false)
data_dir = s.vhosts["/"].queues["q"].as(LavinMQ::AMQP::Queue).@msg_store.@queue_data_dir
path = File.join(data_dir, "msgs.0000000002")
file = MFile.new(path, LavinMQ::Config.instance.segment_size)
file.resize(LavinMQ::Config.instance.segment_size)
MFile.open(path, LavinMQ::Config.instance.segment_size) do |file|
file.resize(LavinMQ::Config.instance.segment_size)
end
# init new message store
msg_store = LavinMQ::Queue::MessageStore.new(data_dir, nil)
msg_store.@segments.size.should eq 1
Expand Down
2 changes: 1 addition & 1 deletion spec/spec_helper.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ end

def with_amqp_server(tls = false, replicator = LavinMQ::Clustering::NoopServer.new, & : LavinMQ::Server -> Nil)
tcp_server = TCPServer.new("localhost", 0)
s = LavinMQ::Server.new(LavinMQ::Config.instance.data_dir, replicator)
s = LavinMQ::Server.new(LavinMQ::Config.instance, replicator)
begin
if tls
ctx = OpenSSL::SSL::Context::Server.new
Expand Down
5 changes: 3 additions & 2 deletions spec/storage_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ describe LavinMQ::AMQP::DurableQueue do
end

it "should succefully convert queue index" do
server = LavinMQ::Server.new("/tmp/lavinmq-spec-index-v2")
config = LavinMQ::Config.new.tap &.data_dir = "/tmp/lavinmq-spec-index-v2"
server = LavinMQ::Server.new(config)
begin
q = server.vhosts["/"].queues["queue"].as(LavinMQ::AMQP::DurableQueue)
q.basic_get(true) do |env|
Expand Down Expand Up @@ -176,7 +177,7 @@ describe LavinMQ::VHost do
overhead = 21
body = Bytes.new(msg_size)

segments = ->{ Dir.new(vhost.data_dir).children.select!(/^msgs\./) }
segments = -> { Dir.new(vhost.data_dir).children.select!(/^msgs\./) }

size_of_current_segment = File.size(File.join(vhost.data_dir, segments.call.last))

Expand Down
2 changes: 1 addition & 1 deletion spec/upstream_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ describe LavinMQ::Federation::Upstream do

select
when ch.receive?
when timeout 100.milliseconds
when timeout 3.seconds
fail "federation didn't resume? timeout waiting for message on downstream queue"
end

Expand Down
6 changes: 3 additions & 3 deletions spec/vhost_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe LavinMQ::VHost do
end

it "should be able to persist durable delayed exchanges when type = x-delayed-message" do
data_dir = ""
config = LavinMQ::Config.new
with_amqp_server do |s|
# This spec is to verify a fix where a server couldn't start again after a crash if
# an delayed exchange had been declared by specifiying the type as "x-delayed-message".
Expand All @@ -46,11 +46,11 @@ describe LavinMQ::VHost do

# Start a new server with the same data dir as `Server` without stopping
# `Server` first, because stopping would compact definitions and therefore "rewrite"
data_dir = s.data_dir
config.data_dir = s.data_dir
end
# the definitions file. This is to simulate a start after a "crash".
# If this succeeds we assume it worked...?
LavinMQ::Server.new(data_dir)
LavinMQ::Server.new(config)
end

it "should be able to persist durable queues" do
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/amqp/connection_factory.cr
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ module LavinMQ
elsif proto != AMQP::PROTOCOL_START_0_9_1 && proto != AMQP::PROTOCOL_START_0_9
socket.write AMQP::PROTOCOL_START_0_9_1.to_slice
socket.flush
log.warn { "Unexpected protocol '#{String.new(proto.to_slice)}', closing socket" }
log.warn { "Unexpected protocol #{String.new(proto.to_unsafe, count).inspect}, closing socket" }
false
else
true
Expand Down
8 changes: 2 additions & 6 deletions src/lavinmq/amqp/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ module LavinMQ
bytesize = BytesMessage.skip(mfile)
count += 1
next if deleted?(seg, pos)
update_stats_per_msg(seg, ts, bytesize)
@bytesize += bytesize
@size += 1
rescue ex : IO::EOFError
break
rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode
Expand All @@ -402,11 +403,6 @@ module LavinMQ
@log.info { "Loaded #{counter} segments, #{@size} messages" }
end

private def update_stats_per_msg(seg, ts, bytesize)
@bytesize += bytesize
@size += 1
end

private def delete_unused_segments : Nil
current_seg = @segments.last_key
@segments.reject! do |seg, mfile|
Expand Down
3 changes: 0 additions & 3 deletions src/lavinmq/amqp/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -854,9 +854,6 @@ module LavinMQ::AMQP
delete
else
notify_consumers_empty(true)
@msg_store_lock.synchronize do
@msg_store.unmap_segments
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions src/lavinmq/amqp/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ module LavinMQ::AMQP
end

private def unmap_and_remove_segments_loop
sleep rand(60).seconds
until closed?
sleep 60.seconds
unmap_and_remove_segments
Expand Down
Loading

0 comments on commit 999c3a4

Please sign in to comment.