From e0d575410335bf9e125fc58e196fdcbe0c790457 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Tue, 2 Jul 2024 10:18:27 -0300 Subject: [PATCH 01/15] refactor: improve `queue.clj` --- .gitignore | 1 + deps.edn | 9 +- src/com/moclojer/rq/queue.clj | 162 ++++++++++++++++++++++++---------- src/com/moclojer/rq/utils.clj | 32 +++++++ 4 files changed, 154 insertions(+), 50 deletions(-) create mode 100644 src/com/moclojer/rq/utils.clj diff --git a/.gitignore b/.gitignore index 0758b35..d08494d 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ output.calva-repl *~ /.clj-kondo .envrc +/docker diff --git a/deps.edn b/deps.edn index 72c040b..aa54274 100644 --- a/deps.edn +++ b/deps.edn @@ -1,6 +1,7 @@ {:paths ["src"] :deps - {redis.clients/jedis {:mvn/version "5.1.2"}} + {redis.clients/jedis {:mvn/version "5.1.2"} + org.clojure/core.async {:mvn/version "1.6.681"}} :aliases {;; clj -A:dev -m com.moclojer.rq @@ -43,6 +44,6 @@ :sign-releases? false :pom-file "target/classes/META-INF/maven/com.moclojer/rq/pom.xml" :artifact "target/com.moclojer.rq.jar"}}}} - - - + + + diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 47ad6eb..fa59467 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -1,48 +1,118 @@ -(ns com.moclojer.rq.queue) - -(defn pattern-name - "Return the pattern name for the queue, which is the name prefixed with `rq:`" - [name] - (str "rq:" name)) - -;; queue:push -;; (queue/producer client queue-name {:foo "bar"} :at ... :in ... :retry 3 :retry-delay 10) -(defn producer - "Push a job to the queue" - [redis-client queue-name message & {:keys [direction at in retry retry-delay] - :or {direction "l" retry 3 retry-delay 10}}] - (println at in retry retry-delay) - (let [qname (pattern-name queue-name) - msg (into-array [(pr-str message)])] - (if (= direction "r") - (.rpush @redis-client qname msg) - (.lpush @redis-client qname msg)))) - -(defn consumer-size - "get size of the queue" - [redis-client queue-name] - (.llen @redis-client (pattern-name queue-name))) - -;; queue:process and pop -;; (queue/consumer client queue-name (fn [job] (println job)) -(defn consumer - "Consume a message from the queue, blocking in loop until a message is available." - [redis-client queue-name consumer-fn & {:keys [size loop-sleep direction] - :or {direction "l" size -1 loop-sleep 0}}] - (let [qname (pattern-name queue-name)] - (loop [] - (Thread/sleep loop-sleep) - (flush) - (let [msg (if (= direction "r") - (.rrange @redis-client qname 0 size) - (.lrange @redis-client qname 0 size))] - (when msg - (consumer-fn msg) - (println :size (consumer-size redis-client qname)) - (if (= direction "r") - (.rpop @redis-client qname) - (.lpop @redis-client qname)) - (System/exit 0) - (recur)))))) +(ns com.moclojer.rq.queue + (:require + [clojure.core.async :as async] + [clojure.edn :as edn] + [com.moclojer.rq.utils :as utils])) +(defn push! + [client queue-name message & options] + ;; NOTE: maybe use clojure.tools.logging? this way the enduser chooses which + ;; logging mechanism to use by themselves. + (let [{:keys [direction pattern _at _in _retry _retry-delay] + :or {direction :l + pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name) + encoded-message (into-array [(pr-str message)])] + (if (= direction :l) + (.lpush @client packed-queue-name encoded-message) + (.rpush @client packed-queue-name encoded-message)))) +(comment + (import redis.clients.jedis.JedisPooled) + + (let [client (atom (JedisPooled. "redis://localhost:6379"))] + (push! client "my-queue" {:hello true} :direction :l) + (.close @client)) + ;; + ) + +(defn pop! + [client queue-name & options] + (let [{:keys [direction pattern] + :or {direction :l + pattern :rq}} options + packed-queue-name (utils/pack-pattern pattern queue-name) + message (if (= direction :l) + (.lpop @client packed-queue-name) + (.rpop @client packed-queue-name))] + (edn/read-string message))) + +(comment + (import redis.clients.jedis.JedisPooled) + + (let [client (atom (JedisPooled. "redis://localhost:6379"))] + (push! client "my-queue" {:hello true} :direction :r) + (let [popped-val (pop! client "my-queue" :direction :r)] + (.close @client) + popped-val)) + ;; => {:hello true} + ) + +(defn llen + "get size of a queue" + [client queue-name & options] + (let [{:keys [pattern] + :or {pattern :rq}} options] + (.llen @client (utils/pack-pattern pattern queue-name)))) + +(comment + (import redis.clients.jedis.JedisPooled) + + (let [client (atom (JedisPooled. "redis://localhost:6379"))] + (push! client "my-queue" {:hello 1} :direction :r) + (push! client "my-queue" {:hello 2} :direction :r) + (push! client "my-queue" {:hello 3} :direction :r) + (let [queue-length (llen client "my-queue")] + (pop! client "my-queue" :direction :r) + (pop! client "my-queue" :direction :r) + (pop! client "my-queue" :direction :r) + (.close @client) + queue-length)) + ;; => 3 + ) + +(defn wait-and-consume! + "returns the channel, which can be closed with async/close!" + [client queue-name consume-fn & options] + (let [{:keys [direction pattern sleep-time buffer-size] + :or {direction :l + pattern :rq + sleep-time 200 + buffer-size 1024}} options + chan (async/chan (async/sliding-buffer buffer-size))] + + ;; wait and retrieve messages + (async/go-loop [] + (when (.isConnected @client) + (if-let [?message (pop! client queue-name + :direction direction + :pattern pattern)] + (do + (Thread/sleep sleep-time) + (when (async/>! chan ?message) + (recur))) + (recur)))) + + ;; now consume them + (async/go-loop [] + (when-let [?message (async/str + [pattern] + (let [patterns {:rq "rq:" + :pending "rq:pending:"}] + (or (get-in patterns [pattern]) + (throw (ex-info (str "No pattern named " pattern) + {:cause :illegal-argument + :value pattern + :expected (keys patterns)}))))) + +(defn pack-pattern + [pattern queue-name] + (str (pattern->str pattern) queue-name)) + +(defn unpack-pattern + [pattern queue-name] + (apply str (-> (pattern->str pattern) + (count) + (drop queue-name)))) + +(comment + (pack-pattern :rq "my-queue") + ;; => "rq:my-queue" + + (pack-pattern :pending "foobar") + ;; => "rq:pendingfoobar" + + (unpack-pattern :pending "rq:pending:foobar") + ;; => "foobar" + ) From 02d3991ad0b29ce8f11db6d45bf78c956c32bb59 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Tue, 2 Jul 2024 14:25:06 -0300 Subject: [PATCH 02/15] fix: rm consumer `:wontfix` --- src/com/moclojer/rq/queue.clj | 47 ----------------------------------- 1 file changed, 47 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index fa59467..8875d62 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -1,6 +1,5 @@ (ns com.moclojer.rq.queue (:require - [clojure.core.async :as async] [clojure.edn :as edn] [com.moclojer.rq.utils :as utils])) @@ -70,49 +69,3 @@ queue-length)) ;; => 3 ) - -(defn wait-and-consume! - "returns the channel, which can be closed with async/close!" - [client queue-name consume-fn & options] - (let [{:keys [direction pattern sleep-time buffer-size] - :or {direction :l - pattern :rq - sleep-time 200 - buffer-size 1024}} options - chan (async/chan (async/sliding-buffer buffer-size))] - - ;; wait and retrieve messages - (async/go-loop [] - (when (.isConnected @client) - (if-let [?message (pop! client queue-name - :direction direction - :pattern pattern)] - (do - (Thread/sleep sleep-time) - (when (async/>! chan ?message) - (recur))) - (recur)))) - - ;; now consume them - (async/go-loop [] - (when-let [?message (async/ Date: Tue, 2 Jul 2024 15:23:50 -0300 Subject: [PATCH 03/15] refactor: improve `pubsub` --- deps.edn | 3 +- src/com/moclojer/rq.clj | 80 +++++++++++++++++++--------------- src/com/moclojer/rq/pubsub.clj | 37 +++++++++++----- 3 files changed, 72 insertions(+), 48 deletions(-) diff --git a/deps.edn b/deps.edn index aa54274..6fdd3bb 100644 --- a/deps.edn +++ b/deps.edn @@ -1,7 +1,6 @@ {:paths ["src"] :deps - {redis.clients/jedis {:mvn/version "5.1.2"} - org.clojure/core.async {:mvn/version "1.6.681"}} + {redis.clients/jedis {:mvn/version "5.1.2"}} :aliases {;; clj -A:dev -m com.moclojer.rq diff --git a/src/com/moclojer/rq.clj b/src/com/moclojer/rq.clj index e8d0a89..7d3a833 100644 --- a/src/com/moclojer/rq.clj +++ b/src/com/moclojer/rq.clj @@ -1,6 +1,9 @@ (ns com.moclojer.rq - (:require [com.moclojer.rq.queue :as queue]) - (:import [redis.clients.jedis JedisPooled])) + (:require + [com.moclojer.rq.pubsub :as pubsub] + [com.moclojer.rq.queue :as queue]) + (:import + [redis.clients.jedis JedisPooled])) (def version "0.1.1") @@ -9,39 +12,46 @@ ^{:private true :dynamic true} *redis-pool* (ref nil)) -(defn client +(defn create-client "redis connect client" - [url] - (when-not @*redis-pool* - (dosync - (ref-set *redis-pool* (JedisPooled. url)))) - *redis-pool*) - -(defn client-kill - "redis kill client" - [] - (.destroy @*redis-pool*)) - -(defn client-disconnect - "redis disconnect client" - [] - (.returnResource @*redis-pool*)) - -(defn -main - [& _] - (client "redis://localhost:6379") - (queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now) - :foo "bar"}) - (println :size (queue/consumer-size *redis-pool* "my-queue")) - (queue/consumer *redis-pool* "my-queue" #(prn :msg %1))) - + ([url] + (create-client url false)) + ([url ref?] + (let [pool (JedisPooled. url)] + (if (and ref? (not @*redis-pool*)) + (dosync + (ref-set *redis-pool* pool) + *redis-pool*) + (atom pool))))) + +(defn close-client + "redis close client" + ([] (close-client *redis-pool*)) + ([client] (.close @client))) (comment - (client "redis://localhost:6379") - (queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now) - :foo "bar"}) - (println :size (queue/consumer-size *redis-pool* "my-queue")) - (queue/consumer *redis-pool* "my-queue" #(prn :msg %1)) - - #_(pubsub/publish *redis-pool* "hello.world" "value set") - #_(pubsub/subscribe *redis-pool* #(prn :chan %1 :msg %2) ["hello.world"])) + (let [client (create-client "redis://localhost:6379")] + (queue/push! client "my-queue" {:user/name "john" + :user/surname "doe" + :user/age 24}) + (let [popped (queue/pop! client "my-queue")] + (close-client client) + popped)) + ;; => #:user{:name "john", :surname "doe", :age 24} + + (let [client (create-client "redis://localhost:6379")] + (pubsub/subscribe! client #(prn :channel %1 :received %2) + ["my-channel" "my-other-channel"]) + (Thread/sleep 1000) + (dotimes [_ 10] + (pubsub/publish! client "my-channel" + {:topic/id :created-user + :user/id 123 + :user/name "john doe"}) + (pubsub/publish! client "my-other-channel" + {:hello :bye + :try 2 + :user-count 5})) + (close-client client)) + ;; + ) diff --git a/src/com/moclojer/rq/pubsub.clj b/src/com/moclojer/rq/pubsub.clj index eb9f7a0..4fcf4a3 100644 --- a/src/com/moclojer/rq/pubsub.clj +++ b/src/com/moclojer/rq/pubsub.clj @@ -1,25 +1,40 @@ (ns com.moclojer.rq.pubsub - (:import [redis.clients.jedis JedisPubSub])) + (:require + [clojure.edn :as edn]) + (:import + [redis.clients.jedis JedisPubSub])) ;; (pubsub/publish redis-client "name-subs" "value set") -(defn publish +(defn publish! "Publish a message to a channel" - [redis-client channel message] - (.publish @redis-client channel message)) + [client channel message] + (.publish @client channel (pr-str message))) -(defn listener +(defn create-listener [on-msg-fn] (proxy [JedisPubSub] [] (onMessage [channel message] (try + ;; TODO: switch for a logger (println "onMessage" channel message) - (on-msg-fn channel message) + (on-msg-fn channel (edn/read-string message)) (catch Exception e - (ex-message e) nil))))) + (.printStackTrace e) + ;; TODO: switch for a logger + (prn :error (ex-message e))))))) -;; (pubsub/subscribe redis-client ["name-subs"]) -(defn subscribe +(defn subscribe! "Subscribe to channels and call the callback function when a message is received is possible to subscribe to multiple channels" - [redis-client on-msg-fn channels] - (.subscribe @redis-client (listener on-msg-fn) (into-array channels))) + [client on-msg-fn channels] + (future (.subscribe @client (create-listener on-msg-fn) (into-array channels)))) + +(comment + (import redis.clients.jedis.JedisPooled) + + (let [client (atom (JedisPooled. "redis://localhost:6379"))] + (subscribe! client #(prn %1 %2) ["my-channel"]) + (Thread/sleep 1000) + (publish! client "my-channel" {:hello true})) + ;; + ) From bb90dcb7dd80d578eae1f2464e58f3a3490f1a81 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 13:31:09 -0300 Subject: [PATCH 04/15] chore: allowing `docker-compose.yml` --- docker/docker-compose.yml | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 docker/docker-compose.yml diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml new file mode 100644 index 0000000..0f3944f --- /dev/null +++ b/docker/docker-compose.yml @@ -0,0 +1,9 @@ +version: "3.7" + +services: + redis: + image: redis:latest + ports: + - "6379:6379" + volumes: + - ./redis:/redis From 8e4029287624b4abe1f3c48af059f92f1b989731 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 13:48:31 -0300 Subject: [PATCH 05/15] feat: add logs everywhere Since we're using `clojure.tools.logging`, we let the enduser decide which logger implementation that will actually do the logging job, be it SLF4J, Log4j2, Logback, etc. Also, the logs being only print debug messages, so if the enduser doesn't want to see them, just raise their logger base level to INFO, and they won't appear anymore. --- .gitignore | 2 +- deps.edn | 3 ++- src/com/moclojer/rq/pubsub.clj | 23 +++++++++++++++++------ src/com/moclojer/rq/queue.clj | 31 +++++++++++++++++++++++-------- 4 files changed, 43 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index d08494d..f3f40bb 100644 --- a/.gitignore +++ b/.gitignore @@ -21,4 +21,4 @@ output.calva-repl *~ /.clj-kondo .envrc -/docker +redis diff --git a/deps.edn b/deps.edn index 6fdd3bb..9052d05 100644 --- a/deps.edn +++ b/deps.edn @@ -1,6 +1,7 @@ {:paths ["src"] :deps - {redis.clients/jedis {:mvn/version "5.1.2"}} + {redis.clients/jedis {:mvn/version "5.1.2"} + org.clojure/tools.logging {:mvn/version "1.3.0"}} :aliases {;; clj -A:dev -m com.moclojer.rq diff --git a/src/com/moclojer/rq/pubsub.clj b/src/com/moclojer/rq/pubsub.clj index 4fcf4a3..3dfe0ca 100644 --- a/src/com/moclojer/rq/pubsub.clj +++ b/src/com/moclojer/rq/pubsub.clj @@ -1,6 +1,7 @@ (ns com.moclojer.rq.pubsub (:require - [clojure.edn :as edn]) + [clojure.edn :as edn] + [clojure.tools.loggings :as log]) (:import [redis.clients.jedis JedisPubSub])) @@ -8,20 +9,30 @@ (defn publish! "Publish a message to a channel" [client channel message] - (.publish @client channel (pr-str message))) + (let [consumer-count (.publish @client channel (pr-str message))] + + (log/debug "published to channel" + {:channel channel + :message message + :consumer-count consumer-count}) + + consumer-count)) (defn create-listener [on-msg-fn] (proxy [JedisPubSub] [] (onMessage [channel message] (try - ;; TODO: switch for a logger - (println "onMessage" channel message) + (log/info "consumed from channel" + {:channel channel + :message message}) (on-msg-fn channel (edn/read-string message)) (catch Exception e (.printStackTrace e) - ;; TODO: switch for a logger - (prn :error (ex-message e))))))) + (log/error "failed to consume from channel" + {:channel channel + :message message + :exception e})))))) (defn subscribe! "Subscribe to channels and call the callback function when a message is received diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index 8875d62..d5c2b13 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -1,20 +1,28 @@ (ns com.moclojer.rq.queue (:require [clojure.edn :as edn] + [clojure.tools.logging :as log] [com.moclojer.rq.utils :as utils])) (defn push! [client queue-name message & options] - ;; NOTE: maybe use clojure.tools.logging? this way the enduser chooses which - ;; logging mechanism to use by themselves. (let [{:keys [direction pattern _at _in _retry _retry-delay] :or {direction :l - pattern :rq}} options + pattern :rq} + :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) - encoded-message (into-array [(pr-str message)])] - (if (= direction :l) - (.lpush @client packed-queue-name encoded-message) - (.rpush @client packed-queue-name encoded-message)))) + encoded-message (into-array [(pr-str message)]) + pushed-count (if (= direction :l) + (.lpush @client packed-queue-name encoded-message) + (.rpush @client packed-queue-name encoded-message))] + + (log/debug "pushed to queue" + {:queue-name packed-queue-name + :message message + :options opts + :pushed-count pushed-count}) + + pushed-count)) (comment (import redis.clients.jedis.JedisPooled) @@ -29,11 +37,18 @@ [client queue-name & options] (let [{:keys [direction pattern] :or {direction :l - pattern :rq}} options + pattern :rq} + :as opts} options packed-queue-name (utils/pack-pattern pattern queue-name) message (if (= direction :l) (.lpop @client packed-queue-name) (.rpop @client packed-queue-name))] + + (log/debug "popped from queue" + {:queue-name packed-queue-name + :options opts + :message message}) + (edn/read-string message))) (comment From 55303df91dba9ac7db028191119ef397e29addad Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 14:52:16 -0300 Subject: [PATCH 06/15] test: add tests and rm comments --- src/com/moclojer/rq.clj | 27 --------------------- src/com/moclojer/rq/pubsub.clj | 13 +--------- src/com/moclojer/rq/queue.clj | 36 ---------------------------- test/com/moclojer/rq/pubsub_test.clj | 27 +++++++++++++++++++++ test/com/moclojer/rq/queue_test.clj | 27 +++++++++++++++++++++ test/com/moclojer/rq_test.clj | 11 +++++++++ test/com/moclojer/test_utils.clj | 10 ++++++++ 7 files changed, 76 insertions(+), 75 deletions(-) create mode 100644 test/com/moclojer/rq/pubsub_test.clj create mode 100644 test/com/moclojer/rq/queue_test.clj create mode 100644 test/com/moclojer/rq_test.clj create mode 100644 test/com/moclojer/test_utils.clj diff --git a/src/com/moclojer/rq.clj b/src/com/moclojer/rq.clj index 7d3a833..8efa67e 100644 --- a/src/com/moclojer/rq.clj +++ b/src/com/moclojer/rq.clj @@ -28,30 +28,3 @@ "redis close client" ([] (close-client *redis-pool*)) ([client] (.close @client))) - -(comment - (let [client (create-client "redis://localhost:6379")] - (queue/push! client "my-queue" {:user/name "john" - :user/surname "doe" - :user/age 24}) - (let [popped (queue/pop! client "my-queue")] - (close-client client) - popped)) - ;; => #:user{:name "john", :surname "doe", :age 24} - - (let [client (create-client "redis://localhost:6379")] - (pubsub/subscribe! client #(prn :channel %1 :received %2) - ["my-channel" "my-other-channel"]) - (Thread/sleep 1000) - (dotimes [_ 10] - (pubsub/publish! client "my-channel" - {:topic/id :created-user - :user/id 123 - :user/name "john doe"}) - (pubsub/publish! client "my-other-channel" - {:hello :bye - :try 2 - :user-count 5})) - (close-client client)) - ;; - ) diff --git a/src/com/moclojer/rq/pubsub.clj b/src/com/moclojer/rq/pubsub.clj index 3dfe0ca..a49e4b3 100644 --- a/src/com/moclojer/rq/pubsub.clj +++ b/src/com/moclojer/rq/pubsub.clj @@ -1,11 +1,10 @@ (ns com.moclojer.rq.pubsub (:require [clojure.edn :as edn] - [clojure.tools.loggings :as log]) + [clojure.tools.logging :as log]) (:import [redis.clients.jedis JedisPubSub])) -;; (pubsub/publish redis-client "name-subs" "value set") (defn publish! "Publish a message to a channel" [client channel message] @@ -39,13 +38,3 @@ is possible to subscribe to multiple channels" [client on-msg-fn channels] (future (.subscribe @client (create-listener on-msg-fn) (into-array channels)))) - -(comment - (import redis.clients.jedis.JedisPooled) - - (let [client (atom (JedisPooled. "redis://localhost:6379"))] - (subscribe! client #(prn %1 %2) ["my-channel"]) - (Thread/sleep 1000) - (publish! client "my-channel" {:hello true})) - ;; - ) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index d5c2b13..cc0ebd0 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -24,15 +24,6 @@ pushed-count)) -(comment - (import redis.clients.jedis.JedisPooled) - - (let [client (atom (JedisPooled. "redis://localhost:6379"))] - (push! client "my-queue" {:hello true} :direction :l) - (.close @client)) - ;; - ) - (defn pop! [client queue-name & options] (let [{:keys [direction pattern] @@ -51,36 +42,9 @@ (edn/read-string message))) -(comment - (import redis.clients.jedis.JedisPooled) - - (let [client (atom (JedisPooled. "redis://localhost:6379"))] - (push! client "my-queue" {:hello true} :direction :r) - (let [popped-val (pop! client "my-queue" :direction :r)] - (.close @client) - popped-val)) - ;; => {:hello true} - ) - (defn llen "get size of a queue" [client queue-name & options] (let [{:keys [pattern] :or {pattern :rq}} options] (.llen @client (utils/pack-pattern pattern queue-name)))) - -(comment - (import redis.clients.jedis.JedisPooled) - - (let [client (atom (JedisPooled. "redis://localhost:6379"))] - (push! client "my-queue" {:hello 1} :direction :r) - (push! client "my-queue" {:hello 2} :direction :r) - (push! client "my-queue" {:hello 3} :direction :r) - (let [queue-length (llen client "my-queue")] - (pop! client "my-queue" :direction :r) - (pop! client "my-queue" :direction :r) - (pop! client "my-queue" :direction :r) - (.close @client) - queue-length)) - ;; => 3 - ) diff --git a/test/com/moclojer/rq/pubsub_test.clj b/test/com/moclojer/rq/pubsub_test.clj new file mode 100644 index 0000000..dbb098d --- /dev/null +++ b/test/com/moclojer/rq/pubsub_test.clj @@ -0,0 +1,27 @@ +(ns com.moclojer.rq.pubsub-test + (:require + [clojure.test :as t] + [com.moclojer.rq :as rq] + [com.moclojer.rq.pubsub :as rq-pubsub] + [com.moclojer.test-utils :as utils])) + +(t/deftest pubsub-test + (let [client (rq/create-client "redis://localhost:6379") + subs (atom []) + channels (into [] (repeatedly 20 #(str (random-uuid)))) + messages (into [] (repeatedly 20 utils/gen-message)) + chan-msg (zipmap channels messages)] + (rq-pubsub/subscribe! client + #(swap! subs conj {:channel %1 :message %2}) + channels) + (doseq [[channel message] chan-msg] + (rq-pubsub/publish! client channel message)) + + (future + ;; more than enough time for 20 messages to be consumed + (Thread/sleep 3000) + (t/is (= (map (fn [[chan msg]] + {:channel chan + :message msg}) + chan-msg) + @subs))))) diff --git a/test/com/moclojer/rq/queue_test.clj b/test/com/moclojer/rq/queue_test.clj new file mode 100644 index 0000000..aa9f651 --- /dev/null +++ b/test/com/moclojer/rq/queue_test.clj @@ -0,0 +1,27 @@ +(ns com.moclojer.rq.queue-test + (:require + [clojure.test :as t] + [com.moclojer.rq :as rq] + [com.moclojer.rq.queue :as rq-queue] + [com.moclojer.test-utils :as utils])) + +(t/deftest queue-test + (let [client (rq/create-client "redis://localhost:6379") + queue-name (str (random-uuid)) + message (utils/gen-message)] + + (t/testing "raw" + (rq-queue/push! client queue-name message) + (rq-queue/push! client queue-name (utils/gen-message)) + (t/is (= 2 (rq-queue/llen client queue-name))) + (t/is (= message (rq-queue/pop! client queue-name :direction :r)))) + + (t/testing "direction" + (rq-queue/push! client queue-name message :direction :r) + (t/is (= message (rq-queue/pop! client queue-name :direction :r)))) + + (t/testing "pattern" + (rq-queue/push! client queue-name message :pattern :pending) + (t/is (= message (rq-queue/pop! client queue-name :pattern :pending)))) + + (rq/close-client client))) diff --git a/test/com/moclojer/rq_test.clj b/test/com/moclojer/rq_test.clj new file mode 100644 index 0000000..18cf275 --- /dev/null +++ b/test/com/moclojer/rq_test.clj @@ -0,0 +1,11 @@ +(ns com.moclojer.rq-test + (:require + [clojure.test :as t] + [com.moclojer.rq :as rq])) + +;; WARNING: redis needs to be runing. + +(t/deftest create-client-test + (let [client (rq/create-client "redis://localhost:6379")] + (t/is (.. @client getPool getResource)) + (rq/close-client client))) diff --git a/test/com/moclojer/test_utils.clj b/test/com/moclojer/test_utils.clj new file mode 100644 index 0000000..b49e226 --- /dev/null +++ b/test/com/moclojer/test_utils.clj @@ -0,0 +1,10 @@ +(ns com.moclojer.test-utils) + +(defn gen-message + "Generates a fuzzy message" + [] + {(random-uuid) 1 + (keyword (str (random-uuid))) true + :test 'hello + :my/test2 "123" + :foobar ["321"]}) From 6987588a44adcef1d91bcf37dd6f046907e16cd5 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 15:00:12 -0300 Subject: [PATCH 07/15] docs: add doc-strings --- src/com/moclojer/rq.clj | 8 +++----- src/com/moclojer/rq/pubsub.clj | 5 ++++- src/com/moclojer/rq/queue.clj | 2 ++ 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/com/moclojer/rq.clj b/src/com/moclojer/rq.clj index 8efa67e..59c4dcc 100644 --- a/src/com/moclojer/rq.clj +++ b/src/com/moclojer/rq.clj @@ -1,7 +1,4 @@ (ns com.moclojer.rq - (:require - [com.moclojer.rq.pubsub :as pubsub] - [com.moclojer.rq.queue :as queue]) (:import [redis.clients.jedis JedisPooled])) @@ -13,7 +10,8 @@ *redis-pool* (ref nil)) (defn create-client - "redis connect client" + "Connect to redis client. If `ref?` is true, will save the created instance + in the global var `*redis-pool*. Just returns the created instance otherwise." ([url] (create-client url false)) ([url ref?] @@ -25,6 +23,6 @@ (atom pool))))) (defn close-client - "redis close client" + "Disconnect and close redis client" ([] (close-client *redis-pool*)) ([client] (.close @client))) diff --git a/src/com/moclojer/rq/pubsub.clj b/src/com/moclojer/rq/pubsub.clj index a49e4b3..c939ca1 100644 --- a/src/com/moclojer/rq/pubsub.clj +++ b/src/com/moclojer/rq/pubsub.clj @@ -18,6 +18,9 @@ consumer-count)) (defn create-listener + "Create a listener for the pubsub. It will be entry point for any published + data, being responsible for routing the right consumer. However, that's on + the enduser." [on-msg-fn] (proxy [JedisPubSub] [] (onMessage [channel message] @@ -35,6 +38,6 @@ (defn subscribe! "Subscribe to channels and call the callback function when a message is received - is possible to subscribe to multiple channels" + NOTE: It is possible to subscribe to multiple channels" [client on-msg-fn channels] (future (.subscribe @client (create-listener on-msg-fn) (into-array channels)))) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index cc0ebd0..cdfaeeb 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -5,6 +5,7 @@ [com.moclojer.rq.utils :as utils])) (defn push! + "Push a messagem into a queue" [client queue-name message & options] (let [{:keys [direction pattern _at _in _retry _retry-delay] :or {direction :l @@ -25,6 +26,7 @@ pushed-count)) (defn pop! + "Pop a message from a queue" [client queue-name & options] (let [{:keys [direction pattern] :or {direction :l From 78a079534d0c4551628bf2392d407e6185e0314f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josu=C3=A9=20Teodoro=20Moreira?= Date: Wed, 3 Jul 2024 15:08:26 -0300 Subject: [PATCH 08/15] style: mensagem -> message Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/com/moclojer/rq/queue.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index cdfaeeb..e8a6967 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -5,7 +5,7 @@ [com.moclojer.rq.utils :as utils])) (defn push! - "Push a messagem into a queue" + "Push a message into a queue" [client queue-name message & options] (let [{:keys [direction pattern _at _in _retry _retry-delay] :or {direction :l From f974be243e5687cde43986770314bc0aab78641d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josu=C3=A9=20Teodoro=20Moreira?= Date: Wed, 3 Jul 2024 15:19:41 -0300 Subject: [PATCH 09/15] style: grammar & log improvements Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- src/com/moclojer/rq/queue.clj | 33 ++++++++++++++++++++++++++++----- src/com/moclojer/rq/utils.clj | 2 +- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/com/moclojer/rq/queue.clj b/src/com/moclojer/rq/queue.clj index e8a6967..b8ac904 100644 --- a/src/com/moclojer/rq/queue.clj +++ b/src/com/moclojer/rq/queue.clj @@ -5,7 +5,15 @@ [com.moclojer.rq.utils :as utils])) (defn push! - "Push a message into a queue" + "Push a message into a queue. + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - message: Message to be pushed + - options: Optional parameters, including: + - direction: Direction to push the message (:l or :r) + - pattern: Pattern for the queue name" [client queue-name message & options] (let [{:keys [direction pattern _at _in _retry _retry-delay] :or {direction :l @@ -18,7 +26,8 @@ (.rpush @client packed-queue-name encoded-message))] (log/debug "pushed to queue" - {:queue-name packed-queue-name + {:client client + :queue-name packed-queue-name :message message :options opts :pushed-count pushed-count}) @@ -26,7 +35,14 @@ pushed-count)) (defn pop! - "Pop a message from a queue" + "Pop a message from a queue. + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - options: Optional parameters, including: + - direction: Direction to pop the message (:l or :r) + - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [direction pattern] :or {direction :l @@ -38,14 +54,21 @@ (.rpop @client packed-queue-name))] (log/debug "popped from queue" - {:queue-name packed-queue-name + {:client client + :queue-name packed-queue-name :options opts :message message}) (edn/read-string message))) (defn llen - "get size of a queue" + "Get the size of a queue. + + Parameters: + - client: Redis client + - queue-name: Name of the queue + - options: Optional parameters, including: + - pattern: Pattern for the queue name" [client queue-name & options] (let [{:keys [pattern] :or {pattern :rq}} options] diff --git a/src/com/moclojer/rq/utils.clj b/src/com/moclojer/rq/utils.clj index 1f1602c..3752799 100644 --- a/src/com/moclojer/rq/utils.clj +++ b/src/com/moclojer/rq/utils.clj @@ -25,7 +25,7 @@ ;; => "rq:my-queue" (pack-pattern :pending "foobar") - ;; => "rq:pendingfoobar" + ;; => "rq:pending:foobar" (unpack-pattern :pending "rq:pending:foobar") ;; => "foobar" From fc611267ce70d43416fefdd542f0d89410c6fb3a Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 15:20:18 -0300 Subject: [PATCH 10/15] docs: update README with new implementation --- README.md | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 4f7af61..0010a78 100644 --- a/README.md +++ b/README.md @@ -12,17 +12,19 @@ RQ (Redis Queue) is a simple Clojure package for queueing jobs and processing th [com.moclojer.rq.queue :as queue] [com.moclojer.rq.pubsub :as pubsub])) -(def *redis-pool* (rq/client "redis://localhost:6379/0")) +(def *redis-pool* (rq/create-client "redis://localhost:6379/0")) ;; queue -(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now) - :foo "bar"}) -(println :size (queue/consumer-size *redis-pool* "my-queue")) -(queue/consumer *redis-pool* "my-queue" #(prn :msg %1)) +(queue/push! *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now) + :foo "bar"}) +(println :size (queue/llen *redis-pool* "my-queue")) +(prn :popped (queue/pop! *redis-pool* "my-queue")) ;; pub/sub -(pubsub/publish *redis-pool* "name-subs" "value set") -(pubsub/subscribe *redis-pool* #(prn :chan %1 :msg %2) ["name-subs"]) +(pubsub/subscribe! *redis-pool* #(prn :chan %1 :msg %2) ["name-subs"]) +(pubsub/publish! *redis-pool* "name-subs" {:hello true)}) + +(rq/close-client *redis-pool*) ``` ## installation From 2aa7fd9bbcd3ec17653184dd6cefb24a1360b64e Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 15:25:07 -0300 Subject: [PATCH 11/15] refactor: improve `unpack-pattern` efficiency --- src/com/moclojer/rq/utils.clj | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/com/moclojer/rq/utils.clj b/src/com/moclojer/rq/utils.clj index 3752799..471fd47 100644 --- a/src/com/moclojer/rq/utils.clj +++ b/src/com/moclojer/rq/utils.clj @@ -16,9 +16,7 @@ (defn unpack-pattern [pattern queue-name] - (apply str (-> (pattern->str pattern) - (count) - (drop queue-name)))) + (subs queue-name (count (pattern->str pattern)))) (comment (pack-pattern :rq "my-queue") From 439d4dae81c6c6f7663bf2a8708dcbb3c6c7e55e Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 16:29:50 -0300 Subject: [PATCH 12/15] chore: add ci test workflow --- .github/workflows/tests.yml | 39 +++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) create mode 100644 .github/workflows/tests.yml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..c63ebc7 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,39 @@ +name: Run Tests + +on: + push: + branches: + - main + - 'refs/tags/*' + pull_request: + branches: + - main + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Set up Java + uses: actions/setup-java@v1 + with: + distribution: 'adopt' + java-version: '11' + + - name: Install clojure cli + uses: DeLaGuardo/setup-clojure@master + with: + cli: 1.11.1.1113 + + - name: Cache Maven packages + uses: actions/cache@v3 + with: + path: ~/.m2 + key: ${{ runner.os }}-m2-${{ hashFiles('**/deps.edn') }} + restore-keys: ${{ runner.os }}-m2 + + - name: Run tests + run: clojure -M:test From 3e6bafe4f8773bc9073eee7c94261da102940a40 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 16:37:50 -0300 Subject: [PATCH 13/15] feat: raise redis service before tests --- .github/workflows/tests.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index c63ebc7..352b2e2 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -12,6 +12,12 @@ on: jobs: test: runs-on: ubuntu-latest + services: + redis: + image: "redis:7" + ports: + - 6379:6379 + options: --rm steps: - name: Checkout code From d6efcff11247073c7e516bd233dde21f8a2419d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Josu=C3=A9=20Teodoro=20Moreira?= Date: Wed, 3 Jul 2024 17:47:09 -0300 Subject: [PATCH 14/15] chore(deps): bump actions/checkout` to v3 Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- .github/workflows/tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 352b2e2..9af256c 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,7 +21,7 @@ jobs: steps: - name: Checkout code - uses: actions/checkout@v2 + uses: actions/checkout@v3 - name: Set up Java uses: actions/setup-java@v1 From b6dac3c07dba44e4491a5838ed7160d3cf569da2 Mon Sep 17 00:00:00 2001 From: J0sueTM Date: Wed, 3 Jul 2024 18:40:52 -0300 Subject: [PATCH 15/15] docs: add workflow diagram --- README.md | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/README.md b/README.md index 0010a78..c75ae2b 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,37 @@ RQ (Redis Queue) is a simple Clojure package for queueing jobs and processing th (rq/close-client *redis-pool*) ``` +The workflow in the given example can be represented as follows: + +```mermaid +sequenceDiagram + participant User + participant Client + participant Queue + participant PubSub + participant Logger + + User->>Client: create-client URL + Client-->>Logger: log client creation + Client-->>User: return client + + User->>Queue: push! message + Queue-->>Logger: log push message + Queue-->>Queue: push message to queue + + User->>PubSub: publish! channel, message + PubSub-->>Logger: log publish message + PubSub-->>PubSub: publish message to channel + + User->>Queue: pop! queue-name + Queue-->>Logger: log pop operation + Queue-->>User: return popped message + + User->>Client: close-client client + Client-->>Logger: log closing client + Client-->>User: confirm client closure +``` + ## installation We distribute the library via [Clojars](https://clojars.org/com.moclojer/rq).