diff --git a/src/taoensso/carmine/lua/mq/enqueue.lua b/src/taoensso/carmine/lua/mq/enqueue.lua index ec488f2c..998411ef 100644 --- a/src/taoensso/carmine/lua/mq/enqueue.lua +++ b/src/taoensso/carmine/lua/mq/enqueue.lua @@ -57,6 +57,12 @@ if state == nil then redis.call('lpush', _:qk-mid-circle, 'end-of-circle'); end + -- Set the initial backoff if requested + local initial_backoff_ms = tonumber(_:initial-backoff-ms); + if (initial_backoff_ms ~= 0) then + redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms); + end + redis.call('lpush', _:qk-mid-circle, _:mid); return {_:mid}; else diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index 7d41b11a..4d443824 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -106,24 +106,25 @@ perform a de-duplication check. If unspecified, a unique id will be auto-generated. * allow-requeue? - When true, allow buffered escrow-requeue for a - message in the :locked or :done-with-backoff state." - ;; TODO Option to enqueue something with an init backoff? + message in the :locked or :done-with-backoff state. + * initial-backoff-ms - Initial backoff in millis." (let [script (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/enqueue.lua"))] - (fn [qname message & [unique-message-id allow-requeue?]] + (fn [qname message & {:keys [unique-message-id allow-requeue? initial-backoff-ms]}] (car/parse #(if (vector? %) (get % 0) {:carmine.mq/error (keyword %)}) (car/lua script - {:qk-messages (qkey qname :messages) - :qk-locks (qkey qname :locks) - :qk-backoffs (qkey qname :backoffs) - :qk-nattempts (qkey qname :nattempts) - :qk-mid-circle (qkey qname :mid-circle) - :qk-done (qkey qname :done) - :qk-requeue (qkey qname :requeue)} - {:now (enc/now-udt) - :mid (or unique-message-id (enc/uuid-str)) - :mcontent (car/freeze message) - :allow-requeue? (if allow-requeue? "true" "false")}))))) + {:qk-messages (qkey qname :messages) + :qk-locks (qkey qname :locks) + :qk-backoffs (qkey qname :backoffs) + :qk-nattempts (qkey qname :nattempts) + :qk-mid-circle (qkey qname :mid-circle) + :qk-done (qkey qname :done) + :qk-requeue (qkey qname :requeue)} + {:now (enc/now-udt) + :mid (or unique-message-id (enc/uuid-str)) + :mcontent (car/freeze message) + :allow-requeue? (if allow-requeue? "true" "false") + :initial-backoff-ms (or initial-backoff-ms 0)}))))) (def dequeue "IMPLEMENTATION DETAIL: Use `worker` instead. diff --git a/test/taoensso/carmine/tests/message_queue.clj b/test/taoensso/carmine/tests/message_queue.clj index 0487060e..3df6efd6 100644 --- a/test/taoensso/carmine/tests/message_queue.clj +++ b/test/taoensso/carmine/tests/message_queue.clj @@ -1,6 +1,6 @@ (ns taoensso.carmine.tests.message-queue (:require - [clojure.test :as test :refer [is deftest]] + [clojure.test :as test :refer [is deftest testing]] [taoensso.carmine :as car :refer [wcar]] [taoensso.carmine.message-queue :as mq])) @@ -25,7 +25,7 @@ (deftest tests-1 ; Basic enqueuing & dequeuing (is (do (println (str "Running message queue tests")) true)) (is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))) - (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1)))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) (is (let [status (tq-status) {:keys [messages mid-circle]} status] @@ -35,7 +35,7 @@ (is (= :queued (wcar* (mq/message-status tq :mid1)))) ;; Dupe - (is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :mid1)))) + (is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))) ; New msg @@ -45,7 +45,7 @@ ) (deftest tests-2 ; Handling: success - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) ;; (is (= "eoq-backoff" (wcar* (dequeue* tq)))) ;; Handler will *not* run against eoq-backoff/nil reply: @@ -62,7 +62,7 @@ (is (= nil (wcar* (mq/message-status tq :mid1))))) (deftest tests-3 ; Handling: handler crash - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) ;; Simulates bad handler @@ -74,7 +74,7 @@ (wcar* (dequeue* tq)))))) (deftest tests-4 ; Handling: retry with backoff - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1} (let [p (promise)] @@ -90,7 +90,7 @@ (wcar* (dequeue* tq)))))) (deftest tests-5 ; Handling: success with backoff (dedupe) - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1} (let [p (promise)] @@ -103,12 +103,12 @@ (is (= nil (wcar* (dequeue* tq)))) ; Will gc (is (= :done-with-backoff (wcar* (mq/message-status tq :mid1)))) ; Backoff (< 3s) (is (= {:carmine.mq/error :done-with-backoff} - (wcar* (mq/enqueue tq :msg1 :mid1)))) ; Dupe + (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) ; Dupe (is (= "mid1" (do (Thread/sleep 3000) ; Wait for backoff to expire - (wcar* (mq/enqueue tq :msg1 :mid1)))))) + (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))) (deftest test-6 ; Handling: enqueue while :locked - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= :locked (do (future @@ -118,10 +118,10 @@ (wcar* (dequeue* tq)))) (Thread/sleep 50) (wcar* (mq/message-status tq :mid1))))) - (is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :mid1)))) - (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1 :allow-requeue)))) + (is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :allow-requeue? true)))) (is (= {:carmine.mq/error :locked-with-requeue} - (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))) + (wcar* (mq/enqueue tq :msg1-requeued :unique-message-id :mid1 :allow-requeue? true)))) (is (= :queued ; cmp :done-awaiting-gc (do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!) (wcar* (mq/message-status tq :mid1))))) @@ -129,7 +129,7 @@ (is (= ["mid1" :msg1 1] (wcar* (dequeue* tq))))) (deftest test-7 ; Handling: enqueue while :done-with-backoff - (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))) + (is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= :done-with-backoff (do (mq/handle1 conn-opts tq @@ -137,10 +137,80 @@ (wcar* (dequeue* tq))) (Thread/sleep 20) (wcar* (mq/message-status tq :mid1))))) - (is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :mid1)))) - (is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))) + (is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued :unique-message-id :mid1 :allow-requeue? true)))) (is (= :queued ; cmp :done-awaiting-gc (do (Thread/sleep 3000) ; Wait for backoff to expire (wcar* (mq/message-status tq :mid1))))) (is (= "eoq-backoff" (wcar* (dequeue* tq)))) (is (= ["mid1" :msg1 1] (wcar* (dequeue* tq))))) + +(deftest test-8 ; Enqueue/dequeue with initial backoff + (is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500)))) + (is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100)))) + (is + (let [status (tq-status) + {:keys [messages mid-circle]} status] + (and + (= messages {"mid1" :msg1 "mid2" :msg2}) + (= mid-circle ["mid2" "mid1" "end-of-circle"])))) + ;; Dupes before the backoff expired + (is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) + (is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2)))) + ;; Both should be queued with backoff before the backoff expires + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2)))) + ;; Move time past second message + (is (do (Thread/sleep 150) true)) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued (wcar* (mq/message-status tq :mid2)))) + ;; Move time past first message + (is (do (Thread/sleep 750) true)) + (is (= :queued (wcar* (mq/message-status tq :mid1)))) + (is (= :queued (wcar* (mq/message-status tq :mid2)))) + ;; Dupes after the backoff expired + (is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) + (is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2)))) + ;; TODO Is the order of retrieval actually predictable? + (is (= "eoq-backoff" (wcar* (dequeue* tq)))) + (is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))) + (is (= :locked (wcar* (mq/message-status tq :mid1)))) + (is (= ["mid2" :msg2 1] (wcar* (dequeue* tq)))) + (is (= :locked (wcar* (mq/message-status tq :mid2)))) + (is (= "eoq-backoff" (wcar* (dequeue* tq)))) + (is (= nil (wcar* (dequeue* tq)))) + ) + +(deftest test-message-queue-with-initial-backoff + (testing "Message status changes over time" + ;; Setup + (is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500)))) + (is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2)))) + (is (do (Thread/sleep 150) true)) ;; Move time past second message + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued (wcar* (mq/message-status tq :mid2)))) + (is (do (Thread/sleep 750) true)) ;; Move time past first message + (is (= :queued (wcar* (mq/message-status tq :mid1)))) + (is (= :queued (wcar* (mq/message-status tq :mid2))))) + (testing "Errors when we enqueue with same ids" + (is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500)))) + (is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2)))) + (is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) + (is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2))))) + (testing "Errors change over time" + (is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))) + (is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500)))) + (is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1)))) + (is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2)))) + (is (do (Thread/sleep 150) true)) ;; Move time past second message + (is (= :queued (wcar* (mq/message-status tq :mid2)))) + (is (do (Thread/sleep 750) true)) ;; Move time past first message + (is (= :queued (wcar* (mq/message-status tq :mid1))))))