Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API improvement refactor #5

Merged
merged 15 commits into from
Jul 3, 2024
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ output.calva-repl
*~
/.clj-kondo
.envrc
redis
9 changes: 5 additions & 4 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{:paths ["src"]
:deps
{redis.clients/jedis {:mvn/version "5.1.2"}}
{redis.clients/jedis {:mvn/version "5.1.2"}
org.clojure/tools.logging {:mvn/version "1.3.0"}}

:aliases
{;; clj -A:dev -m com.moclojer.rq
Expand Down Expand Up @@ -43,6 +44,6 @@
:sign-releases? false
:pom-file "target/classes/META-INF/maven/com.moclojer/rq/pom.xml"
:artifact "target/com.moclojer.rq.jar"}}}}



9 changes: 9 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: "3.7"

services:
redis:
image: redis:latest
ports:
- "6379:6379"
volumes:
- ./redis:/redis
57 changes: 19 additions & 38 deletions src/com/moclojer/rq.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns com.moclojer.rq
(:require [com.moclojer.rq.queue :as queue])
(:import [redis.clients.jedis JedisPooled]))
(:import
[redis.clients.jedis JedisPooled]))

(def version "0.1.1")

Expand All @@ -9,39 +9,20 @@
^{:private true :dynamic true}
*redis-pool* (ref nil))

(defn client
"redis connect client"
[url]
(when-not @*redis-pool*
(dosync
(ref-set *redis-pool* (JedisPooled. url))))
*redis-pool*)

(defn client-kill
"redis kill client"
[]
(.destroy @*redis-pool*))

(defn client-disconnect
"redis disconnect client"
[]
(.returnResource @*redis-pool*))

