Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API improvement refactor #5

Merged
merged 15 commits into from
Jul 3, 2024
Merged
39 changes: 39 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
name: Run Tests

on:
push:
branches:
- main
- 'refs/tags/*'
pull_request:
branches:
- main

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v2
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

- name: Set up Java
uses: actions/setup-java@v1
with:
distribution: 'adopt'
java-version: '11'
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

- name: Install clojure cli
uses: DeLaGuardo/setup-clojure@master
with:
cli: 1.11.1.1113
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

- name: Cache Maven packages
uses: actions/cache@v3
with:
path: ~/.m2
key: ${{ runner.os }}-m2-${{ hashFiles('**/deps.edn') }}
restore-keys: ${{ runner.os }}-m2

- name: Run tests
run: clojure -M:test
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ output.calva-repl
*~
/.clj-kondo
.envrc
redis
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@ RQ (Redis Queue) is a simple Clojure package for queueing jobs and processing th
[com.moclojer.rq.queue :as queue]
[com.moclojer.rq.pubsub :as pubsub]))

(def *redis-pool* (rq/client "redis://localhost:6379/0"))
(def *redis-pool* (rq/create-client "redis://localhost:6379/0"))

;; queue
(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/consumer-size *redis-pool* "my-queue"))
(queue/consumer *redis-pool* "my-queue" #(prn :msg %1))
(queue/push! *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/llen *redis-pool* "my-queue"))
(prn :popped (queue/pop! *redis-pool* "my-queue"))

;; pub/sub
(pubsub/publish *redis-pool* "name-subs" "value set")
(pubsub/subscribe *redis-pool* #(prn :chan %1 :msg %2) ["name-subs"])
(pubsub/subscribe! *redis-pool* #(prn :chan %1 :msg %2) ["name-subs"])
(pubsub/publish! *redis-pool* "name-subs" {:hello true)})

(rq/close-client *redis-pool*)
```

## installation
Expand Down
9 changes: 5 additions & 4 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{:paths ["src"]
:deps
{redis.clients/jedis {:mvn/version "5.1.2"}}
{redis.clients/jedis {:mvn/version "5.1.2"}
org.clojure/tools.logging {:mvn/version "1.3.0"}}

:aliases
{;; clj -A:dev -m com.moclojer.rq
Expand Down Expand Up @@ -43,6 +44,6 @@
:sign-releases? false
:pom-file "target/classes/META-INF/maven/com.moclojer/rq/pom.xml"
:artifact "target/com.moclojer.rq.jar"}}}}



9 changes: 9 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
version: "3.7"

services:
redis:
image: redis:latest
ports:
- "6379:6379"
volumes:
- ./redis:/redis
57 changes: 19 additions & 38 deletions src/com/moclojer/rq.clj
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns com.moclojer.rq
(:require [com.moclojer.rq.queue :as queue])
(:import [redis.clients.jedis JedisPooled]))
(:import
[redis.clients.jedis JedisPooled]))

(def version "0.1.1")

Expand All @@ -9,39 +9,20 @@
^{:private true :dynamic true}
*redis-pool* (ref nil))

(defn client
"redis connect client"
[url]
(when-not @*redis-pool*
(dosync
(ref-set *redis-pool* (JedisPooled. url))))
*redis-pool*)

(defn client-kill
"redis kill client"
[]
(.destroy @*redis-pool*))

(defn client-disconnect
"redis disconnect client"
[]
(.returnResource @*redis-pool*))

(defn -main
[& _]
(client "redis://localhost:6379")
(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/consumer-size *redis-pool* "my-queue"))
(queue/consumer *redis-pool* "my-queue" #(prn :msg %1)))


(comment
(client "redis://localhost:6379")
(queue/producer *redis-pool* "my-queue" {:now (java.time.LocalDateTime/now)
:foo "bar"})
(println :size (queue/consumer-size *redis-pool* "my-queue"))
(queue/consumer *redis-pool* "my-queue" #(prn :msg %1))

#_(pubsub/publish *redis-pool* "hello.world" "value set")
#_(pubsub/subscribe *redis-pool* #(prn :chan %1 :msg %2) ["hello.world"]))
(defn create-client
"Connect to redis client. If `ref?` is true, will save the created instance
in the global var `*redis-pool*. Just returns the created instance otherwise."
([url]
(create-client url false))
([url ref?]
(let [pool (JedisPooled. url)]
(if (and ref? (not @*redis-pool*))
(dosync
(ref-set *redis-pool* pool)
*redis-pool*)
(atom pool)))))

(defn close-client
"Disconnect and close redis client"
([] (close-client *redis-pool*))
([client] (.close @client)))
46 changes: 32 additions & 14 deletions src/com/moclojer/rq/pubsub.clj
Original file line number Diff line number Diff line change
@@ -1,25 +1,43 @@
(ns com.moclojer.rq.pubsub
(:import [redis.clients.jedis JedisPubSub]))
(:require
[clojure.edn :as edn]
[clojure.tools.logging :as log])
(:import
[redis.clients.jedis JedisPubSub]))

;; (pubsub/publish redis-client "name-subs" "value set")
(defn publish
(defn publish!
"Publish a message to a channel"
[redis-client channel message]
(.publish @redis-client channel message))
[client channel message]
(let [consumer-count (.publish @client channel (pr-str message))]

(defn listener
(log/debug "published to channel"
{:channel channel
:message message
:consumer-count consumer-count})

consumer-count))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn create-listener
"Create a listener for the pubsub. It will be entry point for any published
data, being responsible for routing the right consumer. However, that's on
the enduser."
[on-msg-fn]
(proxy [JedisPubSub] []
(onMessage [channel message]
(try
(println "onMessage" channel message)
(on-msg-fn channel message)
(log/info "consumed from channel"
{:channel channel
:message message})
(on-msg-fn channel (edn/read-string message))
(catch Exception e
(ex-message e) nil)))))
(.printStackTrace e)
(log/error "failed to consume from channel"
{:channel channel
:message message
:exception e}))))))

