Skip to content
Ben Sless edited this page Oct 15, 2021 · 3 revisions

Turning a core.async channel into a flow

(defn print-drain
  "Returns a task that consumes flow `f` by reducing it with `println`.

  `println` is used directly as the reducing function, so every printed line
  shows the accumulator (the `nil` returned by the previous `println`)
  followed by the value emitted by the flow."
  [f]
  (->> f
       (m/reduce println)))

(defn print-call
  "Starts task `t`, using `println` as both the success and the failure
  callback. Returns whatever invoking `t` returns; the examples on this page
  keep that value and call it to cancel the running task."
  [t]
  (let [report println]
    (t report report)))

(defn forever
  "Builds a flow that runs `task` over and over, emitting the result of each
  run. The source of runs is an infinite `(repeat task)` sequence, so the
  flow only stops when a run fails or the flow is cancelled."
  [task]
  (m/ap
    ;; Fork on the endless supply of `task`, then wait for this run's result.
    (let [next-run (m/?> (m/seed (repeat task)))]
      (m/? next-run))))

(defn- take!
  "Task that performs a blocking take (`async/<!!`) from `chan` on the
  `m/blk` executor. Completes with the taken value; a `nil` take (the channel
  was closed) is reported by throwing an `ex-info` whose data is
  `{:cancelled? true}`."
  [chan]
  (m/via m/blk
    (if-some [taken (async/<!! chan)]
      taken
      ;; nil from <!! means the channel is closed: surface it as a failure.
      (throw (ex-info "take cancelled, channel closed" {:cancelled? true})))))

(defn channel-flow
  "Flow of the values taken from the core.async channel `ch-recv`: a single
  blocking take task repeated with `forever`. The flow fails once the channel
  is closed, because the underlying take throws on `nil`."
  [ch-recv]
  (-> ch-recv
      take!
      forever))

(comment
  ;; REPL walkthrough for channel-flow: evaluate the forms one at a time.
  (def ch (async/chan))
  ;; Start draining the flow; `cancel` holds the value returned by print-call,
  ;; which is invoked below to stop the process.
  (def cancel (print-call (print-drain (m/ap (m/?> (channel-flow ch))))))
  ;; The leading "nil" in the output is the reduce accumulator: print-drain
  ;; uses println as the reducing function, and println returns nil.
  (async/put! ch "new val 1") ;; prints "nil new val 1"
  (async/put! ch "new val 2") ;; prints "nil new val 2"
  ;; Closing the channel makes the pending take return nil, which take! turns
  ;; into an exception that print-call's failure callback prints.
  (async/close! ch)  ;; exception - take cancelled, channel closed
  ;; OR
  ;; Cancelling instead of closing stops the blocking take.
  (cancel) ;; java.lang.InterruptedException
  )

Backpressured flow using a rendezvous (`m/rdv`)

(defn rdv-flow
  "Flow of the values handed over through the rendezvous `rdv`. The rendezvous
  is passed to `forever` as the task to repeat, so each emitted value
  corresponds to one completed take from `rdv`."
  [rdv]
  (let [take-task rdv]
    (forever take-task)))

(comment
  ;; REPL walkthrough for rdv-flow: evaluate the forms one at a time.
  (def rdv (m/rdv))
  ;; Start draining the flow; `cancel` holds the value returned by print-call.
  (def cancel (print-call (print-drain (rdv-flow rdv))))
  ;; `(rdv "val 1")` is the give side; `m/?` waits for it to complete.
  ;; The leading "nil" is the reduce accumulator, since print-drain reduces with println.
  (m/? (rdv "val 1")) ;; prints nil val 1, blocks until flow is ready to accept new value
  ;; Stop the draining process started above.
  (cancel))