From 6ad14534d03a84c45f47b9f2148691617ffc0d14 Mon Sep 17 00:00:00 2001 From: Daniel Compton Date: Thu, 7 Jul 2016 11:54:44 +1200 Subject: [PATCH] [#242] BREAKING: Switch from .cljx to .cljc (@danielcompton) Breaks support for Clojure < 1.7 --- src/taoensso/{sente.cljx => sente.cljc} | 1441 +++++++++-------- .../{interfaces.cljx => interfaces.cljc} | 0 src/taoensso/sente/packers/transit.cljc | 110 ++ src/taoensso/sente/packers/transit.cljx | 112 -- 4 files changed, 831 insertions(+), 832 deletions(-) rename src/taoensso/{sente.cljx => sente.cljc} (52%) rename src/taoensso/sente/{interfaces.cljx => interfaces.cljc} (100%) create mode 100644 src/taoensso/sente/packers/transit.cljc delete mode 100644 src/taoensso/sente/packers/transit.cljx diff --git a/src/taoensso/sente.cljx b/src/taoensso/sente.cljc similarity index 52% rename from src/taoensso/sente.cljx rename to src/taoensso/sente.cljc index 2000acf..1f1dac0 100644 --- a/src/taoensso/sente.cljx +++ b/src/taoensso/sente.cljc @@ -60,26 +60,26 @@ {:author "Peter Taoussanis (@ptaoussanis)"} - #+clj - (:require - [clojure.string :as str] - [clojure.core.async :as async :refer (! >!! put! chan go go-loop)] - [taoensso.encore :as enc :refer (swap-in! reset-in! swapped have have! have?)] - [taoensso.timbre :as timbre :refer (tracef debugf infof warnf errorf)] - [taoensso.sente.interfaces :as interfaces]) - - #+cljs - (:require - [clojure.string :as str] - [cljs.core.async :as async :refer (! put! chan)] - [taoensso.encore :as enc :refer (format swap-in! reset-in! swapped) - :refer-macros (have have! have?)] - [taoensso.timbre :as timbre :refer-macros (tracef debugf infof warnf errorf)] - [taoensso.sente.interfaces :as interfaces]) - - #+cljs - (:require-macros - [cljs.core.async.macros :as asyncm :refer (go go-loop)])) + #?(:clj + (:require + [clojure.string :as str] + [clojure.core.async :as async :refer (! >!! put! chan go go-loop)] + [taoensso.encore :as enc :refer (swap-in! reset-in! swapped have have! have?)] + [taoensso.timbre :as timbre :refer (tracef debugf infof warnf errorf)] + [taoensso.sente.interfaces :as interfaces])) + + #?(:cljs + (:require + [clojure.string :as str] + [cljs.core.async :as async :refer (! put! chan)] + [taoensso.encore :as enc :refer (format swap-in! reset-in! swapped) + :refer-macros (have have! have?)] + [taoensso.timbre :as timbre :refer-macros (tracef debugf infof warnf errorf)] + [taoensso.sente.interfaces :as interfaces])) + + #?(:cljs + (:require-macros + [cljs.core.async.macros :as asyncm :refer (go go-loop)]))) (if (vector? taoensso.encore/encore-version) (enc/assert-min-encore-version [2 53 1]) @@ -161,8 +161,8 @@ (put! ch-recv ev-msg*)))) ;;; Note that cb replys need _not_ be `event` form! -#+cljs (defn cb-error? [cb-reply-clj] (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply-clj)) -#+cljs (defn cb-success? [cb-reply-clj] (not (cb-error? cb-reply-clj))) +#?(:cljs (defn cb-error? [cb-reply-clj] (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply-clj))) +#?(:cljs (defn cb-success? [cb-reply-clj] (not (cb-error? cb-reply-clj)))) ;;;; Packing ;; * Client<->server payloads are arbitrary Clojure vals (cb replies or events). @@ -177,7 +177,7 @@ clj (try (interfaces/unpack packer pstr) - (catch #+clj Throwable #+cljs :default t + (catch #?(:clj Throwable :cljs :default) t (debugf "Bad package: %s (%s)" pstr t) [:chsk/bad-package pstr])) @@ -730,321 +730,322 @@ ;;;; Client API -#+cljs (def ajax-lite "Alias of `taoensso.encore/ajax-lite`" enc/ajax-lite) -#+cljs -(defprotocol IChSocket - (-chsk-connect! [chsk]) - (-chsk-disconnect! [chsk reconn?]) - (-chsk-reconnect! [chsk]) - (-chsk-send! [chsk ev opts])) - -#+cljs (defn chsk-connect! [chsk] (-chsk-connect! chsk)) -#+cljs (defn chsk-destroy! "Deprecated" [chsk] (-chsk-disconnect! chsk false)) -#+cljs (defn chsk-disconnect! [chsk] (-chsk-disconnect! chsk false)) -#+cljs -(defn chsk-reconnect! "Useful for reauthenticating after login/logout, etc." - [chsk] (-chsk-reconnect! chsk)) - -#+cljs -(defn chsk-send! - "Sends `[ev-id ev-?data :as event]`, returns true on apparent success." - ([chsk ev] (chsk-send! chsk ev {})) - ([chsk ev ?timeout-ms ?cb] (chsk-send! chsk ev {:timeout-ms ?timeout-ms - :cb ?cb})) - ([chsk ev opts] - (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) - (-chsk-send! chsk ev opts))) - -#+cljs -(defn- chsk-send->closed! [?cb-fn] - (warnf "Chsk send against closed chsk.") - (when ?cb-fn (?cb-fn :chsk/closed)) - false) - -#+cljs -(defn- assert-send-args [x ?timeout-ms ?cb] - (assert-event x) - (assert (or (and (nil? ?timeout-ms) (nil? ?cb)) - (and (enc/nneg-int? ?timeout-ms))) - (str "cb requires a timeout; timeout-ms should be a +ive integer: " ?timeout-ms)) - (assert (or (nil? ?cb) (ifn? ?cb) (enc/chan? ?cb)) - (str "cb should be nil, an ifn, or a channel: " (type ?cb)))) - -#+cljs -(defn- pull-unused-cb-fn! [cbs-waiting_ ?cb-uuid] - (when-let [cb-uuid ?cb-uuid] - (swap-in! cbs-waiting_ [cb-uuid] - (fn [?f] (swapped :swap/dissoc ?f))))) - -#+cljs -(defn- merge>chsk-state! [{:keys [chs state_] :as chsk} merge-state] - (let [[old-state new-state] - (swap-in! state_ [] - (fn [old-state] - (let [new-state (merge old-state merge-state) - - ;; Is this a reasonable way of helping client distinguish - ;; cause of an auto reconnect? Didn't give it much - ;; thought... - requested-reconnect? - (and (:requested-reconnect-pending? old-state) - (:open? new-state) - (not (:open? old-state))) - - new-state - (if (:first-open? old-state) - (assoc new-state :first-open? false) - new-state) - - new-state - (if requested-reconnect? - (-> new-state - (dissoc :requested-reconnect-pending?) - (assoc :requested-reconnect? true)) - (dissoc new-state :requested-reconnect?))] - - (swapped new-state [old-state new-state]))))] - - (when (not= old-state new-state) - (let [output [old-state new-state]] - ;; (debugf "Chsk state change: %s" output) - (put! (:state chs) [:chsk/state output]) - output)))) - -#+cljs -(defn- cb-chan-as-fn - "Experimental, undocumented. Allows a core.async channel to be provided - instead of a cb-fn. The channel will receive values of form - [.cb ]." - [?cb ev] - (if (or (nil? ?cb) (ifn? ?cb)) - ?cb - (do - (have? enc/chan? ?cb) - (assert-event ev) - (let [[ev-id _] ev - cb-ch ?cb] - (fn [reply] - (put! cb-ch - [(keyword (str (enc/fq-name ev-id) ".cb")) - reply])))))) - -#+cljs -(defn- receive-buffered-evs! [chs clj] - (tracef "receive-buffered-evs!: %s" clj) - (let [buffered-evs (have vector? clj)] - (doseq [ev buffered-evs] - (assert-event ev) - ;; Should never receive :chsk/* events from server here: - (let [[id] ev] (assert (not= (namespace id) "chsk"))) - (put! (: e/o #{:ws :ajax}, etc. - :open? true - :ever-opened? true - :uid ?uid - :csrf-token ?csrf-token - :handshake-data ?handshake-data - :first-open? first-handshake?} - - handshake-ev - [:chsk/handshake - [?uid ?csrf-token ?handshake-data first-handshake?]]] - - (assert-event handshake-ev) - (when (str/blank? ?csrf-token) - (warnf "SECURITY WARNING: no CSRF token available for use by Sente")) - - (merge>chsk-state! chsk new-state) - (put! (:internal chs) handshake-ev) - - :handled)) - -#+cljs -(defrecord ChWebSocket - ;; WebSocket-only IChSocket implementation - ;; Handles (re)connections, cbs, etc. - - [client-id chs params packer url - state_ ; {:type _ :open? _ :uid _ :csrf-token _ ...} - active-retry-id_ retry-count_ ever-opened?_ - backoff-ms-fn ; (fn [nattempt]) -> msecs - cbs-waiting_ ; { ...} - socket_] - - IChSocket - (-chsk-disconnect! [chsk reconn?] - (reset! active-retry-id_ "_disable-auto-retry") - (if reconn? - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) - (merge>chsk-state! chsk {:open? false})) - (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) - - (-chsk-reconnect! [chsk] - (-chsk-disconnect! chsk :reconn) - (-chsk-connect! chsk)) - - (-chsk-send! [chsk ev opts] - (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts - _ (assert-send-args ev ?timeout-ms ?cb) - ?cb-fn (cb-chan-as-fn ?cb ev)] - (if-not (:open? @state_) ; Definitely closed - (chsk-send->closed! ?cb-fn) - - ;; TODO Buffer before sending (but honor `:flush?`) - (let [?cb-uuid (when ?cb-fn (enc/uuid-str 6)) - ppstr (pack packer (meta ev) ev ?cb-uuid)] - - (when-let [cb-uuid ?cb-uuid] - (reset-in! cbs-waiting_ [cb-uuid] (have ?cb-fn)) - (when-let [timeout-ms ?timeout-ms] - (go (clj ws-ev) (catch :default _ ws-ev))) - - (let [;; Note that `ws-ev` doesn't seem to contain - ;; much useful info? Ref. http://goo.gl/bBJq0p - last-ws-error {:uuid (enc/uuid-str) - :ev ws-ev}] - - (merge>chsk-state! chsk {:last-ws-error last-ws-error})))) - - (aset "onmessage" ; Nb receives both push & cb evs! - (fn [ws-ev] - (let [ppstr (enc/oget ws-ev "data") - [clj ?cb-uuid] (unpack packer ppstr)] - - ;; Nb may or may NOT satisfy `event?` since we - ;; also receive cb replies here! This is actually - ;; why we prefix our pstrs to indicate whether - ;; they're wrapped or not. - ;; (assert-event clj) ;; NO! - - (or - (when (handshake? clj) - (receive-handshake! :ws chsk clj) - (reset! retry-count_ 0)) - - (when (= clj :chsk/ws-ping) - (when @debug-mode?_ - (receive-buffered-evs! chs [[:debug/ws-ping]])) - :noop) - - (if-let [cb-uuid ?cb-uuid] - (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ - cb-uuid)] - (cb-fn clj) - (warnf "Cb reply w/o local cb-fn: %s" clj)) - (let [buffered-evs clj] - (receive-buffered-evs! chs buffered-evs))))))) - - #_(aset "onopen" - (fn [_ws-ev] - ;; NO, better for server to send a handshake: - ;; (merge>chsk-state! chsk {:open? true}) - )) - - ;; Fires repeatedly (on each connection attempt) while - ;; server is down: - (aset "onclose" - (fn [ws-ev] - (let [clean? (enc/oget ws-ev "wasClean") - code (enc/oget ws-ev "code") - reason (enc/oget ws-ev "reason") - last-ws-close - {:uuid (enc/uuid-str) - :ev ws-ev - :clean? clean? - :code code - :reason reason}] - - ;; Firefox calls "onclose" while unloading, - ;; Ref. http://goo.gl/G5BYbn: - (if clean? - (do - (debugf "Clean WebSocket close, will not attempt reconnect") - (merge>chsk-state! chsk {:last-ws-close last-ws-close})) - (do - (merge>chsk-state! chsk {:last-ws-close last-ws-close :open? false}) - (retry-fn)))))))))))] - - (reset! active-retry-id_ retry-id) - (reset! retry-count_ 0) - (connect-fn) - chsk)))) - -#+cljs -(defn- new-ChWebSocket [opts] - (map->ChWebSocket - (merge - {:state_ (atom {:type :ws :open? false}) - :active-retry-id_ (atom "_pending") - :retry-count_ (atom 0) - :ever-opened?_ (atom false) - :cbs-waiting_ (atom {}) - :socket_ (atom nil)} - opts))) +#?(:cljs (def ajax-lite "Alias of `taoensso.encore/ajax-lite`" enc/ajax-lite)) +#?(:cljs + (defprotocol IChSocket + (-chsk-connect! [chsk]) + (-chsk-disconnect! [chsk reconn?]) + (-chsk-reconnect! [chsk]) + (-chsk-send! [chsk ev opts]))) + +#?(:cljs + (do + (defn chsk-connect! [chsk] (-chsk-connect! chsk)) + (defn chsk-destroy! "Deprecated" [chsk] (-chsk-disconnect! chsk false)) + (defn chsk-disconnect! [chsk] (-chsk-disconnect! chsk false)) + (defn chsk-reconnect! "Useful for reauthenticating after login/logout, etc." + [chsk] (-chsk-reconnect! chsk)))) + +#?(:cljs + (defn chsk-send! + "Sends `[ev-id ev-?data :as event]`, returns true on apparent success." + ([chsk ev] (chsk-send! chsk ev {})) + ([chsk ev ?timeout-ms ?cb] (chsk-send! chsk ev {:timeout-ms ?timeout-ms + :cb ?cb})) + ([chsk ev opts] + (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) + (-chsk-send! chsk ev opts)))) + +#?(:cljs + (defn- chsk-send->closed! [?cb-fn] + (warnf "Chsk send against closed chsk.") + (when ?cb-fn (?cb-fn :chsk/closed)) + false)) + +#?(:cljs + (defn- assert-send-args [x ?timeout-ms ?cb] + (assert-event x) + (assert (or (and (nil? ?timeout-ms) (nil? ?cb)) + (and (enc/nneg-int? ?timeout-ms))) + (str "cb requires a timeout; timeout-ms should be a +ive integer: " ?timeout-ms)) + (assert (or (nil? ?cb) (ifn? ?cb) (enc/chan? ?cb)) + (str "cb should be nil, an ifn, or a channel: " (type ?cb))))) + +#?(:cljs + (defn- pull-unused-cb-fn! [cbs-waiting_ ?cb-uuid] + (when-let [cb-uuid ?cb-uuid] + (swap-in! cbs-waiting_ [cb-uuid] + (fn [?f] (swapped :swap/dissoc ?f)))))) + +#?(:cljs + (defn- merge>chsk-state! [{:keys [chs state_] :as chsk} merge-state] + (let [[old-state new-state] + (swap-in! state_ [] + (fn [old-state] + (let [new-state (merge old-state merge-state) + + ;; Is this a reasonable way of helping client distinguish + ;; cause of an auto reconnect? Didn't give it much + ;; thought... + requested-reconnect? + (and (:requested-reconnect-pending? old-state) + (:open? new-state) + (not (:open? old-state))) + + new-state + (if (:first-open? old-state) + (assoc new-state :first-open? false) + new-state) + + new-state + (if requested-reconnect? + (-> new-state + (dissoc :requested-reconnect-pending?) + (assoc :requested-reconnect? true)) + (dissoc new-state :requested-reconnect?))] + + (swapped new-state [old-state new-state]))))] + + (when (not= old-state new-state) + (let [output [old-state new-state]] + ;; (debugf "Chsk state change: %s" output) + (put! (:state chs) [:chsk/state output]) + output))))) + +#?(:cljs + (defn- cb-chan-as-fn + "Experimental, undocumented. Allows a core.async channel to be provided + instead of a cb-fn. The channel will receive values of form + [.cb ]." + [?cb ev] + (if (or (nil? ?cb) (ifn? ?cb)) + ?cb + (do + (have? enc/chan? ?cb) + (assert-event ev) + (let [[ev-id _] ev + cb-ch ?cb] + (fn [reply] + (put! cb-ch + [(keyword (str (enc/fq-name ev-id) ".cb")) + reply]))))))) + +#?(:cljs + (defn- receive-buffered-evs! [chs clj] + (tracef "receive-buffered-evs!: %s" clj) + (let [buffered-evs (have vector? clj)] + (doseq [ev buffered-evs] + (assert-event ev) + ;; Should never receive :chsk/* events from server here: + (let [[id] ev] (assert (not= (namespace id) "chsk"))) + (put! (: e/o #{:ws :ajax}, etc. + :open? true + :ever-opened? true + :uid ?uid + :csrf-token ?csrf-token + :handshake-data ?handshake-data + :first-open? first-handshake?} + + handshake-ev + [:chsk/handshake + [?uid ?csrf-token ?handshake-data first-handshake?]]] + + (assert-event handshake-ev) + (when (str/blank? ?csrf-token) + (warnf "SECURITY WARNING: no CSRF token available for use by Sente")) + + (merge>chsk-state! chsk new-state) + (put! (:internal chs) handshake-ev) + + :handled))) + +#?(:cljs + (defrecord ChWebSocket + ;; WebSocket-only IChSocket implementation + ;; Handles (re)connections, cbs, etc. + + [client-id chs params packer url + state_ ; {:type _ :open? _ :uid _ :csrf-token _ ...} + active-retry-id_ retry-count_ ever-opened?_ + backoff-ms-fn ; (fn [nattempt]) -> msecs + cbs-waiting_ ; { ...} + socket_] + + IChSocket + (-chsk-disconnect! [chsk reconn?] + (reset! active-retry-id_ "_disable-auto-retry") + (if reconn? + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (merge>chsk-state! chsk {:open? false})) + (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) + + (-chsk-reconnect! [chsk] + (-chsk-disconnect! chsk :reconn) + (-chsk-connect! chsk)) + + (-chsk-send! [chsk ev opts] + (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts + _ (assert-send-args ev ?timeout-ms ?cb) + ?cb-fn (cb-chan-as-fn ?cb ev)] + (if-not (:open? @state_) ; Definitely closed + (chsk-send->closed! ?cb-fn) + + ;; TODO Buffer before sending (but honor `:flush?`) + (let [?cb-uuid (when ?cb-fn (enc/uuid-str 6)) + ppstr (pack packer (meta ev) ev ?cb-uuid)] + + (when-let [cb-uuid ?cb-uuid] + (reset-in! cbs-waiting_ [cb-uuid] (have ?cb-fn)) + (when-let [timeout-ms ?timeout-ms] + (go (clj ws-ev) (catch :default _ ws-ev))) + + (let [;; Note that `ws-ev` doesn't seem to contain + ;; much useful info? Ref. http://goo.gl/bBJq0p + last-ws-error {:uuid (enc/uuid-str) + :ev ws-ev}] + + (merge>chsk-state! chsk {:last-ws-error last-ws-error})))) + + (aset "onmessage" ; Nb receives both push & cb evs! + (fn [ws-ev] + (let [ppstr (enc/oget ws-ev "data") + [clj ?cb-uuid] (unpack packer ppstr)] + + ;; Nb may or may NOT satisfy `event?` since we + ;; also receive cb replies here! This is actually + ;; why we prefix our pstrs to indicate whether + ;; they're wrapped or not. + ;; (assert-event clj) ;; NO! + + (or + (when (handshake? clj) + (receive-handshake! :ws chsk clj) + (reset! retry-count_ 0)) + + (when (= clj :chsk/ws-ping) + (when @debug-mode?_ + (receive-buffered-evs! chs [[:debug/ws-ping]])) + :noop) + + (if-let [cb-uuid ?cb-uuid] + (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ + cb-uuid)] + (cb-fn clj) + (warnf "Cb reply w/o local cb-fn: %s" clj)) + (let [buffered-evs clj] + (receive-buffered-evs! chs buffered-evs))))))) + + #_(aset "onopen" + (fn [_ws-ev] + ;; NO, better for server to send a handshake: + ;; (merge>chsk-state! chsk {:open? true}) + )) + + ;; Fires repeatedly (on each connection attempt) while + ;; server is down: + (aset "onclose" + (fn [ws-ev] + (let [clean? (enc/oget ws-ev "wasClean") + code (enc/oget ws-ev "code") + reason (enc/oget ws-ev "reason") + last-ws-close + {:uuid (enc/uuid-str) + :ev ws-ev + :clean? clean? + :code code + :reason reason}] + + ;; Firefox calls "onclose" while unloading, + ;; Ref. http://goo.gl/G5BYbn: + (if clean? + (do + (debugf "Clean WebSocket close, will not attempt reconnect") + (merge>chsk-state! chsk {:last-ws-close last-ws-close})) + (do + (merge>chsk-state! chsk {:last-ws-close last-ws-close :open? false}) + (retry-fn)))))))))))] + + (reset! active-retry-id_ retry-id) + (reset! retry-count_ 0) + (connect-fn) + chsk))))) + +#?(:cljs + (defn- new-ChWebSocket [opts] + (map->ChWebSocket + (merge + {:state_ (atom {:type :ws :open? false}) + :active-retry-id_ (atom "_pending") + :retry-count_ (atom 0) + :ever-opened?_ (atom false) + :cbs-waiting_ (atom {}) + :socket_ (atom nil)} + opts)))) (def ^:private default-client-side-ajax-timeout-ms "We must set *some* client-side timeout otherwise an unpredictable (and @@ -1052,359 +1053,359 @@ :lp-timeout-ms." (enc/ms :secs 60)) -#+cljs -(defrecord ChAjaxSocket - ;; Ajax-only IChSocket implementation - ;; Handles (re)polling, etc. - - [client-id chs params packer url state_ - active-retry-id_ ever-opened?_ - backoff-ms-fn - ajax-opts curr-xhr_] - - IChSocket - (-chsk-disconnect! [chsk reconn?] - (reset! active-retry-id_ "_disable-auto-retry") - (if reconn? - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) - (merge>chsk-state! chsk {:open? false})) - (when-let [x @curr-xhr_] (.abort x))) - - (-chsk-reconnect! [chsk] - (-chsk-disconnect! chsk :reconn) - (-chsk-connect! chsk)) - - (-chsk-send! [chsk ev opts] - (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts - _ (assert-send-args ev ?timeout-ms ?cb) - ?cb-fn (cb-chan-as-fn ?cb ev)] - (if-not (:open? @state_) ; Definitely closed - (chsk-send->closed! ?cb-fn) - - ;; TODO Buffer before sending (but honor `:flush?`) - (let [csrf-token (:csrf-token @state_)] - (ajax-lite url - (merge ajax-opts - {:method :post - :timeout-ms (or ?timeout-ms (:timeout-ms ajax-opts) - default-client-side-ajax-timeout-ms) - :resp-type :text ; We'll do our own pstr decoding - :headers - (merge (:headers ajax-opts) ; 1st (don't clobber impl.): - {:X-CSRF-Token csrf-token}) - - :params - (let [ppstr (pack packer (meta ev) ev (when ?cb-fn :ajax-cb))] - (merge params ; 1st (don't clobber impl.): - {:udt (enc/now-udt) ; Force uncached resp - - ;; A duplicate of X-CSRF-Token for user's convenience and - ;; for back compatibility with earlier CSRF docs: - :csrf-token csrf-token - - ;; Just for user's convenience here. non-lp-POSTs don't - ;; actually need a client-id for Sente's own implementation: - :client-id client-id - - :ppstr ppstr}))}) - - (fn ajax-cb [{:keys [?error ?content]}] - (if ?error - (if (= ?error :timeout) - (when ?cb-fn (?cb-fn :chsk/timeout)) - (do (merge>chsk-state! chsk {:open? false}) - (when ?cb-fn (?cb-fn :chsk/error)))) - - (let [content ?content - resp-ppstr content - [resp-clj _] (unpack packer resp-ppstr)] - (if ?cb-fn - (?cb-fn resp-clj) - (when (not= resp-clj :chsk/dummy-cb-200) - (warnf "Cb reply w/o local cb-fn: %s" resp-clj))) - (merge>chsk-state! chsk {:open? true}))))) - - :apparent-success)))) - - (-chsk-connect! [chsk] - (let [retry-id (enc/uuid-str) - poll-fn ; async-poll-for-update-fn - (fn poll-fn [retry-count] - (tracef "async-poll-for-update!") - (let [retry-fn - (fn [] - (when (= @active-retry-id_ retry-id) - (let [retry-count* (inc retry-count) - backoff-ms (backoff-ms-fn retry-count*)] - (warnf "Chsk is closed: will try reconnect (%s)" retry-count*) - (.setTimeout js/window (fn [] (poll-fn retry-count*)) backoff-ms))))] - - (reset! curr-xhr_ - (ajax-lite url - (merge ajax-opts - {:method :get ; :timeout-ms timeout-ms - :timeout-ms (or (:timeout-ms ajax-opts) - default-client-side-ajax-timeout-ms) - :resp-type :text ; Prefer to do our own pstr reading - :params - (merge - - ;; Note that user params here are actually POST params for - ;; convenience. Contrast: WebSocket params sent as query - ;; params since there's no other choice there. - params ; 1st (don't clobber impl.): - - {:udt (enc/now-udt) ; Force uncached resp - :client-id client-id} - - ;; A truthy :handshake? param will prompt server to - ;; reply immediately with a handshake response, - ;; letting us confirm that our client<->server comms - ;; are working: - (when-not (:open? @state_) {:handshake? true}))}) - - (fn ajax-cb [{:keys [?error ?content]}] - (if ?error - (cond - (= ?error :timeout) (poll-fn 0) - ;; (= ?error :abort) ; Abort => intentional, not an error - :else - (do (merge>chsk-state! chsk {:open? false}) - (retry-fn))) - - ;; The Ajax long-poller is used only for events, never cbs: - (let [content ?content - ppstr content - [clj] (unpack packer ppstr) - handshake? (handshake? clj)] - - (when handshake? (receive-handshake! :ajax chsk clj)) - - (merge>chsk-state! chsk {:open? true}) - (poll-fn 0) ; Repoll asap - - (when-not handshake? - (or - (when (= clj :chsk/timeout) - (when @debug-mode?_ - (receive-buffered-evs! chs [[:debug/timeout]])) - :noop) - - (let [buffered-evs clj] ; An application reply - (receive-buffered-evs! chs buffered-evs)))))))))))] - - (reset! active-retry-id_ retry-id) - (poll-fn 0) - chsk))) - -#+cljs -(defn- new-ChAjaxSocket [opts] - (map->ChAjaxSocket - (merge - {:state_ (atom {:type :ajax :open? false}) - :active-retry-id_ (atom "_pending") - :ever-opened?_ (atom false) - :curr-xhr_ (atom nil)} - opts))) - -#+cljs -(defrecord ChAutoSocket - ;; Dynamic WebSocket/Ajax IChSocket implementation - ;; Wraps a swappable ChWebSocket/ChAjaxSocket - - [ws-chsk-opts ajax-chsk-opts state_ - impl_ ; ChWebSocket or ChAjaxSocket - ] - - IChSocket - (-chsk-disconnect! [chsk reconn?] - (when-let [impl @impl_] - (-chsk-disconnect! impl reconn?))) - - ;; Possibly reset impl type: - (-chsk-reconnect! [chsk] - (when-let [impl @impl_] - (-chsk-disconnect! impl :reconn) - (-chsk-connect! chsk))) - - (-chsk-send! [chsk ev opts] - (if-let [impl @impl_] - (-chsk-send! impl ev opts) - (let [{?cb :cb} opts - ?cb-fn (cb-chan-as-fn ?cb ev)] - (chsk-send->closed! ?cb-fn)))) - - (-chsk-connect! [chsk] - ;; Starting with a simple downgrade-only strategy here as a proof of concept - ;; TODO Later consider smarter downgrade or downgrade+upgrade strategies? - (let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_) - ws-chsk-opts (assoc ws-chsk-opts :state_ state_) - - ajax-conn! - (fn [] - ;; Remove :auto->:ajax downgrade watch - (remove-watch state_ :chsk/auto-ajax-downgrade) - (-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts))) - - ws-conn! - (fn [] - ;; Configure :auto->:ajax downgrade watch - (let [downgraded?_ (atom false)] - (add-watch state_ :chsk/auto-ajax-downgrade - (fn [_ _ old-state new-state] - (when-let [impl @impl_] - (when-let [ever-opened?_ (:ever-opened?_ impl)] - (when-not @ever-opened?_ - (when (:last-error new-state) - (when (compare-and-set! downgraded?_ false true) - (warnf "Permanently downgrading :auto chsk -> :ajax") - (-chsk-disconnect! impl false) - (reset! impl_ (ajax-conn!)))))))))) - - (-chsk-connect! (new-ChWebSocket ws-chsk-opts)))] - - (reset! impl_ (or (ws-conn!) (ajax-conn!))) - chsk))) - -#+cljs -(defn- new-ChAutoSocket [opts] - (map->ChAutoSocket - (merge - {:state_ (atom {:type :auto :open? false}) - :impl_ (atom nil)} - opts))) - -#+cljs -(defn- get-chsk-url [protocol host path type] - (let [protocol (case type - :ajax protocol - :ws (if (= protocol "https:") "wss:" "ws:"))] - (str protocol "//" (enc/path host path)))) - -#+cljs -(defn make-channel-socket-client! - "Returns nil on failure, or a map with keys: - :ch-recv ; core.async channel to receive `event-msg`s (internal or from - ; clients). May `put!` (inject) arbitrary `event`s to this channel. - :send-fn ; (fn [event & [?timeout-ms ?cb-fn]]) for client>server send. - :state ; Watchable, read-only (atom {:type _ :open? _ :uid _ :csrf-token _}). - :chsk ; IChSocket implementer. You can usu. ignore this. - - Common options: - :type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto). - :host ; Server host (defaults to current page's host). - :params ; Map of any params to incl. in chsk Ring requests (handy - ; for application-level auth, etc.). - :packer ; :edn (default), or an IPacker implementation. - :ajax-opts ; Base opts map provided to `taoensso.encore/ajax-lite`. - :wrap-recv-evs? ; Should events from server be wrapped in [:chsk/recv _]?" - - [path & - [{:keys [type host params recv-buf-or-n packer - client-id ajax-opts wrap-recv-evs? backoff-ms-fn] - :as opts - :or {type :auto - recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs - packer :edn - client-id (or (:client-uuid opts) ; Backwards compatibility - (enc/uuid-str)) - - ;; TODO Deprecated. Default to false later, then eventually just - ;; drop this option altogether? - here now for back compatibility: - wrap-recv-evs? true - - backoff-ms-fn enc/exp-backoff}} - - _deprecated-more-opts]] - - (have? [:in #{:ajax :ws :auto}] type) - (have? enc/nblank-str? client-id) - - (when (not (nil? _deprecated-more-opts)) (warnf "`make-channel-socket-client!` fn signature CHANGED with Sente v0.10.0.")) - (when (contains? opts :lp-timeout) (warnf ":lp-timeout opt has CHANGED; please use :lp-timout-ms.")) - - (let [packer (coerce-packer packer) - - win-loc (enc/get-win-loc) - win-protocol (:protocol win-loc) - host (or host (:host win-loc)) - path (or path (:pathname win-loc)) - - [ws-url ajax-url] - (if-let [f (:chsk-url-fn opts)] ; Deprecated - [(f path win-loc :ws) (f path win-loc :ajax)] - [(get-chsk-url win-protocol host path :ws) - (get-chsk-url win-protocol host path :ajax)]) - - private-chs - {:internal (chan (async/sliding-buffer 128)) - :state (chan (async/sliding-buffer 10)) - ;; Nb must be >= max expected buffered-evs size: - :ev-msg [ev] - (let [[ev-id ev-?data :as ev] (as-event ev)] - {:ch-recv ev-ch - :send-fn send-fn - :state (:state_ chsk) - :event ev - :id ev-id - :?data ev-?data})) - ev-ch)] - - {:chsk chsk - :ch-recv ev-msg-ch ; Public `ev`s->`ev-msg`s ch - :send-fn send-fn - :state (:state_ chsk)}) - - (warnf "Failed to create channel socket")))) +#?(:cljs + (defrecord ChAjaxSocket + ;; Ajax-only IChSocket implementation + ;; Handles (re)polling, etc. + + [client-id chs params packer url state_ + active-retry-id_ ever-opened?_ + backoff-ms-fn + ajax-opts curr-xhr_] + + IChSocket + (-chsk-disconnect! [chsk reconn?] + (reset! active-retry-id_ "_disable-auto-retry") + (if reconn? + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (merge>chsk-state! chsk {:open? false})) + (when-let [x @curr-xhr_] (.abort x))) + + (-chsk-reconnect! [chsk] + (-chsk-disconnect! chsk :reconn) + (-chsk-connect! chsk)) + + (-chsk-send! [chsk ev opts] + (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts + _ (assert-send-args ev ?timeout-ms ?cb) + ?cb-fn (cb-chan-as-fn ?cb ev)] + (if-not (:open? @state_) ; Definitely closed + (chsk-send->closed! ?cb-fn) + + ;; TODO Buffer before sending (but honor `:flush?`) + (let [csrf-token (:csrf-token @state_)] + (ajax-lite url + (merge ajax-opts + {:method :post + :timeout-ms (or ?timeout-ms (:timeout-ms ajax-opts) + default-client-side-ajax-timeout-ms) + :resp-type :text ; We'll do our own pstr decoding + :headers + (merge (:headers ajax-opts) ; 1st (don't clobber impl.): + {:X-CSRF-Token csrf-token}) + + :params + (let [ppstr (pack packer (meta ev) ev (when ?cb-fn :ajax-cb))] + (merge params ; 1st (don't clobber impl.): + {:udt (enc/now-udt) ; Force uncached resp + + ;; A duplicate of X-CSRF-Token for user's convenience and + ;; for back compatibility with earlier CSRF docs: + :csrf-token csrf-token + + ;; Just for user's convenience here. non-lp-POSTs don't + ;; actually need a client-id for Sente's own implementation: + :client-id client-id + + :ppstr ppstr}))}) + + (fn ajax-cb [{:keys [?error ?content]}] + (if ?error + (if (= ?error :timeout) + (when ?cb-fn (?cb-fn :chsk/timeout)) + (do (merge>chsk-state! chsk {:open? false}) + (when ?cb-fn (?cb-fn :chsk/error)))) + + (let [content ?content + resp-ppstr content + [resp-clj _] (unpack packer resp-ppstr)] + (if ?cb-fn + (?cb-fn resp-clj) + (when (not= resp-clj :chsk/dummy-cb-200) + (warnf "Cb reply w/o local cb-fn: %s" resp-clj))) + (merge>chsk-state! chsk {:open? true}))))) + + :apparent-success)))) + + (-chsk-connect! [chsk] + (let [retry-id (enc/uuid-str) + poll-fn ; async-poll-for-update-fn + (fn poll-fn [retry-count] + (tracef "async-poll-for-update!") + (let [retry-fn + (fn [] + (when (= @active-retry-id_ retry-id) + (let [retry-count* (inc retry-count) + backoff-ms (backoff-ms-fn retry-count*)] + (warnf "Chsk is closed: will try reconnect (%s)" retry-count*) + (.setTimeout js/window (fn [] (poll-fn retry-count*)) backoff-ms))))] + + (reset! curr-xhr_ + (ajax-lite url + (merge ajax-opts + {:method :get ; :timeout-ms timeout-ms + :timeout-ms (or (:timeout-ms ajax-opts) + default-client-side-ajax-timeout-ms) + :resp-type :text ; Prefer to do our own pstr reading + :params + (merge + + ;; Note that user params here are actually POST params for + ;; convenience. Contrast: WebSocket params sent as query + ;; params since there's no other choice there. + params ; 1st (don't clobber impl.): + + {:udt (enc/now-udt) ; Force uncached resp + :client-id client-id} + + ;; A truthy :handshake? param will prompt server to + ;; reply immediately with a handshake response, + ;; letting us confirm that our client<->server comms + ;; are working: + (when-not (:open? @state_) {:handshake? true}))}) + + (fn ajax-cb [{:keys [?error ?content]}] + (if ?error + (cond + (= ?error :timeout) (poll-fn 0) + ;; (= ?error :abort) ; Abort => intentional, not an error + :else + (do (merge>chsk-state! chsk {:open? false}) + (retry-fn))) + + ;; The Ajax long-poller is used only for events, never cbs: + (let [content ?content + ppstr content + [clj] (unpack packer ppstr) + handshake? (handshake? clj)] + + (when handshake? (receive-handshake! :ajax chsk clj)) + + (merge>chsk-state! chsk {:open? true}) + (poll-fn 0) ; Repoll asap + + (when-not handshake? + (or + (when (= clj :chsk/timeout) + (when @debug-mode?_ + (receive-buffered-evs! chs [[:debug/timeout]])) + :noop) + + (let [buffered-evs clj] ; An application reply + (receive-buffered-evs! chs buffered-evs)))))))))))] + + (reset! active-retry-id_ retry-id) + (poll-fn 0) + chsk)))) + +#?(:cljs + (defn- new-ChAjaxSocket [opts] + (map->ChAjaxSocket + (merge + {:state_ (atom {:type :ajax :open? false}) + :active-retry-id_ (atom "_pending") + :ever-opened?_ (atom false) + :curr-xhr_ (atom nil)} + opts)))) + +#?(:cljs + (defrecord ChAutoSocket + ;; Dynamic WebSocket/Ajax IChSocket implementation + ;; Wraps a swappable ChWebSocket/ChAjaxSocket + + [ws-chsk-opts ajax-chsk-opts state_ + impl_ ; ChWebSocket or ChAjaxSocket + ] + + IChSocket + (-chsk-disconnect! [chsk reconn?] + (when-let [impl @impl_] + (-chsk-disconnect! impl reconn?))) + + ;; Possibly reset impl type: + (-chsk-reconnect! [chsk] + (when-let [impl @impl_] + (-chsk-disconnect! impl :reconn) + (-chsk-connect! chsk))) + + (-chsk-send! [chsk ev opts] + (if-let [impl @impl_] + (-chsk-send! impl ev opts) + (let [{?cb :cb} opts + ?cb-fn (cb-chan-as-fn ?cb ev)] + (chsk-send->closed! ?cb-fn)))) + + (-chsk-connect! [chsk] + ;; Starting with a simple downgrade-only strategy here as a proof of concept + ;; TODO Later consider smarter downgrade or downgrade+upgrade strategies? + (let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_) + ws-chsk-opts (assoc ws-chsk-opts :state_ state_) + + ajax-conn! + (fn [] + ;; Remove :auto->:ajax downgrade watch + (remove-watch state_ :chsk/auto-ajax-downgrade) + (-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts))) + + ws-conn! + (fn [] + ;; Configure :auto->:ajax downgrade watch + (let [downgraded?_ (atom false)] + (add-watch state_ :chsk/auto-ajax-downgrade + (fn [_ _ old-state new-state] + (when-let [impl @impl_] + (when-let [ever-opened?_ (:ever-opened?_ impl)] + (when-not @ever-opened?_ + (when (:last-error new-state) + (when (compare-and-set! downgraded?_ false true) + (warnf "Permanently downgrading :auto chsk -> :ajax") + (-chsk-disconnect! impl false) + (reset! impl_ (ajax-conn!)))))))))) + + (-chsk-connect! (new-ChWebSocket ws-chsk-opts)))] + + (reset! impl_ (or (ws-conn!) (ajax-conn!))) + chsk)))) + +#?(:cljs + (defn- new-ChAutoSocket [opts] + (map->ChAutoSocket + (merge + {:state_ (atom {:type :auto :open? false}) + :impl_ (atom nil)} + opts)))) + +#?(:cljs + (defn- get-chsk-url [protocol host path type] + (let [protocol (case type + :ajax protocol + :ws (if (= protocol "https:") "wss:" "ws:"))] + (str protocol "//" (enc/path host path))))) + +#?(:cljs + (defn make-channel-socket-client! + "Returns nil on failure, or a map with keys: + :ch-recv ; core.async channel to receive `event-msg`s (internal or from + ; clients). May `put!` (inject) arbitrary `event`s to this channel. + :send-fn ; (fn [event & [?timeout-ms ?cb-fn]]) for client>server send. + :state ; Watchable, read-only (atom {:type _ :open? _ :uid _ :csrf-token _}). + :chsk ; IChSocket implementer. You can usu. ignore this. + + Common options: + :type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto). + :host ; Server host (defaults to current page's host). + :params ; Map of any params to incl. in chsk Ring requests (handy + ; for application-level auth, etc.). + :packer ; :edn (default), or an IPacker implementation. + :ajax-opts ; Base opts map provided to `taoensso.encore/ajax-lite`. + :wrap-recv-evs? ; Should events from server be wrapped in [:chsk/recv _]?" + + [path & + [{:keys [type host params recv-buf-or-n packer + client-id ajax-opts wrap-recv-evs? backoff-ms-fn] + :as opts + :or {type :auto + recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs + packer :edn + client-id (or (:client-uuid opts) ; Backwards compatibility + (enc/uuid-str)) + + ;; TODO Deprecated. Default to false later, then eventually just + ;; drop this option altogether? - here now for back compatibility: + wrap-recv-evs? true + + backoff-ms-fn enc/exp-backoff}} + + _deprecated-more-opts]] + + (have? [:in #{:ajax :ws :auto}] type) + (have? enc/nblank-str? client-id) + + (when (not (nil? _deprecated-more-opts)) (warnf "`make-channel-socket-client!` fn signature CHANGED with Sente v0.10.0.")) + (when (contains? opts :lp-timeout) (warnf ":lp-timeout opt has CHANGED; please use :lp-timout-ms.")) + + (let [packer (coerce-packer packer) + + win-loc (enc/get-win-loc) + win-protocol (:protocol win-loc) + host (or host (:host win-loc)) + path (or path (:pathname win-loc)) + + [ws-url ajax-url] + (if-let [f (:chsk-url-fn opts)] ; Deprecated + [(f path win-loc :ws) (f path win-loc :ajax)] + [(get-chsk-url win-protocol host path :ws) + (get-chsk-url win-protocol host path :ajax)]) + + private-chs + {:internal (chan (async/sliding-buffer 128)) + :state (chan (async/sliding-buffer 10)) + ;; Nb must be >= max expected buffered-evs size: + :ev-msg [ev] + (let [[ev-id ev-?data :as ev] (as-event ev)] + {:ch-recv ev-ch + :send-fn send-fn + :state (:state_ chsk) + :event ev + :id ev-id + :?data ev-?data})) + ev-ch)] + + {:chsk chsk + :ch-recv ev-msg-ch ; Public `ev`s->`ev-msg`s ch + :send-fn send-fn + :state (:state_ chsk)}) + + (warnf "Failed to create channel socket"))))) ;;;; Event-msg routers (handler loops) @@ -1468,42 +1469,42 @@ ;;;; Platform aliases -(def event-msg? #+clj server-event-msg? #+cljs client-event-msg?) +(def event-msg? #?(:clj server-event-msg? :cljs client-event-msg?)) (def make-channel-socket! - #+clj make-channel-socket-server! - #+cljs make-channel-socket-client!) + #?(:clj make-channel-socket-server! + :cljs make-channel-socket-client!)) (def start-chsk-router! - #+clj start-server-chsk-router! - #+cljs start-client-chsk-router!) + #?(:clj start-server-chsk-router! + :cljs start-client-chsk-router!)) ;;;; Deprecated -#+clj -(defn start-chsk-router-loop! - "DEPRECATED: Please use `start-chsk-router!` instead" - [event-msg-handler ch-recv] - (start-server-chsk-router! ch-recv - ;; Old handler form: (fn [ev-msg ch-recv]) - (fn [ev-msg] (event-msg-handler ev-msg (:ch-recv ev-msg))))) - -#+cljs -(defn start-chsk-router-loop! - "DEPRECATED: Please use `start-chsk-router!` instead" - [event-handler ch-recv] - (start-client-chsk-router! ch-recv - ;; Old handler form: (fn [ev ch-recv]) - (fn [ev-msg] (event-handler (:event ev-msg) (:ch-recv ev-msg))))) +#?(:clj + (defn start-chsk-router-loop! + "DEPRECATED: Please use `start-chsk-router!` instead" + [event-msg-handler ch-recv] + (start-server-chsk-router! ch-recv + ;; Old handler form: (fn [ev-msg ch-recv]) + (fn [ev-msg] (event-msg-handler ev-msg (:ch-recv ev-msg)))))) + +#?(:cljs + (defn start-chsk-router-loop! + "DEPRECATED: Please use `start-chsk-router!` instead" + [event-handler ch-recv] + (start-client-chsk-router! ch-recv + ;; Old handler form: (fn [ev ch-recv]) + (fn [ev-msg] (event-handler (:event ev-msg) (:ch-recv ev-msg)))))) (def set-logging-level! "DEPRECATED. Please use `timbre/set-level!` instead" timbre/set-level!) -#+cljs (def ajax-call "DEPRECATED: Please use `ajax-lite` instead" enc/ajax-lite) -#+cljs -(def default-chsk-url-fn "DEPRECATED" - (fn [path {:as location :keys [protocol host pathname]} websocket?] - (let [protocol - (if websocket? - (if (= protocol "https:") "wss:" "ws:") - protocol)] - (str protocol "//" host (or path pathname))))) +#?(:cljs (def ajax-call "DEPRECATED: Please use `ajax-lite` instead" enc/ajax-lite)) +#?(:cljs + (def default-chsk-url-fn "DEPRECATED" + (fn [path {:as location :keys [protocol host pathname]} websocket?] + (let [protocol + (if websocket? + (if (= protocol "https:") "wss:" "ws:") + protocol)] + (str protocol "//" host (or path pathname)))))) diff --git a/src/taoensso/sente/interfaces.cljx b/src/taoensso/sente/interfaces.cljc similarity index 100% rename from src/taoensso/sente/interfaces.cljx rename to src/taoensso/sente/interfaces.cljc diff --git a/src/taoensso/sente/packers/transit.cljc b/src/taoensso/sente/packers/transit.cljc new file mode 100644 index 0000000..651a5cd --- /dev/null +++ b/src/taoensso/sente/packers/transit.cljc @@ -0,0 +1,110 @@ +(ns taoensso.sente.packers.transit + "Alpha - subject to change! + Optional Transit-format[1] IPacker implementation for use with Sente. + [1] https://github.com/cognitect/transit-format." + {:author "Peter Taoussanis, @ckarlsen84"} + + #?(:clj + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer (have have! have?)] + [taoensso.timbre :as timbre] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)])) + + #?(:clj + (:import [java.io ByteArrayInputStream ByteArrayOutputStream])) + + #?(:cljs + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer-macros (have have! have?)] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)]))) + +#?(:clj + (defn- get-charset [transit-fmt] + ;; :msgpack appears to need ISO-8859-1 to retain binary data correctly when + ;; string-encoded, all other (non-binary) formats can get UTF-8: + (if (enc/kw-identical? transit-fmt :msgpack) "ISO-8859-1" "UTF-8"))) + +#?(:clj + (def ^:private cache-read-handlers + "reader-opts -> reader-opts with cached read handler map" + (let [cache (enc/memoize_ (fn [m] (transit/read-handler-map m)))] + (fn [reader-opts] + (if-let [m (:handlers reader-opts)] + (assoc reader-opts :handlers (cache m)) + reader-opts))))) + +#?(:clj + (def ^:private cache-write-handlers + "writer-opts -> writer-opts with cached write handler map" + (let [cache (enc/memoize_ (fn [m] (transit/write-handler-map m)))] + (fn [writer-opts] + (if-let [m (:handlers writer-opts)] + (assoc writer-opts :handlers (cache m)) + writer-opts))))) + +#?(:clj + (def ^:private transit-writer-fn-proxy + (enc/thread-local-proxy + (fn [fmt opts] + (let [^String charset (get-charset fmt) + opts (cache-write-handlers opts) + ^ByteArrayOutputStream baos (ByteArrayOutputStream. 64) + writer (transit/writer baos fmt opts)] + (fn [x] + (transit/write writer x) + (let [result (.toString baos charset)] + (.reset baos) + result))))))) + +(def ^:private get-transit-writer-fn + "Returns thread-safe (fn [x-to-write])" + #?(:cljs + (enc/memoize_ + (fn [fmt opts] + (let [writer (transit/writer fmt opts)] + (fn [x] (transit/write writer x))))) + :clj + (fn [fmt opts] + (let [thread-local-transit-writer-fn (.get ^ThreadLocal transit-writer-fn-proxy)] + (thread-local-transit-writer-fn fmt opts))))) + +(def ^:private get-transit-reader-fn + "Returns thread-safe (fn [str-to-read])" + #?(:cljs + (enc/memoize_ + (fn [fmt opts] + (let [reader (transit/reader fmt opts)] + (fn [s] (transit/read reader s))))) + :clj + (fn [fmt opts] + (let [^String charset (get-charset fmt) + opts (cache-read-handlers opts)] + (fn [s] + (let [ba (.getBytes ^String s ^String charset) + ^ByteArrayInputStream bais (ByteArrayInputStream. ba) + reader (transit/reader bais fmt opts)] + (transit/read reader))))))) + +(deftype TransitPacker [transit-fmt writer-opts reader-opts] + taoensso.sente.interfaces/IPacker + (pack [_ x] ((get-transit-writer-fn transit-fmt writer-opts) x)) + (unpack [_ s] ((get-transit-reader-fn transit-fmt reader-opts) s))) + +(defn get-transit-packer "Returns a new TransitPacker" + ([ ] (get-transit-packer :json {} {})) + ([transit-fmt] (get-transit-packer transit-fmt {} {})) + ([transit-fmt writer-opts reader-opts] + ;; No transit-cljs support for msgpack atm + (have? [:el #{:json #_:msgpack}] transit-fmt) + (have? map? writer-opts reader-opts) + (TransitPacker. transit-fmt writer-opts reader-opts))) + +(comment + (def tp (get-transit-packer)) + (enc/qb 10000 + (unpack tp (pack tp [:chsk/ws-ping "foo"])) + (enc/read-edn (enc/pr-edn [:chsk/ws-ping "foo"])))) diff --git a/src/taoensso/sente/packers/transit.cljx b/src/taoensso/sente/packers/transit.cljx deleted file mode 100644 index bb7a70e..0000000 --- a/src/taoensso/sente/packers/transit.cljx +++ /dev/null @@ -1,112 +0,0 @@ -(ns taoensso.sente.packers.transit - "Alpha - subject to change! - Optional Transit-format[1] IPacker implementation for use with Sente. - [1] https://github.com/cognitect/transit-format." - {:author "Peter Taoussanis, @ckarlsen84"} - - #+clj - (:require - [clojure.string :as str] - [taoensso.encore :as enc :refer (have have! have?)] - [taoensso.timbre :as timbre] - [cognitect.transit :as transit] - [taoensso.sente.interfaces :as interfaces :refer (pack unpack)]) - - #+clj - (:import [java.io ByteArrayInputStream ByteArrayOutputStream]) - - #+cljs - (:require - [clojure.string :as str] - [taoensso.encore :as enc :refer-macros (have have! have?)] - [cognitect.transit :as transit] - [taoensso.sente.interfaces :as interfaces :refer (pack unpack)])) - -#+clj -(defn- get-charset [transit-fmt] - ;; :msgpack appears to need ISO-8859-1 to retain binary data correctly when - ;; string-encoded, all other (non-binary) formats can get UTF-8: - (if (enc/kw-identical? transit-fmt :msgpack) "ISO-8859-1" "UTF-8")) - -#+clj -(def ^:private cache-read-handlers - "reader-opts -> reader-opts with cached read handler map" - (let [cache (enc/memoize_ (fn [m] (transit/read-handler-map m)))] - (fn [reader-opts] - (if-let [m (:handlers reader-opts)] - (assoc reader-opts :handlers (cache m)) - reader-opts)))) - -#+clj -(def ^:private cache-write-handlers - "writer-opts -> writer-opts with cached write handler map" - (let [cache (enc/memoize_ (fn [m] (transit/write-handler-map m)))] - (fn [writer-opts] - (if-let [m (:handlers writer-opts)] - (assoc writer-opts :handlers (cache m)) - writer-opts)))) - -#+clj -(def ^:private transit-writer-fn-proxy - (enc/thread-local-proxy - (fn [fmt opts] - (let [^String charset (get-charset fmt) - opts (cache-write-handlers opts) - ^ByteArrayOutputStream baos (ByteArrayOutputStream. 64) - writer (transit/writer baos fmt opts)] - (fn [x] - (transit/write writer x) - (let [result (.toString baos charset)] - (.reset baos) - result)))))) - -(def ^:private get-transit-writer-fn - "Returns thread-safe (fn [x-to-write])" - #+cljs - (enc/memoize_ - (fn [fmt opts] - (let [writer (transit/writer fmt opts)] - (fn [x] (transit/write writer x))))) - - #+clj - (fn [fmt opts] - (let [thread-local-transit-writer-fn (.get ^ThreadLocal transit-writer-fn-proxy)] - (thread-local-transit-writer-fn fmt opts)))) - -(def ^:private get-transit-reader-fn - "Returns thread-safe (fn [str-to-read])" - #+cljs - (enc/memoize_ - (fn [fmt opts] - (let [reader (transit/reader fmt opts)] - (fn [s] (transit/read reader s))))) - - #+clj - (fn [fmt opts] - (let [^String charset (get-charset fmt) - opts (cache-read-handlers opts)] - (fn [s] - (let [ba (.getBytes ^String s ^String charset) - ^ByteArrayInputStream bais (ByteArrayInputStream. ba) - reader (transit/reader bais fmt opts)] - (transit/read reader)))))) - -(deftype TransitPacker [transit-fmt writer-opts reader-opts] - taoensso.sente.interfaces/IPacker - (pack [_ x] ((get-transit-writer-fn transit-fmt writer-opts) x)) - (unpack [_ s] ((get-transit-reader-fn transit-fmt reader-opts) s))) - -(defn get-transit-packer "Returns a new TransitPacker" - ([ ] (get-transit-packer :json {} {})) - ([transit-fmt] (get-transit-packer transit-fmt {} {})) - ([transit-fmt writer-opts reader-opts] - ;; No transit-cljs support for msgpack atm - (have? [:el #{:json #_:msgpack}] transit-fmt) - (have? map? writer-opts reader-opts) - (TransitPacker. transit-fmt writer-opts reader-opts))) - -(comment - (def tp (get-transit-packer)) - (enc/qb 10000 - (unpack tp (pack tp [:chsk/ws-ping "foo"])) - (enc/read-edn (enc/pr-edn [:chsk/ws-ping "foo"]))))