bigqueue provides an embedded, fast and persistent queue written in pure Go using memory mapped (mmap) files. bigqueue is now thread safe as well.
go get github.com/grandecola/bigqueue
- Only works for linux and darwin OS
- Only works on Little Endian architecture
Create or open a bigqueue:
bq, err := bigqueue.NewMmapQueue("path/to/queue")
defer bq.Close()
bigqueue persists the data of the queue in multiple Arenas. Each Arena is a file on disk that is mapped into memory (RAM) using the mmap syscall. The default size of each Arena is 128MB. It is possible to create a bigqueue with a custom Arena size:
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024))
defer bq.Close()
bigqueue also allows setting the maximum memory that it can use. By default, the maximum memory is set to [3 x Arena Size].
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetArenaSize(4*1024),
	bigqueue.SetMaxInMemArenas(10))
defer bq.Close()
In this case, bigqueue will never map more than 4KB*10=40KB of Arena memory. This limit applies only to mapped Arenas; memory used in buffers for copying data is additional.
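The bound is simply the Arena size multiplied by the number of Arenas allowed in memory. A tiny sketch of that arithmetic follows; maxMappedBytes is a hypothetical helper, not a bigqueue function.

```go
package main

import "fmt"

// maxMappedBytes returns the upper bound on memory held by mapped arenas.
// Hypothetical helper for illustration only.
func maxMappedBytes(arenaSize, maxInMemArenas int) int {
	return arenaSize * maxInMemArenas
}

func main() {
	// The example above: 4KB arenas, at most 10 in memory.
	fmt.Println(maxMappedBytes(4*1024, 10)) // 40960

	// The documented defaults: 128MB arenas, 3 in memory.
	fmt.Println(maxMappedBytes(128*1024*1024, 3)) // 402653184
}
```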
bigqueue can flush periodically, based on either elapsed time or the number of mutate (enqueue/dequeue) operations. A flush syncs the in-memory changes of all memory mapped files to disk. This is a best effort flush.
This is how we can set these options:
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushOps(2))
In this case, a flush is done after every two mutate operations.
bq, err := bigqueue.NewMmapQueue("path/to/queue", bigqueue.SetPeriodicFlushDuration(time.Minute))
In this case, a flush is done after one minute elapses and an Enqueue/Dequeue is called.
Write to bigqueue:
err := bq.Enqueue([]byte("elem"))
bigqueue allows writing string data directly, avoiding conversion to []byte:
err := bq.EnqueueString("elem")
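As background on why a string variant can avoid the conversion: Go's built-in copy accepts a string as its source, so bytes can be written into a destination buffer without first building an intermediate []byte. Whether bigqueue does exactly this internally is an assumption; putString is a hypothetical helper.

```go
package main

import "fmt"

// putString copies s into dst and returns the number of bytes written.
// copy accepts a string source directly, so no intermediate []byte(s) is built.
func putString(dst []byte, s string) int {
	return copy(dst, s)
}

func main() {
	buf := make([]byte, 8)
	n := putString(buf, "elem")
	fmt.Println(n, string(buf[:n])) // 4 elem
}
```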
Read from bigqueue:
elem, err := bq.Dequeue()
We can also read string data from bigqueue:
elem, err := bq.DequeueString()
Check whether bigqueue is empty:
isEmpty := bq.IsEmpty()
bigqueue allows reading data using consumers, similar to Kafka. This allows multiple consumers to read data at different offsets (not in a thread safe manner yet). The offsets of each consumer are persisted on disk and can be retrieved by creating a consumer with the same name. Data will then be read from the offset where that consumer left off.
We can create a new consumer as follows. The offsets of a new consumer are set to the start of the queue, at the first non-deleted element.
consumer, err := bq.NewConsumer("consumer")
We can also copy an existing consumer. This creates a consumer with the same offsets into the queue as the existing consumer.
copyConsumer, err := bq.FromConsumer("copyConsumer", consumer)
Now, read operations can be performed on the consumer:
isEmpty := consumer.IsEmpty()
elem, err := consumer.Dequeue()
elem, err := consumer.DequeueString()
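The consumer semantics described above (independent offsets per name, copying offsets, resuming by name) can be modelled with a small in-memory sketch. This is not bigqueue's implementation and persists nothing to disk; sharedLog, namedConsumer and their methods are hypothetical names used only to illustrate the behaviour.

```go
package main

import (
	"errors"
	"fmt"
)

var errEmpty = errors.New("queue is empty")

// sharedLog is an append-only list of elements read by all consumers.
type sharedLog struct {
	elems   []string
	offsets map[string]int // consumer name -> index of the next element to read
}

// namedConsumer is a handle that reads the log at its own offset.
type namedConsumer struct {
	log  *sharedLog
	name string
}

func newSharedLog() *sharedLog {
	return &sharedLog{offsets: map[string]int{}}
}

func (l *sharedLog) enqueue(s string) { l.elems = append(l.elems, s) }

// newConsumer starts an unknown name at the head and resumes a known name.
func (l *sharedLog) newConsumer(name string) *namedConsumer {
	if _, ok := l.offsets[name]; !ok {
		l.offsets[name] = 0
	}
	return &namedConsumer{log: l, name: name}
}

// fromConsumer creates a consumer at the same offset as src.
func (l *sharedLog) fromConsumer(name string, src *namedConsumer) *namedConsumer {
	l.offsets[name] = l.offsets[src.name]
	return &namedConsumer{log: l, name: name}
}

func (c *namedConsumer) isEmpty() bool {
	return c.log.offsets[c.name] >= len(c.log.elems)
}

// dequeue returns the next element for this consumer and advances only its offset.
func (c *namedConsumer) dequeue() (string, error) {
	if c.isEmpty() {
		return "", errEmpty
	}
	off := c.log.offsets[c.name]
	c.log.offsets[c.name] = off + 1
	return c.log.elems[off], nil
}

func main() {
	q := newSharedLog()
	q.enqueue("a")
	q.enqueue("b")

	c1 := q.newConsumer("consumer")
	first, _ := c1.dequeue()

	// The copy starts where c1 currently is, then advances independently.
	c2 := q.fromConsumer("copyConsumer", c1)
	x, _ := c1.dequeue()
	y, _ := c2.dequeue()

	// Asking for the same name again resumes at the stored offset.
	resumed := q.newConsumer("consumer")
	fmt.Println(first, x, y, resumed.isEmpty()) // a b b true
}
```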
goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
Go version: 1.16
BenchmarkNewMmapQueue/ArenaSize-4KB-12 312 3762844 ns/op 2506 B/op 36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128KB-12 310 3897333 ns/op 2506 B/op 36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-4MB-12 301 4033893 ns/op 2504 B/op 36 allocs/op
BenchmarkNewMmapQueue/ArenaSize-128MB-12 309 3954329 ns/op 2504 B/op 36 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12 1021855 1172 ns/op 15 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12 999122 1178 ns/op 15 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12 1000000 1027 ns/op 20 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12 258444 4602 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12 246780 4610 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12 271261 4118 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12 10000 108440 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12 10000 108159 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12 10000 104991 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12 330 3619772 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12 339 3502254 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12 336 3478795 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12 843966 1186 ns/op 15 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12 1000000 1180 ns/op 15 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12 1000000 1026 ns/op 15 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12 257824 4642 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12 256230 4621 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12 266560 4101 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12 10000 107929 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12 10000 107948 ns/op 14 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12 11434 103482 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12 333 3650641 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12 339 3559835 ns/op 13 B/op 0 allocs/op
BenchmarkEnqueueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12 334 3546090 ns/op 13 B/op 0 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12 1000000 3201 ns/op 142 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12 1000000 3187 ns/op 142 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12 6737412 174.0 ns/op 128 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12 502522 3478 ns/op 4109 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12 516555 3509 ns/op 4109 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12 1000000 1156 ns/op 4096 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12 29844 39677 ns/op 131085 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12 30626 39388 ns/op 131085 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12 45805 26247 ns/op 131072 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12 1005 1241554 ns/op 4194316 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12 1257 1164477 ns/op 4194314 B/op 1 allocs/op
BenchmarkDequeue/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12 1260 884842 ns/op 4194304 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-12KB-12 1000000 3200 ns/op 142 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-40KB-12 1000000 3206 ns/op 142 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4KB/MessageSize-128B/MaxMem-NoLimit-12 6239718 188.8 ns/op 128 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-384KB-12 501561 3511 ns/op 4109 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-1.25MB-12 507860 3535 ns/op 4109 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128KB/MessageSize-4KB/MaxMem-NoLimit-12 1000000 1236 ns/op 4096 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-12MB-12 29692 39532 ns/op 131085 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-40MB-12 30268 39709 ns/op 131085 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-4MB/MessageSize-128KB/MaxMem-NoLimit-12 46911 25956 ns/op 131072 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-256MB-12 968 1254574 ns/op 4194316 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-1.25GB-12 1429 1175763 ns/op 4194314 B/op 1 allocs/op
BenchmarkDequeueString/ArenaSize-128MB/MessageSize-4MB/MaxMem-NoLimit-12 1364 865977 ns/op 4194304 B/op 1 allocs/op
Note: Before running benchmarks, the ulimit and vm.max_map_count parameters should be adjusted using the commands below:
ulimit -n 50000
echo 262144 > /proc/sys/vm/max_map_count