Skip to content

Commit

Permalink
Adds do-for-pool! feature
Browse files Browse the repository at this point in the history
  • Loading branch information
jtkDvlp committed Apr 20, 2024
1 parent 966a2dc commit 20f5dca
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 41 deletions.
26 changes: 12 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,31 @@ The following example handling both the browser and the worker within one script
[]
(let [;; you can create one worker or a pool (async channel of workers)
worker-pool
(main/create-pool 2 "js/worker/worker.js")
(main/create-pool 2 "worker.js")

;; a "do-with-pool" or "-worker" (see below) will return immediately and give you a result channel. So to print the result you have to handle the channel
print-result
(fn [result-chan]
(fn [message result-chan]
(go
(let [result (<! result-chan)]
(.debug js/console
message
(str (:state result))
result))))]
(clj->js result)))))]

;; Copy all simple values
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))
;; Copy the simple values and transfer the ArrayBuffer
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:d]}))
;; Copy the simple values and transfer the ArrayBuffer, but transfer (browser thread) will fail cause the wrong value and the wrong type is marked to do so
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:c]}))
;; Copy the simple values and transfer the ArrayBuffer, but transfer mirroring (worker thread) will fail cause the wrong value and the wrong type is marked to do so
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:c]} :transfer [:d]}))))
(print-result "Copy all simple values" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))
(print-result "Copy the simple values and transfer the ArrayBuffer" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:d]}))
(print-result "Copy the simple values and transfer the ArrayBuffer, but transfer (browser thread) will fail cause the wrong value and the wrong type is marked to do so" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:c]}))
(print-result "Copy the simple values and transfer the ArrayBuffer, but transfer mirroring (worker thread) will fail cause the wrong value and the wrong type is marked to do so" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:c]} :transfer [:d]}))
(print-result "Copy values but do it with every worker of the pool" (main/do-for-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))))

;; Setup the worker path (handling both in one file)
(defn worker
[]
(worker/register
:mirror
(fn [arguments]
arguments))
:mirror
(fn [arguments]
arguments))

(worker/bootstrap))

Expand Down
69 changes: 53 additions & 16 deletions src/cljs_workers/core.cljs
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
(ns cljs-workers.core
(:require [cljs.core.async :refer [chan promise-chan <! >! put!]])
(:require-macros [cljs.core.async.macros :refer [go]]))
(:require
[cljs.core.async
:refer [chan promise-chan <! >! put!]
:as async])

(:require-macros
[cljs.core.async.macros :refer [go go-loop]]))

(defn supported?
[]
(-> js/self
.-Worker
undefined?
not))
(.-Worker)
(undefined?)
(not)))

(defn worker?
[]
(-> js/self
.-document
undefined?))
(.-document)
(undefined?)))

(def main?
(complement worker?))
Expand All @@ -33,18 +38,18 @@
(let [workers (chan count)]
(dotimes [_ count]
(put! workers (create-one script)))
workers)))
{:workers workers, :count count})))

(defn- do-request!
[worker {:keys [handler arguments transfer] :as request}]
(let [message
(-> {:handler handler, :arguments arguments}
clj->js)
(clj->js))

transfer
(->> transfer
(select-keys arguments)
vals)]
(vals))]

(if (seq transfer)
(.postMessage worker message (clj->js transfer))
Expand Down Expand Up @@ -75,18 +80,50 @@

(defn do-with-pool!
[pool {:keys [handler arguments transfer] :as request}]
;; WATCHOUT: We want an promise-chan!
(let [result* (promise-chan)]
(go
(let [worker
(<! pool)
(let [{:keys [workers]}
pool

result-chan
(do-with-worker! worker request)
worker
(<! workers)

result
(<! result-chan)]
(<! (do-with-worker! worker request))]

(>! pool worker)
(>! workers worker)
(>! result* result)))

result*))

(defn- take!
[n ch]
(go-loop [n n, xs []]
(if (> n 0)
(recur
(dec n)
(conj xs (<! ch)))
xs)))

(defn do-for-pool!
[pool {:keys [handler arguments transfer] :as request}]
;; WATCHOUT: We want an promise-chan!
(let [result* (promise-chan)]
(go
(let [{:keys [workers count]}
pool

all-workers
(<! (take! count workers))

results
(->> all-workers
(map #(do-with-worker! % request))
(async/map vector)
(<!))]

(async/onto-chan! workers all-workers false)
(>! result* results)))

result*))
29 changes: 18 additions & 11 deletions test/cljs_workers/test.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,40 @@
[cljs-workers.worker :as worker])
(:require-macros [cljs.core.async.macros :refer [go]]))

;; Setup the browser path (handling both in one file)
(defn app
[]
(let [worker-pool
(let [;; you can create one worker or a pool (async channel of workers)
worker-pool
(main/create-pool 2 "worker.js")

;; a "do-with-pool" or "-worker" (see below) will return immediately and give you a result channel. So to print the result you have to handle the channel
print-result
(fn [result-chan]
(fn [message result-chan]
(go
(let [result (<! result-chan)]
(.debug js/console
message
(str (:state result))
result))))]
(clj->js result)))))]

(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:d]}))
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:c]}))
(print-result (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:c]} :transfer [:d]}))))
(print-result "Copy all simple values" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))
(print-result "Copy the simple values and transfer the ArrayBuffer" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:d]}))
(print-result "Copy the simple values and transfer the ArrayBuffer, but transfer (browser thread) will fail cause the wrong value and the wrong type is marked to do so" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:d]} :transfer [:c]}))
(print-result "Copy the simple values and transfer the ArrayBuffer, but transfer mirroring (worker thread) will fail cause the wrong value and the wrong type is marked to do so" (main/do-with-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10 :d (js/ArrayBuffer. 10) :transfer [:c]} :transfer [:d]}))
(print-result "Copy values but do it with every worker of the pool" (main/do-for-pool! worker-pool {:handler :mirror, :arguments {:a "Hallo" :b "Welt" :c 10}}))))

;; Setup the worker path (handling both in one file)
(defn worker
[]
(worker/register
:mirror
(fn [arguments]
arguments))
:mirror
(fn [arguments]
arguments))

(worker/bootstrap))

(if (and (main/supported?) (main/main?))
;; Decide which path to setup
(if (main/main?)
(app)
(worker))

0 comments on commit 20f5dca

Please sign in to comment.