Skip to content

Commit

Permalink
[taoensso#244] MQ: Add initial-backoff option to enqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
st3fan committed Nov 24, 2020
1 parent fe83a68 commit 9af62e4
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 30 deletions.
6 changes: 6 additions & 0 deletions src/taoensso/carmine/lua/mq/enqueue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ if state == nil then
redis.call('lpush', _:qk-mid-circle, 'end-of-circle');
end

-- Set the initial backoff if requested
local initial_backoff_ms = tonumber(_:initial-backoff-ms);
if (initial_backoff_ms ~= 0) then
redis.call('hset', _:qk-backoffs, _:mid, now + initial_backoff_ms);
end

redis.call('lpush', _:qk-mid-circle, _:mid);
return {_:mid};
else
Expand Down
29 changes: 15 additions & 14 deletions src/taoensso/carmine/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -106,24 +106,25 @@
perform a de-duplication check. If unspecified, a
unique id will be auto-generated.
* allow-requeue? - When true, allow buffered escrow-requeue for a
message in the :locked or :done-with-backoff state."
;; TODO Option to enqueue something with an init backoff?
message in the :locked or :done-with-backoff state.
* initial-backoff-ms - Initial backoff in millis."
(let [script (enc/have (enc/slurp-resource "taoensso/carmine/lua/mq/enqueue.lua"))]
(fn [qname message & [unique-message-id allow-requeue?]]
(fn [qname message & {:keys [unique-message-id allow-requeue? initial-backoff-ms]}]
(car/parse
#(if (vector? %) (get % 0) {:carmine.mq/error (keyword %)})
(car/lua script
{:qk-messages (qkey qname :messages)
:qk-locks (qkey qname :locks)
:qk-backoffs (qkey qname :backoffs)
:qk-nattempts (qkey qname :nattempts)
:qk-mid-circle (qkey qname :mid-circle)
:qk-done (qkey qname :done)
:qk-requeue (qkey qname :requeue)}
{:now (enc/now-udt)
:mid (or unique-message-id (enc/uuid-str))
:mcontent (car/freeze message)
:allow-requeue? (if allow-requeue? "true" "false")})))))
{:qk-messages (qkey qname :messages)
:qk-locks (qkey qname :locks)
:qk-backoffs (qkey qname :backoffs)
:qk-nattempts (qkey qname :nattempts)
:qk-mid-circle (qkey qname :mid-circle)
:qk-done (qkey qname :done)
:qk-requeue (qkey qname :requeue)}
{:now (enc/now-udt)
:mid (or unique-message-id (enc/uuid-str))
:mcontent (car/freeze message)
:allow-requeue? (if allow-requeue? "true" "false")
:initial-backoff-ms (or initial-backoff-ms 0)})))))

(def dequeue
"IMPLEMENTATION DETAIL: Use `worker` instead.
Expand Down
102 changes: 86 additions & 16 deletions test/taoensso/carmine/tests/message_queue.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns taoensso.carmine.tests.message-queue
(:require
[clojure.test :as test :refer [is deftest]]
[clojure.test :as test :refer [is deftest testing]]
[taoensso.carmine :as car :refer [wcar]]
[taoensso.carmine.message-queue :as mq]))

Expand All @@ -25,7 +25,7 @@
(deftest tests-1 ; Basic enqueuing & dequeuing
(is (do (println (str "Running message queue tests")) true))
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is
(let [status (tq-status)
{:keys [messages mid-circle]} status]
Expand All @@ -35,7 +35,7 @@
(is (= :queued (wcar* (mq/message-status tq :mid1))))

;; Dupe
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))

(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))) ; New msg
Expand All @@ -45,7 +45,7 @@
)

(deftest tests-2 ; Handling: success
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
;; (is (= "eoq-backoff" (wcar* (dequeue* tq))))

;; Handler will *not* run against eoq-backoff/nil reply:
Expand All @@ -62,7 +62,7 @@
(is (= nil (wcar* (mq/message-status tq :mid1)))))

(deftest tests-3 ; Handling: handler crash
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))

;; Simulates bad handler
Expand All @@ -74,7 +74,7 @@
(wcar* (dequeue* tq))))))