;; (pubsub/subscribe redis-client ["name-subs"])
(defn subscribe
(defn subscribe!
"Subscribe to channels and call the callback function when a message is received
is possible to subscribe to multiple channels"
[redis-client on-msg-fn channels]
(.subscribe @redis-client (listener on-msg-fn) (into-array channels)))
NOTE: It is possible to subscribe to multiple channels"
[client on-msg-fn channels]
(future (.subscribe @client (create-listener on-msg-fn) (into-array channels))))
119 changes: 73 additions & 46 deletions src/com/moclojer/rq/queue.clj
Original file line number Diff line number Diff line change
@@ -1,48 +1,75 @@
(ns com.moclojer.rq.queue)

(defn pattern-name
"Return the pattern name for the queue, which is the name prefixed with `rq:`"
[name]
(str "rq:" name))

;; queue:push
;; (queue/producer client queue-name {:foo "bar"} :at ... :in ... :retry 3 :retry-delay 10)
(defn producer
"Push a job to the queue"
[redis-client queue-name message & {:keys [direction at in retry retry-delay]
:or {direction "l" retry 3 retry-delay 10}}]
(println at in retry retry-delay)
(let [qname (pattern-name queue-name)
msg (into-array [(pr-str message)])]
(if (= direction "r")
(.rpush @redis-client qname msg)
(.lpush @redis-client qname msg))))

(defn consumer-size
"get size of the queue"
[redis-client queue-name]
(.llen @redis-client (pattern-name queue-name)))

;; queue:process and pop
;; (queue/consumer client queue-name (fn [job] (println job))
(defn consumer
"Consume a message from the queue, blocking in loop until a message is available."
[redis-client queue-name consumer-fn & {:keys [size loop-sleep direction]
:or {direction "l" size -1 loop-sleep 0}}]
(let [qname (pattern-name queue-name)]
(loop []
(Thread/sleep loop-sleep)
(flush)
(let [msg (if (= direction "r")
(.rrange @redis-client qname 0 size)
(.lrange @redis-client qname 0 size))]
(when msg
(consumer-fn msg)
(println :size (consumer-size redis-client qname))
(if (= direction "r")
(.rpop @redis-client qname)
(.lpop @redis-client qname))
(System/exit 0)
(recur))))))
(ns com.moclojer.rq.queue
(:require
[clojure.edn :as edn]
[clojure.tools.logging :as log]
[com.moclojer.rq.utils :as utils]))

(defn push!
"Push a message into a queue.
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

Parameters:
- client: Redis client
- queue-name: Name of the queue
- message: Message to be pushed
- options: Optional parameters, including:
- direction: Direction to push the message (:l or :r)
- pattern: Pattern for the queue name"
[client queue-name message & options]
(let [{:keys [direction pattern _at _in _retry _retry-delay]
:or {direction :l
pattern :rq}
:as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
encoded-message (into-array [(pr-str message)])
pushed-count (if (= direction :l)
(.lpush @client packed-queue-name encoded-message)
(.rpush @client packed-queue-name encoded-message))]

(log/debug "pushed to queue"
{:client client
:queue-name packed-queue-name
:message message
:options opts
:pushed-count pushed-count})

pushed-count))

(defn pop!
"Pop a message from a queue.

Parameters:
- client: Redis client
- queue-name: Name of the queue
- options: Optional parameters, including:
- direction: Direction to pop the message (:l or :r)
- pattern: Pattern for the queue name"
[client queue-name & options]
(let [{:keys [direction pattern]
:or {direction :l
pattern :rq}
:as opts} options
packed-queue-name (utils/pack-pattern pattern queue-name)
message (if (= direction :l)
(.lpop @client packed-queue-name)
(.rpop @client packed-queue-name))]

(log/debug "popped from queue"
{:client client
:queue-name packed-queue-name
:options opts
:message message})

(edn/read-string message)))

(defn llen
"Get the size of a queue.

Parameters:
- client: Redis client
- queue-name: Name of the queue
- options: Optional parameters, including:
- pattern: Pattern for the queue name"
[client queue-name & options]
(let [{:keys [pattern]
:or {pattern :rq}} options]
(.llen @client (utils/pack-pattern pattern queue-name))))
30 changes: 30 additions & 0 deletions src/com/moclojer/rq/utils.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
(ns com.moclojer.rq.utils)

(defn- pattern->str
[pattern]
(let [patterns {:rq "rq:"
:pending "rq:pending:"}]
(or (get-in patterns [pattern])
(throw (ex-info (str "No pattern named " pattern)
{:cause :illegal-argument
:value pattern
:expected (keys patterns)})))))
J0sueTM marked this conversation as resolved.
Show resolved Hide resolved

(defn pack-pattern
[pattern queue-name]
(str (pattern->str pattern) queue-name))

(defn unpack-pattern
[pattern queue-name]
(subs queue-name (count (pattern->str pattern))))

(comment
(pack-pattern :rq "my-queue")
;; => "rq:my-queue"

(pack-pattern :pending "foobar")
;; => "rq:pending:foobar"

(unpack-pattern :pending "rq:pending:foobar")
;; => "foobar"
)
Loading
Loading