(defn -main
[& _]
(client "redis://localhost:6379")
(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/consumer-size *redis-pool* "my-queue"))
(queue/consumer *redis-pool* "my-queue" #(prn :msg %1)))


(comment
(client "redis://localhost:6379")
(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/consumer-size *redis-pool* "my-queue"))
(queue/consumer *redis-pool* "my-queue" #(prn :msg %1))

#_(pubsub/publish *redis-pool* "hello.world" "value set")
#_(pubsub/subscribe *redis-pool* #(prn :chan %1 :msg %2) ["hello.world"]))
(defn create-client
"Connect to redis client. If `ref?` is true, will save the created instance
in the global var `*redis-pool*. Just returns the created instance otherwise."
([url]
(create-client url false))
([url ref?]
(let [pool (JedisPooled. url)]
(if (and ref? (not @*redis-pool*))
(dosync
(ref-set *redis-pool* pool)
*redis-pool*)
(atom pool)))))

(defn close-client
"Disconnect and close redis client"
([] (close-client *redis-pool*))
([client] (.close @client)))
46 changes: 32 additions & 14 deletions src/com/moclojer/rq/pubsub.clj
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
(ns com.moclojer.rq.pubsub
(:import [redis.clients.jedis JedisPubSub]))
(:require
[clojure.edn :as edn]
[clojure.tools.logging :as log])
(:import
[redis.clients.jedis JedisPubSub]))

;; (pubsub/publish redis-client "name-subs" "value set")
(defn publish
(defn publish!
"Publish a message to a channel"
[redis-client channel message]
(.publish @redis-client channel message))
[client channel message]
(let [consumer-count (.publish @client channel (pr-str message))]

(defn listener
(log/debug "published to channel"
{:channel channel
:message message
:consumer-count consumer-count})

consumer-count))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn create-listener
"Create a listener for the pubsub. It will be entry point for any published
data, being responsible for routing the right consumer. However, that's on
the enduser."
[on-msg-fn]
(proxy [JedisPubSub] []
(onMessage [channel message]
(try
(println "onMessage" channel message)
(on-msg-fn channel message)
(log/info "consumed from channel"
{:channel channel
:message message})
(on-msg-fn channel (edn/read-string message))
(catch Exception e
(ex-message e) nil)))))
(.printStackTrace e)
(log/error "failed to consume from channel"
{:channel channel
:message message
:exception e}))))))

;; (pubsub/subscribe redis-client ["name-subs"])
(defn subscribe
(defn subscribe!
"Subscribe to channels and call the callback function when a message is received
is possible to subscribe to multiple channels"
[redis-client on-msg-fn channels]
(.subscribe @redis-client (listener on-msg-fn) (into-array channels)))
NOTE: It is possible to subscribe to multiple channels"
[client on-msg-fn channels]
(future (.subscribe @client (create-listener on-msg-fn) (into-array channels))))
88 changes: 46 additions & 42 deletions src/com/moclojer/rq/queue.clj
Original file line number Diff line number Diff line change
@@ -1,48 +1,52 @@
(ns com.moclojer.rq.queue)
(ns com.moclojer.rq.queue
(:require
[clojure.edn :as edn]
[clojure.tools.logging :as log]
[com.moclojer.rq.utils :as utils]))

(defn pattern-name
"Return the pattern name for the queue, which is the name prefixed with `rq:`"
[name]
(str "rq:" name))
(defn push!
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
"Push a message into a queue"
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
[client queue-name message & options]
(let [{:keys [direction pattern _at _in _retry _retry-delay]
:or {direction :l
pattern :rq}
:as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
encoded-message (into-array [(pr-str message)])
pushed-count (if (= direction :l)
(.lpush @client packed-queue-name encoded-message)
(.rpush @client packed-queue-name encoded-message))]

;; queue:push
;; (queue/producer client queue-name {:foo "bar"} :at ... :in ... :retry 3 :retry-delay 10)
(defn producer
"Push a job to the queue"
[redis-client queue-name message & {:keys [direction at in retry retry-delay]
:or {direction "l" retry 3 retry-delay 10}}]
(println at in retry retry-delay)
(let [qname (pattern-name queue-name)
msg (into-array [(pr-str message)])]
(if (= direction "r")
(.rpush @redis-client qname msg)
(.lpush @redis-client qname msg))))
(log/debug "pushed to queue"
{:queue-name packed-queue-name
:message message
:options opts
:pushed-count pushed-count})
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn consumer-size
"get size of the queue"
[redis-client queue-name]
(.llen @redis-client (pattern-name queue-name)))
pushed-count))

;; queue:process and pop
;; (queue/consumer client queue-name (fn [job] (println job))
(defn consumer
"Consume a message from the queue, blocking in loop until a message is available."
[redis-client queue-name consumer-fn & {:keys [size loop-sleep direction]
:or {direction "l" size -1 loop-sleep 0}}]
(let [qname (pattern-name queue-name)]
(loop []
(Thread/sleep loop-sleep)
(flush)
(let [msg (if (= direction "r")
(.rrange @redis-client qname 0 size)
(.lrange @redis-client qname 0 size))]
(when msg
(consumer-fn msg)
(println :size (consumer-size redis-client qname))
(if (= direction "r")
(.rpop @redis-client qname)
(.lpop @redis-client qname))
(System/exit 0)
(recur))))))
(defn pop!
"Pop a message from a queue"
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
[client queue-name & options]
(let [{:keys [direction pattern]
:or {direction :l
pattern :rq}
:as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
message (if (= direction :l)
(.lpop @client packed-queue-name)
(.rpop @client packed-queue-name))]

(log/debug "popped from queue"
{:queue-name packed-queue-name
:options opts
:message message})
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(edn/read-string message)))

(defn llen
"get size of a queue"
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
[client queue-name & options]
(let [{:keys [pattern]
:or {pattern :rq}} options]
(.llen @client (utils/pack-pattern pattern queue-name))))
32 changes: 32 additions & 0 deletions src/com/moclojer/rq/utils.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
(ns com.moclojer.rq.utils)

(defn- pattern->str
[pattern]
(let [patterns {:rq "rq:"
:pending "rq:pending:"}]
(or (get-in patterns [pattern])
(throw (ex-info (str "No pattern named " pattern)
{:cause :illegal-argument
:value pattern
:expected (keys patterns)})))))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn pack-pattern
[pattern queue-name]
(str (pattern->str pattern) queue-name))

(defn unpack-pattern
[pattern queue-name]
(apply str (-> (pattern->str pattern)
(count)
(drop queue-name))))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(comment
(pack-pattern :rq "my-queue")
;; => "rq:my-queue"

(pack-pattern :pending "foobar")
;; => "rq:pendingfoobar"

(unpack-pattern :pending "rq:pending:foobar")
;; => "foobar"
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
)
27 changes: 27 additions & 0 deletions test/com/moclojer/rq/pubsub_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(ns com.moclojer.rq.pubsub-test
(:require
[clojure.test :as t]
[com.moclojer.rq :as rq]
[com.moclojer.rq.pubsub :as rq-pubsub]
[com.moclojer.test-utils :as utils]))

(t/deftest pubsub-test
(let [client (rq/create-client "redis://localhost:6379")
subs (atom [])
channels (into [] (repeatedly 20 #(str (random-uuid))))
messages (into [] (repeatedly 20 utils/gen-message))
chan-msg (zipmap channels messages)]
(rq-pubsub/subscribe! client
#(swap! subs conj {:channel %1 :message %2})
channels)
(doseq [[channel message] chan-msg]
(rq-pubsub/publish! client channel message))

(future
;; more than enough time for 20 messages to be consumed
(Thread/sleep 3000)
(t/is (= (map (fn [[chan msg]]
{:channel chan
:message msg})
chan-msg)
@subs)))))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 27 additions & 0 deletions test/com/moclojer/rq/queue_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
(ns com.moclojer.rq.queue-test
(:require
[clojure.test :as t]
[com.moclojer.rq :as rq]
[com.moclojer.rq.queue :as rq-queue]
[com.moclojer.test-utils :as utils]))

(t/deftest queue-test
(let [client (rq/create-client "redis://localhost:6379")
queue-name (str (random-uuid))
message (utils/gen-message)]

(t/testing "raw"
(rq-queue/push! client queue-name message)
(rq-queue/push! client queue-name (utils/gen-message))
(t/is (= 2 (rq-queue/llen client queue-name)))
(t/is (= message (rq-queue/pop! client queue-name :direction :r))))

(t/testing "direction"
(rq-queue/push! client queue-name message :direction :r)
(t/is (= message (rq-queue/pop! client queue-name :direction :r))))

(t/testing "pattern"
(rq-queue/push! client queue-name message :pattern :pending)
(t/is (= message (rq-queue/pop! client queue-name :pattern :pending))))

(rq/close-client client)))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved
11 changes: 11 additions & 0 deletions test/com/moclojer/rq_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
(ns com.moclojer.rq-test
(:require
[clojure.test :as t]
[com.moclojer.rq :as rq]))

;; WARNING: redis needs to be runing.

(t/deftest create-client-test
(let [client (rq/create-client "redis://localhost:6379")]
(t/is (.. @client getPool getResource))
(rq/close-client client)))
10 changes: 10 additions & 0 deletions test/com/moclojer/test_utils.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
(ns com.moclojer.test-utils)

(defn gen-message
"Generates a fuzzy message"
[]
{(random-uuid) 1
(keyword (str (random-uuid))) true
:test 'hello
:my/test2 "123"
:foobar ["321"]})