diff --git a/src/taoensso/sente.cljc b/src/taoensso/sente.cljc index 4144871..642e6bb 100644 --- a/src/taoensso/sente.cljc +++ b/src/taoensso/sente.cljc @@ -182,8 +182,8 @@ (put! ch-recv ev-msg*)))) ;;; Note that cb replys need _not_ be `event` form! -#?(:cljs (defn cb-error? [cb-reply-clj] (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply-clj))) -#?(:cljs (defn cb-success? [cb-reply-clj] (not (cb-error? cb-reply-clj)))) +(defn cb-error? [cb-reply-clj] (#{:chsk/closed :chsk/timeout :chsk/error} cb-reply-clj)) +(defn cb-success? [cb-reply-clj] (not (cb-error? cb-reply-clj))) ;;;; Packing ;; * Client<->server payloads are arbitrary Clojure vals (cb replies or events). @@ -752,155 +752,143 @@ ;;;; Client API #?(:cljs (def ajax-lite "Alias of `taoensso.encore/ajax-lite`" enc/ajax-lite)) -#?(:cljs - (defprotocol IChSocket - (-chsk-connect! [chsk]) - (-chsk-disconnect! [chsk reason]) - (-chsk-reconnect! [chsk]) - (-chsk-send! [chsk ev opts]))) - -#?(:cljs - (do - (defn chsk-connect! [chsk] (-chsk-connect! chsk)) - (defn chsk-disconnect! [chsk] (-chsk-disconnect! chsk :requested-disconnect)) - (defn chsk-reconnect! "Useful for reauthenticating after login/logout, etc." - [chsk] (-chsk-reconnect! chsk)) - (def chsk-destroy! "Deprecated" chsk-disconnect!))) - -#?(:cljs - (defn chsk-send! - "Sends `[ev-id ev-?data :as event]`, returns true on apparent success." - ([chsk ev] (chsk-send! chsk ev {})) - ([chsk ev ?timeout-ms ?cb] (chsk-send! chsk ev {:timeout-ms ?timeout-ms - :cb ?cb})) - ([chsk ev opts] - (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) - (-chsk-send! chsk ev opts)))) - -#?(:cljs - (defn- chsk-send->closed! [?cb-fn] - (warnf "Chsk send against closed chsk.") - (when ?cb-fn (?cb-fn :chsk/closed)) - false)) - -#?(:cljs - (defn- assert-send-args [x ?timeout-ms ?cb] - (assert-event x) - (assert (or (and (nil? ?timeout-ms) (nil? ?cb)) - (and (enc/nat-int? ?timeout-ms))) - (str "cb requires a timeout; timeout-ms should be a +ive integer: " ?timeout-ms)) - (assert (or (nil? ?cb) (ifn? ?cb) (enc/chan? ?cb)) - (str "cb should be nil, an ifn, or a channel: " (type ?cb))))) -#?(:cljs - (defn- pull-unused-cb-fn! [cbs-waiting_ ?cb-uuid] - (when-let [cb-uuid ?cb-uuid] - (swap-in! cbs-waiting_ [cb-uuid] - (fn [?f] (swapped :swap/dissoc ?f)))))) - -#?(:cljs - (defn- swap-chsk-state! - "Atomically swaps the value of chk's :state_ atom." - [chsk f] - (let [[old-state new-state] - (swap-in! (:state_ chsk) - (fn [old-state] - (let [new-state (f old-state) - new-state - (if (:first-open? old-state) - (assoc new-state :first-open? false) - new-state) - - new-state - (if (:open? new-state) - (dissoc new-state :udt-next-reconnect) - new-state)] - - (swapped new-state [old-state new-state]))))] - - (when (not= old-state new-state) - (let [output [old-state new-state]] - ;; (debugf "Chsk state change: %s" output) - (put! (get-in chsk [:chs :state]) [:chsk/state output]) - output))))) - -#?(:cljs - (defn- chsk-state->closed [state reason] - (have? map? state) - (have? [:el #{:requested-disconnect - :requested-reconnect - :downgrading-ws-to-ajax - :unexpected}] reason) - (if (or (:open? state) (not= reason :unexpected)) - (-> state - (dissoc :udt-next-reconnect) - (assoc - :open? false - :last-close {:udt (enc/now-udt) :reason reason})) - state))) - -#?(:cljs - (defn- cb-chan-as-fn - "Experimental, undocumented. Allows a core.async channel to be provided +(defprotocol IChSocket + (-chsk-connect! [chsk]) + (-chsk-disconnect! [chsk reason]) + (-chsk-reconnect! [chsk]) + (-chsk-send! [chsk ev opts])) + +(defn chsk-connect! [chsk] (-chsk-connect! chsk)) +(defn chsk-disconnect! [chsk] (-chsk-disconnect! chsk :requested-disconnect)) +(defn chsk-reconnect! "Useful for reauthenticating after login/logout, etc." + [chsk] (-chsk-reconnect! chsk)) +(def chsk-destroy! "Deprecated" chsk-disconnect!) + +(defn chsk-send! + "Sends `[ev-id ev-?data :as event]`, returns true on apparent success." + ([chsk ev] (chsk-send! chsk ev {})) + ([chsk ev ?timeout-ms ?cb] (chsk-send! chsk ev {:timeout-ms ?timeout-ms + :cb ?cb})) + ([chsk ev opts] + (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) + (-chsk-send! chsk ev opts))) + +(defn- chsk-send->closed! [?cb-fn] + (warnf "Chsk send against closed chsk.") + (when ?cb-fn (?cb-fn :chsk/closed)) + false) + +(defn- assert-send-args [x ?timeout-ms ?cb] + (assert-event x) + (assert (or (and (nil? ?timeout-ms) (nil? ?cb)) + (and (enc/nat-int? ?timeout-ms))) + (str "cb requires a timeout; timeout-ms should be a +ive integer: " ?timeout-ms)) + (assert (or (nil? ?cb) (ifn? ?cb) (enc/chan? ?cb)) + (str "cb should be nil, an ifn, or a channel: " (type ?cb)))) + +(defn- pull-unused-cb-fn! [cbs-waiting_ ?cb-uuid] + (when-let [cb-uuid ?cb-uuid] + (swap-in! cbs-waiting_ [cb-uuid] + (fn [?f] (swapped :swap/dissoc ?f))))) + +(defn- swap-chsk-state! + "Atomically swaps the value of chk's :state_ atom." + [chsk f] + (let [[old-state new-state] + (swap-in! (:state_ chsk) + (fn [old-state] + (let [new-state (f old-state) + new-state + (if (:first-open? old-state) + (assoc new-state :first-open? false) + new-state) + + new-state + (if (:open? new-state) + (dissoc new-state :udt-next-reconnect) + new-state)] + + (swapped new-state [old-state new-state]))))] + + (when (not= old-state new-state) + (let [output [old-state new-state]] + ;; (debugf "Chsk state change: %s" output) + (put! (get-in chsk [:chs :state]) [:chsk/state output]) + output)))) + +(defn- chsk-state->closed [state reason] + (have? map? state) + (have? [:el #{:requested-disconnect + :requested-reconnect + :downgrading-ws-to-ajax + :unexpected}] reason) + (if (or (:open? state) (not= reason :unexpected)) + (-> state + (dissoc :udt-next-reconnect) + (assoc + :open? false + :last-close {:udt (enc/now-udt) :reason reason})) + state)) + +(defn- cb-chan-as-fn + "Experimental, undocumented. Allows a core.async channel to be provided instead of a cb-fn. The channel will receive values of form [.cb ]." - [?cb ev] - (if (or (nil? ?cb) (ifn? ?cb)) - ?cb - (do - (have? enc/chan? ?cb) - (assert-event ev) - (let [[ev-id _] ev - cb-ch ?cb] - (fn [reply] - (put! cb-ch - [(keyword (str (enc/as-qname ev-id) ".cb")) - reply]))))))) - -#?(:cljs - (defn- receive-buffered-evs! [chs clj] - (tracef "receive-buffered-evs!: %s" clj) - (let [buffered-evs (have vector? clj)] - (doseq [ev buffered-evs] - (assert-event ev) - ;; Should never receive :chsk/* events from server here: - (let [[id] ev] (assert (not= (namespace id) "chsk"))) - (put! (: e/o #{:ws :ajax}, etc. - :open? true - :ever-opened? true - :uid ?uid - :csrf-token ?csrf-token - :handshake-data ?handshake-data - :first-open? first-handshake?} - - handshake-ev - [:chsk/handshake - [?uid ?csrf-token ?handshake-data first-handshake?]]] - - (assert-event handshake-ev) - (when (str/blank? ?csrf-token) - (warnf "SECURITY WARNING: no CSRF token available for use by Sente")) - - (swap-chsk-state! chsk #(merge % new-state)) - (put! (:internal chs) handshake-ev) - - :handled))) + [?cb ev] + (if (or (nil? ?cb) (ifn? ?cb)) + ?cb + (do + (have? enc/chan? ?cb) + (assert-event ev) + (let [[ev-id _] ev + cb-ch ?cb] + (fn [reply] + (put! cb-ch + [(keyword (str (enc/as-qname ev-id) ".cb")) + reply])))))) + +(defn- receive-buffered-evs! [chs clj] + (tracef "receive-buffered-evs!: %s" clj) + (let [buffered-evs (have vector? clj)] + (doseq [ev buffered-evs] + (assert-event ev) + ;; Should never receive :chsk/* events from server here: + (let [[id] ev] (assert (not= (namespace id) "chsk"))) + (put! (: e/o #{:ws :ajax}, etc. + :open? true + :ever-opened? true + :uid ?uid + :csrf-token ?csrf-token + :handshake-data ?handshake-data + :first-open? first-handshake?} + + handshake-ev + [:chsk/handshake + [?uid ?csrf-token ?handshake-data first-handshake?]]] + + (assert-event handshake-ev) + (when (str/blank? ?csrf-token) + (warnf "SECURITY WARNING: no CSRF token available for use by Sente")) + + (swap-chsk-state! chsk #(merge % new-state)) + (put! (:internal chs) handshake-ev) + + :handled)) #?(:clj (defmacro ^:private elide-require @@ -930,125 +918,173 @@ ;; (errorf e "Unable to load npm websocket lib") nil))))))) -#?(:cljs - (defrecord ChWebSocket - ;; WebSocket-only IChSocket implementation - ;; Handles (re)connections, cbs, etc. - - [client-id chs params packer url ws-kalive-ms - state_ ; {:type _ :open? _ :uid _ :csrf-token _ ...} - instance-handle_ retry-count_ ever-opened?_ - backoff-ms-fn ; (fn [nattempt]) -> msecs - cbs-waiting_ ; { ...} - socket_ - udt-last-comms_] - - IChSocket - (-chsk-disconnect! [chsk reason] - (reset! instance-handle_ nil) ; Disable auto retry - (swap-chsk-state! chsk #(chsk-state->closed % reason)) - (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) - - (-chsk-reconnect! [chsk] - (-chsk-disconnect! chsk :requested-reconnect) - (-chsk-connect! chsk)) - - (-chsk-send! [chsk ev opts] - (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts - _ (assert-send-args ev ?timeout-ms ?cb) - ?cb-fn (cb-chan-as-fn ?cb ev)] - (if-not (:open? @state_) ; Definitely closed - (chsk-send->closed! ?cb-fn) - - ;; TODO Buffer before sending (but honor `:flush?`) - (let [?cb-uuid (when ?cb-fn (enc/uuid-str 6)) - ppstr (pack packer (meta ev) ev ?cb-uuid)] - - (when-let [cb-uuid ?cb-uuid] - (reset-in! cbs-waiting_ [cb-uuid] (have ?cb-fn)) - (when-let [timeout-ms ?timeout-ms] - (go - (clj ws-ev) + (catch :default _ ws-ev))) + (on-error ws-ev))) + + (aset "onmessage" + (fn [ws-ev] + (let [ppstr (enc/oget ws-ev "data")] + (on-msg ppstr)))) + + (aset "onclose" + (fn [ws-ev] + (on-close ws-evt)))) + (JSDefaultWebSocket. ?socket)))))) + + +(defrecord ChWebSocket + ;; WebSocket-only IChSocket implementation + ;; Handles (re)connections, cbs, etc. + + [client-id chs params packer url ws-kalive-ms + state_ ; {:type _ :open? _ :uid _ :csrf-token _ ...} + instance-handle_ retry-count_ ever-opened?_ + backoff-ms-fn ; (fn [nattempt]) -> msecs + cbs-waiting_ ; { ...} + cws-creator + client-wsocket_ + udt-last-comms_] + + IChSocket + (-chsk-disconnect! [chsk reason] + (reset! instance-handle_ nil) ; Disable auto retry + (swap-chsk-state! chsk #(chsk-state->closed % reason)) + (when-let [cws @client-wsocket_] + (interfaces/cws-close! cws))) + + (-chsk-reconnect! [chsk] + (-chsk-disconnect! chsk :requested-reconnect) + (-chsk-connect! chsk)) + + (-chsk-send! [chsk ev opts] + (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts + _ (assert-send-args ev ?timeout-ms ?cb) + ?cb-fn (cb-chan-as-fn ?cb ev)] + (if-not (:open? @state_) ; Definitely closed + (chsk-send->closed! ?cb-fn) + + ;; TODO Buffer before sending (but honor `:flush?`) + (let [?cb-uuid (when ?cb-fn (enc/uuid-str 6)) + ppstr (pack packer (meta ev) ev ?cb-uuid)] + + (when-let [cb-uuid ?cb-uuid] + (reset-in! cbs-waiting_ [cb-uuid] (have ?cb-fn)) + (when-let [timeout-ms ?timeout-ms] + (go + (clj ws-ev) - (catch :default _ ws-ev))) - - (let [last-ws-error {:udt (enc/now-udt), :ev ws-ev}] - (swap-chsk-state! chsk - #(assoc % :last-ws-error last-ws-error))))) - - (aset "onmessage" ; Nb receives both push & cb evs! - (fn [ws-ev] - (let [ppstr (enc/oget ws-ev "data") - - ;; `clj` may/not satisfy `event?` since - ;; we also receive cb replies here. This - ;; is why we prefix pstrs to indicate - ;; whether they're wrapped or not - [clj ?cb-uuid] (unpack packer ppstr)] - - (reset! udt-last-comms_ (enc/now-udt)) - - (or + #?(:cljs (.setTimeout goog/global connect-fn backoff-ms) + :clj (future (Thread/sleep backoff-ms) (connect-fn))) ;; avoid thread? + (swap-chsk-state! chsk + #(assoc % :udt-next-reconnect udt-next-reconnect))))) + + url (enc/merge-url-with-query-string + url + (merge params ; 1st (don't clobber impl.): + {:client-id client-id})) + ;; those are sente's callback functions + ;; that take care of reconnections and + ;; keeping track of active connections. + on-error (fn [ws-ev] + (let [last-ws-error {:udt (enc/now-udt), :ev ws-ev}] + (swap-chsk-state! chsk + #(assoc % :last-ws-error last-ws-error)))) + on-close (fn [ws-ev] + #?(:cljs (let [clean? (enc/oget ws-ev "wasClean") + code (enc/oget ws-ev "code") + reason (enc/oget ws-ev "reason") + last-ws-close + {:udt (enc/now-udt) + :ev ws-ev + :clean? clean? + :code code + :reason reason}] + + ;; Firefox calls "onclose" while unloading, + ;; Ref. http://goo.gl/G5BYbn: + (if clean? + (do + (debugf "Clean WebSocket close, will not attempt reconnect") + (swap-chsk-state! chsk + #(assoc % :last-ws-close last-ws-close))) + (do + (swap-chsk-state! chsk + #(assoc (chsk-state->closed % :unexpected) + :last-ws-close last-ws-close)) + (retry-fn)))) + :clj (let [last-ws-close {:udt (enc/now-udt) + :ev ws-ev}] + (swap-chsk-state! chsk + #(assoc (chsk-state->closed % :unexpected) + :last-ws-close last-ws-close))))) + on-msg (fn [ppstr] ; Nb receives both push & cb evs! + (let [ + ;; `clj` may/not satisfy `event?` since + ;; we also receive cb replies here. This + ;; is why we prefix pstrs to indicate + ;; whether they're wrapped or not + [clj ?cb-uuid] (unpack packer ppstr)] + + (reset! udt-last-comms_ (enc/now-udt)) + + (or (when (handshake? clj) (receive-handshake! :ws chsk clj) (reset! retry-count_ 0) @@ -1060,73 +1096,59 @@ (if-let [cb-uuid ?cb-uuid] (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ - cb-uuid)] + cb-uuid)] (cb-fn clj) (warnf "Cb reply w/o local cb-fn: %s" clj)) (let [buffered-evs clj] - (receive-buffered-evs! chs buffered-evs))))))) - - ;; Fires repeatedly (on each connection attempt) while - ;; server is down: - (aset "onclose" - (fn [ws-ev] - (let [clean? (enc/oget ws-ev "wasClean") - code (enc/oget ws-ev "code") - reason (enc/oget ws-ev "reason") - last-ws-close - {:udt (enc/now-udt) - :ev ws-ev - :clean? clean? - :code code - :reason reason}] - - ;; Firefox calls "onclose" while unloading, - ;; Ref. http://goo.gl/G5BYbn: - (if clean? - (do - (debugf "Clean WebSocket close, will not attempt reconnect") - (swap-chsk-state! chsk - #(assoc % :last-ws-close last-ws-close))) - (do - (swap-chsk-state! chsk - #(assoc (chsk-state->closed % :unexpected) - :last-ws-close last-ws-close)) - (retry-fn))))))))))))] - - (when-let [ms ws-kalive-ms] - (go-loop [] - (let [udt-t0 @udt-last-comms_] - (server (should auto-close conn if it's - ;; gone dead). The server generally sends pings so - ;; this should be rare. Mostly here to help clients - ;; identify conns that were suddenly dropped. - - (-chsk-send! chsk [:chsk/ws-ping] {:flush? true}))) - (recur))))) - - (reset! retry-count_ 0) - (connect-fn) - chsk))))) - -#?(:cljs - (defn- new-ChWebSocket [opts] - (map->ChWebSocket - (merge - {:state_ (atom {:type :ws :open? false :ever-opened? false}) - :instance-handle_ (atom nil) - :retry-count_ (atom 0) - :ever-opened?_ (atom false) - :cbs-waiting_ (atom {}) - :socket_ (atom nil) - :udt-last-comms_ (atom nil)} - opts)))) + (receive-buffered-evs! chs buffered-evs)))))) + ?socket (cws-creator url {:on-msg on-msg + :on-error on-error + :on-close on-close})] + + + (if-not ?socket + (retry-fn) ; Couldn't even get a socket + (do (assert (satisfies? interfaces/IClientWebSocket ?socket)) + #?(:cljs (js/console.log ?socket)) + (reset! client-wsocket_ ?socket))))))] + + (when-let [ms ws-kalive-ms] + (go-loop [] + (let [udt-t0 @udt-last-comms_] + (server (should auto-close conn if it's + ;; gone dead). The server generally sends pings so + ;; this should be rare. Mostly here to help clients + ;; identify conns that were suddenly dropped. + + (-chsk-send! chsk [:chsk/ws-ping] {:flush? true}))) + (recur))))) + + (reset! retry-count_ 0) + (connect-fn) + chsk))) + + +#?(:clj (defn jvm-websocket-creator [] + (throw (Exception. "Missing websocket creator.")))) + +(defn- new-ChWebSocket [opts] + (map->ChWebSocket + (merge + {:state_ (atom {:type :ws :open? false :ever-opened? false}) + :instance-handle_ (atom nil) + :retry-count_ (atom 0) + :ever-opened?_ (atom false) + :cbs-waiting_ (atom {}) + :client-wsocket_ (atom nil) + :udt-last-comms_ (atom nil)} + opts))) (def ^:private default-client-side-ajax-timeout-ms "We must set *some* client-side timeout otherwise an unpredictable (and @@ -1290,95 +1312,91 @@ (poll-fn 0) chsk)))) -#?(:cljs - (defn- new-ChAjaxSocket [opts] - (map->ChAjaxSocket - (merge - {:state_ (atom {:type :ajax :open? false :ever-opened? false}) - :instance-handle_ (atom nil) - :ever-opened?_ (atom false) - :curr-xhr_ (atom nil)} - opts)))) - -#?(:cljs - (defrecord ChAutoSocket - ;; Dynamic WebSocket/Ajax IChSocket implementation - ;; Wraps a swappable ChWebSocket/ChAjaxSocket - - [ws-chsk-opts ajax-chsk-opts state_ - impl_ ; ChWebSocket or ChAjaxSocket - ] - - IChSocket - (-chsk-disconnect! [chsk reason] - (when-let [impl @impl_] - (-chsk-disconnect! impl reason))) - - ;; Possibly reset impl type: - (-chsk-reconnect! [chsk] - (when-let [impl @impl_] - (-chsk-disconnect! impl :requested-reconnect) - (-chsk-connect! chsk))) - - (-chsk-send! [chsk ev opts] - (if-let [impl @impl_] - (-chsk-send! impl ev opts) - (let [{?cb :cb} opts - ?cb-fn (cb-chan-as-fn ?cb ev)] - (chsk-send->closed! ?cb-fn)))) - - (-chsk-connect! [chsk] - ;; Starting with a simple downgrade-only strategy here as a proof of concept - ;; TODO Later consider smarter downgrade or downgrade+upgrade strategies? - (let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_) - ws-chsk-opts (assoc ws-chsk-opts :state_ state_) - - ajax-conn! - (fn [] - ;; Remove :auto->:ajax downgrade watch - (remove-watch state_ :chsk/auto-ajax-downgrade) - (-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts))) - - ws-conn! - (fn [] - ;; Configure :auto->:ajax downgrade watch - (let [downgraded?_ (atom false)] - (add-watch state_ :chsk/auto-ajax-downgrade - (fn [_ _ old-state new-state] - (when-let [impl @impl_] - (when-let [ever-opened?_ (:ever-opened?_ impl)] - (when-not @ever-opened?_ - (when (:last-error new-state) - (when (compare-and-set! downgraded?_ false true) - (warnf "Permanently downgrading :auto chsk -> :ajax") - (-chsk-disconnect! impl :downgrading-ws-to-ajax) - (reset! impl_ (ajax-conn!)))))))))) - - (-chsk-connect! (new-ChWebSocket ws-chsk-opts)))] - - (reset! impl_ (or (ws-conn!) (ajax-conn!))) - chsk)))) - -#?(:cljs - (defn- new-ChAutoSocket [opts] - (map->ChAutoSocket - (merge - {:state_ (atom {:type :auto :open? false :ever-opened? false}) - :impl_ (atom nil)} - opts)))) - -#?(:cljs - (defn- get-chsk-url [protocol host path type] - (let [protocol (case protocol :http "http:" :https "https:" protocol) - protocol (have [:el #{"http:" "https:"}] protocol) - protocol (case type - :ajax protocol - :ws (case protocol "https:" "wss:" "http:" "ws:"))] - (str protocol "//" (enc/path host path))))) - -#?(:cljs - (defn make-channel-socket-client! - "Returns nil on failure, or a map with keys: +(defn- new-ChAjaxSocket [opts] + #?(:clj (throw (Exception. "No JVM ajax socket implemented.")) + :cljs (map->ChAjaxSocket + (merge + {:state_ (atom {:type :ajax :open? false :ever-opened? false}) + :instance-handle_ (atom nil) + :ever-opened?_ (atom false) + :curr-xhr_ (atom nil)} + opts)))) + +(defrecord ChAutoSocket + ;; Dynamic WebSocket/Ajax IChSocket implementation + ;; Wraps a swappable ChWebSocket/ChAjaxSocket + + [ws-chsk-opts ajax-chsk-opts state_ + impl_ ; ChWebSocket or ChAjaxSocket + ] + + IChSocket + (-chsk-disconnect! [chsk reason] + (when-let [impl @impl_] + (-chsk-disconnect! impl reason))) + + ;; Possibly reset impl type: + (-chsk-reconnect! [chsk] + (when-let [impl @impl_] + (-chsk-disconnect! impl :requested-reconnect) + (-chsk-connect! chsk))) + + (-chsk-send! [chsk ev opts] + (if-let [impl @impl_] + (-chsk-send! impl ev opts) + (let [{?cb :cb} opts + ?cb-fn (cb-chan-as-fn ?cb ev)] + (chsk-send->closed! ?cb-fn)))) + + (-chsk-connect! [chsk] + ;; Starting with a simple downgrade-only strategy here as a proof of concept + ;; TODO Later consider smarter downgrade or downgrade+upgrade strategies? + (let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_) + ws-chsk-opts (assoc ws-chsk-opts :state_ state_) + + ajax-conn! + (fn [] + ;; Remove :auto->:ajax downgrade watch + (remove-watch state_ :chsk/auto-ajax-downgrade) + (-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts))) + + ws-conn! + (fn [] + ;; Configure :auto->:ajax downgrade watch + (let [downgraded?_ (atom false)] + (add-watch state_ :chsk/auto-ajax-downgrade + (fn [_ _ old-state new-state] + (when-let [impl @impl_] + (when-let [ever-opened?_ (:ever-opened?_ impl)] + (when-not @ever-opened?_ + (when (:last-error new-state) + (when (compare-and-set! downgraded?_ false true) + (warnf "Permanently downgrading :auto chsk -> :ajax") + (-chsk-disconnect! impl :downgrading-ws-to-ajax) + (reset! impl_ (ajax-conn!)))))))))) + + (-chsk-connect! (new-ChWebSocket ws-chsk-opts)))] + + (reset! impl_ (or (ws-conn!) (ajax-conn!))) + chsk))) + +(defn- new-ChAutoSocket [opts] + (map->ChAutoSocket + (merge + {:state_ (atom {:type :auto :open? false :ever-opened? false}) + :impl_ (atom nil)} + opts))) + +(defn- get-chsk-url [protocol host path type] + (let [protocol (case protocol :http "http:" :https "https:" protocol) + protocol (have [:el #{"http:" "https:"}] protocol) + protocol (case type + :ajax protocol + :ws (case protocol "https:" "wss:" "http:" "ws:"))] + (str protocol "//" (enc/path host path)))) + +(defn make-channel-socket-client! + "Returns nil on failure, or a map with keys: :ch-recv ; core.async channel to receive `event-msg`s (internal or from ; clients). May `put!` (inject) arbitrary `event`s to this channel. :send-fn ; (fn [event & [?timeout-ms ?cb-fn]]) for client>server send. @@ -1395,116 +1413,120 @@ :ajax-opts ; Base opts map provided to `taoensso.encore/ajax-lite`. :wrap-recv-evs? ; Should events from server be wrapped in [:chsk/recv _]? :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity - ; w/in given msecs. Should be different to server's :ws-kalive-ms." - - [path & - [{:keys [type protocol host params recv-buf-or-n packer ws-kalive-ms - client-id ajax-opts wrap-recv-evs? backoff-ms-fn] - :as opts - :or {type :auto - recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs - packer :edn - client-id (or (:client-uuid opts) ; Backwards compatibility - (enc/uuid-str)) - wrap-recv-evs? true - backoff-ms-fn enc/exp-backoff - ws-kalive-ms (enc/ms :secs 20)}} - - _deprecated-more-opts]] - - (have? [:in #{:ajax :ws :auto}] type) - (have? enc/nblank-str? client-id) - - (when (not (nil? _deprecated-more-opts)) (warnf "`make-channel-socket-client!` fn signature CHANGED with Sente v0.10.0.")) - (when (contains? opts :lp-timeout) (warnf ":lp-timeout opt has CHANGED; please use :lp-timout-ms.")) - - (let [packer (coerce-packer packer) - - [ws-url ajax-url] - (let [;; Not available with React Native, etc.: - win-loc (enc/get-win-loc) - path (or path (:pathname win-loc))] - - (if-let [f (:chsk-url-fn opts)] ; Deprecated - [(f path win-loc :ws) - (f path win-loc :ajax)] - - (let [protocol (or protocol (:protocol win-loc) :http) - host (or host (:host win-loc))] - [(get-chsk-url protocol host path :ws) - (get-chsk-url protocol host path :ajax)]))) - - private-chs - {:internal (chan (async/sliding-buffer 128)) - :state (chan (async/sliding-buffer 10)) - := max expected buffered-evs size: - buf (async/sliding-buffer 512)] - (if wrap-recv-evs? - (chan buf (map (fn [ev] [:chsk/recv ev]))) - (chan buf)))} - - common-chsk-opts - {:client-id client-id - :chs private-chs - :params params - :packer packer - :ws-kalive-ms ws-kalive-ms} - - ws-chsk-opts - (merge common-chsk-opts - {:url ws-url - :backoff-ms-fn backoff-ms-fn}) - - ajax-chsk-opts - (merge common-chsk-opts - {:url ajax-url - :ajax-opts ajax-opts - :backoff-ms-fn backoff-ms-fn}) - - auto-chsk-opts - {:ws-chsk-opts ws-chsk-opts - :ajax-chsk-opts ajax-chsk-opts} - - ?chsk - (-chsk-connect! - (case type - :ws (new-ChWebSocket ws-chsk-opts) - :ajax (new-ChAjaxSocket ajax-chsk-opts) - :auto (new-ChAutoSocket auto-chsk-opts)))] - - (if-let [chsk ?chsk] - (let [chsk-state_ (:state_ chsk) - internal-ch (:internal private-chs) - send-fn (partial chsk-send! chsk) - ev-ch - (async/merge - [(:internal private-chs) - (:state private-chs) - (:= max expected buffered-evs size: + buf (async/sliding-buffer 512)] + (if wrap-recv-evs? + (chan buf (map (fn [ev] [:chsk/recv ev]))) + (chan buf)))} + + common-chsk-opts + {:client-id client-id + :chs private-chs + :params params + :packer packer + :ws-kalive-ms ws-kalive-ms} + + ws-chsk-opts + (merge common-chsk-opts + {:url ws-url + :backoff-ms-fn backoff-ms-fn + :cws-creator cws-creator}) + + ajax-chsk-opts + (merge common-chsk-opts + {:url ajax-url + :ajax-opts ajax-opts + :backoff-ms-fn backoff-ms-fn}) + + auto-chsk-opts + {:ws-chsk-opts ws-chsk-opts + :ajax-chsk-opts ajax-chsk-opts} + + ?chsk + (-chsk-connect! + (case type + :ws (new-ChWebSocket ws-chsk-opts) + :ajax (new-ChAjaxSocket ajax-chsk-opts) + :auto (new-ChAutoSocket auto-chsk-opts)))] + + (if-let [chsk ?chsk] + (let [chsk-state_ (:state_ chsk) + internal-ch (:internal private-chs) + send-fn (partial chsk-send! chsk) + ev-ch + (async/merge + [(:internal private-chs) + (:state private-chs) + (: serialized strings." (pack [_ x]) (unpack [_ x])) + + +;;;; Client sockets + +(defprotocol IClientWebSocket ; cws + ;; Wraps a client's socket interface to abstract away + ;; implementation differences. + + (cws-close! [cws] "If the channel is open when called: closes the channel and returns true. + Otherwise noops and returns false.") + (cws-send! [cws msg] + "If the socket is open when called: sends a message over socket and + returns true. Otherwise noops and returns false.")) + + +;; Client websocket creator can be provided in the field :cws-creator. +;; The result should implement IClientWebsocket. +;; Here are the arguments the creator should expect : +;; [url callbacks-map] +;; Where `callbacks-map` has the following entry: +;; on-msg - [msg] the message (ppstr) +;; on-error - [evt] +;; on-close - [evt] +;; The websocket creator should return nil if it can't connect. +