(deftest tests-4 ; Handling: retry with backoff
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
Expand All @@ -90,7 +90,7 @@
(wcar* (dequeue* tq))))))

(deftest tests-5 ; Handling: success with backoff (dedupe)
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= {:qname :carmine-test-queue :mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
Expand All @@ -103,12 +103,12 @@
(is (= nil (wcar* (dequeue* tq)))) ; Will gc
(is (= :done-with-backoff (wcar* (mq/message-status tq :mid1)))) ; Backoff (< 3s)
(is (= {:carmine.mq/error :done-with-backoff}
(wcar* (mq/enqueue tq :msg1 :mid1)))) ; Dupe
(wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))) ; Dupe
(is (= "mid1" (do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (mq/enqueue tq :msg1 :mid1))))))
(wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))))

(deftest test-6 ; Handling: enqueue while :locked
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= :locked
(do (future
Expand All @@ -118,29 +118,99 @@
(wcar* (dequeue* tq))))
(Thread/sleep 50)
(wcar* (mq/message-status tq :mid1)))))
(is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :mid1 :allow-requeue))))
(is (= {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :allow-requeue? true))))
(is (= {:carmine.mq/error :locked-with-requeue}
(wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))))
(wcar* (mq/enqueue tq :msg1-requeued :unique-message-id :mid1 :allow-requeue? true))))
(is (= :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!)
(wcar* (mq/message-status tq :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))))

(deftest test-7 ; Handling: enqueue while :done-with-backoff
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))))
(is (= "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= :done-with-backoff
(do (mq/handle1 conn-opts tq
(fn [_] {:status :success :backoff-ms 3000})
(wcar* (dequeue* tq)))
(Thread/sleep 20)
(wcar* (mq/message-status tq :mid1)))))
(is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))))
(is (= {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1-requeued :unique-message-id :mid1 :allow-requeue? true))))
(is (= :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (mq/message-status tq :mid1)))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq)))))

(deftest test-8 ; Enqueue/dequeue with initial backoff
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100))))
(is
(let [status (tq-status)
{:keys [messages mid-circle]} status]
(and
(= messages {"mid1" :msg1 "mid2" :msg2})
(= mid-circle ["mid2" "mid1" "end-of-circle"]))))
;; Dupes before the backoff expired
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2))))
;; Both should be queued with backoff before the backoff expires
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
;; Move time past second message
(is (do (Thread/sleep 150) true))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
;; Move time past first message
(is (do (Thread/sleep 750) true))
(is (= :queued (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
;; Dupes after the backoff expired
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is (= {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2))))
;; TODO Is the order of retrieval actually predictable?
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= ["mid1" :msg1 1] (wcar* (dequeue* tq))))
(is (= :locked (wcar* (mq/message-status tq :mid1))))
(is (= ["mid2" :msg2 1] (wcar* (dequeue* tq))))
(is (= :locked (wcar* (mq/message-status tq :mid2))))
(is (= "eoq-backoff" (wcar* (dequeue* tq))))
(is (= nil (wcar* (dequeue* tq))))
)

(deftest test-message-queue-with-initial-backoff
(testing "Message status changes over time"
;; Setup
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 150) true)) ;; Move time past second message
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 750) true)) ;; Move time past first message
(is (= :queued (wcar* (mq/message-status tq :mid1))))
(is (= :queued (wcar* (mq/message-status tq :mid2)))))
(testing "Errors when we enqueue with same ids"
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1))))
(is (= {:carmine.mq/error :queued-with-backoff} (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2)))))
(testing "Errors change over time"
(is (= "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))))
(is (= "mid1" (wcar* (mq/enqueue tq :msg1 :unique-message-id :mid1 :initial-backoff-ms 500))))
(is (= "mid2" (wcar* (mq/enqueue tq :msg2 :unique-message-id :mid2 :initial-backoff-ms 100))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid1))))
(is (= :queued-with-backoff (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 150) true)) ;; Move time past second message
(is (= :queued (wcar* (mq/message-status tq :mid2))))
(is (do (Thread/sleep 750) true)) ;; Move time past first message
(is (= :queued (wcar* (mq/message-status tq :mid1))))))

0 comments on commit 9af62e4

Please sign in to comment.