diff --git a/src/taoensso/sente.cljx b/src/taoensso/sente.cljx index fc07298..158c6b9 100644 --- a/src/taoensso/sente.cljx +++ b/src/taoensso/sente.cljx @@ -630,10 +630,11 @@ #+cljs (def ajax-lite "Alias of `taoensso.encore/ajax-lite`" enc/ajax-lite) #+cljs (defprotocol IChSocket - (-chsk-init! [chsk] "Implementation detail") - (-chsk-send! [chsk ev opts] "Implementation detail") - (chsk-destroy! [chsk] "Permanently kills socket") - (chsk-reconnect! [chsk] "Drops socket connection and allows immediate auto-reconnect. Useful for reauthenticating after login/logout")) + (-chsk-connect! [chsk] "Implementation detail") + (-chsk-send! [chsk ev opts] "Implementation detail") + (chsk-destroy! [chsk] "DEPRECATED: Please use `chsk-disconnect!` instead") + (chsk-disconnect! [chsk] "Disconnects channel socket") + (chsk-reconnect! [chsk] "Reconnects channel socket (useful for reauthenticating after login/logout, etc.)")) #+cljs (defn chsk-send! @@ -740,19 +741,14 @@ :handled)))) -#+cljs -(defn- after-timeout [msecs nullary-f] - (let [timer-id (.setTimeout js/window nullary-f msecs)] - (fn stop [] (.clearTimeout js/window timer-id) nil))) - #+cljs ;; Handles reconnects, keep-alives, callbacks: (defrecord ChWebSocket - [client-id url params chs socket_ kalive-ms kalive-timer_ kalive-due?_ nattempt_ - cbs-waiting _ ; { ...} - state_ ; {:type _ :open? _ :uid _ :csrf-token _ :destroyed? _} - packer ; IPacker + [client-id chs params packer url + state_ ; {:type _ :open? _ :uid _ :csrf-token _} + cbs-waiting_ ; { ...} + socket_ kalive-ms kalive-timer_ kalive-due?_ backoff-ms-fn ; (fn [nattempt]) -> msecs - ] + active-retry-id_ retry-count_] IChSocket (-chsk-send! [chsk ev {:as opts ?timeout-ms :timeout-ms ?cb :cb :keys [flush?]}] @@ -785,91 +781,104 @@ (cb-fn* :chsk/error))) false)))))) - (chsk-reconnect! [chsk] - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) - (when-let [s @socket_] (.close s 3000 "SENTE_RECONNECT"))) - - (chsk-destroy! [chsk] - (merge>chsk-state! chsk {:open? false :destroyed? true}) + (chsk-destroy! [chsk] (chsk-disconnect! chsk)) + (chsk-disconnect! [chsk] + (reset! active-retry-id_ "disconnected") + (merge>chsk-state! chsk {:open? false}) (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) - (-chsk-init! [chsk] + (chsk-reconnect! [chsk] + (reset! active-retry-id_ "reconnecting") + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (when-let [s @socket_] (.close s 3000 "SENTE_RECONNECT")) + (-chsk-connect! chsk)) + + (-chsk-connect! [chsk] (when-let [WebSocket (or (enc/oget js/window "WebSocket") (enc/oget js/window "MozWebSocket"))] - (let [connect-fn + (let [retry-id (enc/uuid-str) + connect-fn (fn connect-fn [] - - (when-not (:destroyed? @state_) - (let [retry-fn - (fn [] - (let [nattempt* (swap! nattempt_ inc) - backoff-ms (backoff-ms-fn nattempt*)] + (let [retry-fn + (fn [] + (when (= @active-retry-id_ retry-id) + (let [retry-count* (swap! retry-count_ inc) + backoff-ms (backoff-ms-fn retry-count*)] (.clearInterval js/window @kalive-timer_) - (warnf "Chsk is closed: will try reconnect (%s)." nattempt*) - (after-timeout backoff-ms connect-fn))) - - ?socket - (try - (WebSocket. - (enc/merge-url-with-query-string url - ;; User params first (don't clobber impl. params): - (merge params {:client-id client-id}))) - (catch js/Error e - (errorf e "WebSocket js/Error") - nil))] - - (if-not ?socket - (retry-fn) ; Couldn't even get a socket - - (reset! socket_ - (doto socket - (aset "onerror" (fn [ws-ev] (errorf "WebSocket error: %s" ws-ev))) - (aset "onmessage" ; Nb receives both push & cb evs! - (fn [ws-ev] - (let [;; Nb may or may NOT satisfy `event?` since we also - ;; receive cb replies here! This is actually why - ;; we prefix our pstrs to indicate whether they're - ;; wrapped or not. - ppstr (enc/oget ws-ev "data") - [clj ?cb-uuid] (unpack packer ppstr)] - ;; (assert-event clj) ;; NO! - (or - (and (handle-when-handshake! chsk chs clj) - (reset! nattempt_ 0)) - (if-let [cb-uuid ?cb-uuid] - (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ - cb-uuid)] - (cb-fn clj) - (warnf "Cb reply w/o local cb-fn: %s" clj)) - (let [buffered-evs clj] - (receive-buffered-evs! chs buffered-evs))))))) - - (aset "onopen" - (fn [_ws-ev] - (reset! kalive-timer_ - (.setInterval js/window - (fn [] - (when @kalive-due?_ ; Don't ping unnecessarily - (chsk-send! chsk [:chsk/ws-ping])) - (reset! kalive-due?_ true)) - kalive-ms)) - ;; NO, better for server to send a handshake!: - ;; (merge>chsk-state! chsk {:open? true}) - )) - - ;; Fires repeatedly (on each connection attempt) while - ;; server is down: - (aset "onclose" - (fn [_ws-ev] - (merge>chsk-state! chsk {:open? false}) - (retry!)))))))))] + (warnf "Chsk is closed: will try reconnect (%s)" + retry-count*) + (.setTimeout js/window connect-fn backoff-ms)))) + + ?socket + (try + (WebSocket. + (enc/merge-url-with-query-string url + ;; User params first (don't clobber impl. params): + (merge params {:client-id client-id}))) + (catch js/Error e + (errorf e "WebSocket js/Error") + nil))] + + (if-not ?socket + (retry-fn) ; Couldn't even get a socket + + (reset! socket_ + (doto ?socket + (aset "onerror" + (fn [ws-ev] (errorf "WebSocket error: %s" ws-ev))) + + (aset "onmessage" ; Nb receives both push & cb evs! + (fn [ws-ev] + (let [;; Nb may or may NOT satisfy `event?` since we also + ;; receive cb replies here! This is actually why + ;; we prefix our pstrs to indicate whether they're + ;; wrapped or not. + ppstr (enc/oget ws-ev "data") + [clj ?cb-uuid] (unpack packer ppstr)] + ;; (assert-event clj) ;; NO! + (or + (and (handle-when-handshake! chsk chs clj) + (reset! retry-count_ 0)) + (if-let [cb-uuid ?cb-uuid] + (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ + cb-uuid)] + (cb-fn clj) + (warnf "Cb reply w/o local cb-fn: %s" clj)) + (let [buffered-evs clj] + (receive-buffered-evs! chs buffered-evs))))))) + + (aset "onopen" + (fn [_ws-ev] + (reset! kalive-timer_ + (.setInterval js/window + (fn [] + (when @kalive-due?_ ; Don't ping unnecessarily + (chsk-send! chsk [:chsk/ws-ping])) + (reset! kalive-due?_ true)) + kalive-ms)) + ;; NO, better for server to send a handshake!: + ;; (merge>chsk-state! chsk {:open? true}) + )) + + ;; Fires repeatedly (on each connection attempt) while + ;; server is down: + (aset "onclose" + (fn [_ws-ev] + (merge>chsk-state! chsk {:open? false}) + (retry-fn))))))))] + + (reset! active-retry-id_ retry-id) + (reset! retry-count_ 0) (connect-fn)) chsk))) #+cljs (defrecord ChAjaxSocket - [client-id url params chs timeout-ms ajax-opts curr-xhr_ state_ packer + [client-id chs params packer url state_ + timeout-ms ajax-opts curr-xhr_ + active-retry-id_ backoff-ms-fn] + IChSocket (-chsk-send! [chsk ev {:as opts ?timeout-ms :timeout-ms ?cb :cb :keys [flush?]}] (assert-send-args ev ?timeout-ms ?cb) @@ -914,70 +923,79 @@ :apparent-success)))) - (chsk-reconnect! [chsk] - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (chsk-destroy! [chsk] (chsk-disconnect! chsk)) + (chsk-disconnect! [chsk] + (reset! active-retry-id_ "disconnected") + (merge>chsk-state! chsk {:open? false}) (when-let [x @curr-xhr_] (.abort x))) - (chsk-destroy! [chsk] - (merge>chsk-state! chsk {:open? false :destroyed? true}) - (when-let [x @curr-xhr_] (.abort x))) + (chsk-reconnect! [chsk] + (reset! active-retry-id_ "reconnecting") + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (when-let [x @curr-xhr_] (.abort x)) + (-chsk-connect! chsk)) - (-chsk-init! [chsk] - (let [poll-fn ; async-poll-for-update-fn - (fn poll-fn [nattempt] + (-chsk-connect! [chsk] + (let [retry-id (enc/uuid-str) + poll-fn ; async-poll-for-update-fn + (fn poll-fn [retry-count] (tracef "async-poll-for-update!") - - (when-not (:destroyed? @state_) - (let [retry-fn - (fn [] - (let [nattempt* (inc nattempt) - backoff-ms (backoff-ms-fn nattempt*)] - (warnf "Chsk is closed: will try reconnect (%s)." nattempt*) - (after-timeout backoff-ms (fn [] (poll-fn nattempt*)) - connect-fn)))] - - (reset! curr-xhr_ - (ajax-lite url - (merge ajax-opts - {:method :get :timeout-ms timeout-ms - :resp-type :text ; Prefer to do our own pstr reading - :params - (merge - - ;; Note that user params here are actually POST params for - ;; convenience. Contrast: WebSocket params sent as query - ;; params since there's no other choice there. - params ; User params first (don't clobber impl. params) - - {:_ (enc/now-udt) ; Force uncached resp - :client-id client-id} - - ;; A truthy :handshake? param will prompt server to - ;; reply immediately with a handshake response, - ;; letting us confirm that our client<->server comms - ;; are working: - (when-not (:open? @state_) {:handshake? true}))}) - - (fn ajax-cb [{:keys [?error ?content]}] - (if ?error - (cond - (= ?error :timeout) (poll-fn 0) - ;; (= ?error :abort) ; Abort => intentional, not an error - :else - (do (merge>chsk-state! chsk {:open? false}) - (retry-fn))) - - ;; The Ajax long-poller is used only for events, never cbs: - (let [content ?content - ppstr content - [clj _] (unpack packer ppstr)] - (or - (handle-when-handshake! chsk chs clj) - ;; Actually poll for an application reply: - (let [buffered-evs clj] - (receive-buffered-evs! chs buffered-evs) - (merge>chsk-state! chsk {:open? true}))) - (poll-fn 0)))))))))] + (let [retry-fn + (fn [] + (when (= @active-retry-id_ retry-id) + (let [retry-count* (inc retry-count) + backoff-ms (backoff-ms-fn retry-count*)] + (warnf "Chsk is closed: will try reconnect (%s)" + retry-count*) + (.setTimeout js/window + (fn [] (poll-fn retry-count*)) + backoff-ms))))] + + (reset! curr-xhr_ + (ajax-lite url + (merge ajax-opts + {:method :get :timeout-ms timeout-ms + :resp-type :text ; Prefer to do our own pstr reading + :params + (merge + + ;; Note that user params here are actually POST params for + ;; convenience. Contrast: WebSocket params sent as query + ;; params since there's no other choice there. + params ; User params first (don't clobber impl. params) + + {:_ (enc/now-udt) ; Force uncached resp + :client-id client-id} + + ;; A truthy :handshake? param will prompt server to + ;; reply immediately with a handshake response, + ;; letting us confirm that our client<->server comms + ;; are working: + (when-not (:open? @state_) {:handshake? true}))}) + + (fn ajax-cb [{:keys [?error ?content]}] + (if ?error + (cond + (= ?error :timeout) (poll-fn 0) + ;; (= ?error :abort) ; Abort => intentional, not an error + :else + (do (merge>chsk-state! chsk {:open? false}) + (retry-fn))) + + ;; The Ajax long-poller is used only for events, never cbs: + (let [content ?content + ppstr content + [clj _] (unpack packer ppstr)] + (or + (handle-when-handshake! chsk chs clj) + ;; Actually poll for an application reply: + (let [buffered-evs clj] + (receive-buffered-evs! chs buffered-evs) + (merge>chsk-state! chsk {:open? true}))) + + (poll-fn 0))))))))] + + (reset! active-retry-id_ retry-id) (poll-fn 0) chsk))) @@ -1081,41 +1099,43 @@ chsk (or (and (not= type :ajax) - (-chsk-init! + (-chsk-connect! (map->ChWebSocket - {:client-id client-id - :url (if-let [f (:chsk-url-fn opts)] - (f path win-location :ws) ; Deprecated - (get-chsk-url win-protocol host path :ws)) - :params params - :chs private-chs - :packer packer - :socket_ (atom nil) - :kalive-ms ws-kalive-ms - :kalive-timer_ (atom nil) - :kalive-due?_ (atom true) - :nattempt_ (atom 0) - :cbs-waiting_ (atom {}) - :state_ (atom {:type :ws :open? false - :destroyed? false}) - :backoff-ms-fn backoff-ms-fn}))) + {:client-id client-id + :chs private-chs + :params params + :packer packer + :url (if-let [f (:chsk-url-fn opts)] + (f path win-location :ws) ; Deprecated + (get-chsk-url win-protocol host path :ws)) + + :state_ (atom {:type :ws :open? false}) + :cbs-waiting_ (atom {}) + :socket_ (atom nil) + :kalive-ms ws-kalive-ms + :kalive-timer_ (atom nil) + :kalive-due?_ (atom true) + :backoff-ms-fn backoff-ms-fn + :active-retry-id_ (atom "pending") + :reconn-count (atom 0)}))) (and (not= type :ws) - (-chsk-init! + (-chsk-connect! (map->ChAjaxSocket - {:client-id client-id - :url (if-let [f (:chsk-url-fn opts)] - (f path win-location :ajax) ; Deprecated - (get-chsk-url win-protocol host path :ajax)) - :params params - :chs private-chs - :packer packer - :timeout-ms lp-timeout-ms - :curr-xhr_ (atom nil) - :state_ (atom {:type :ajax :open? false - :destroyed? false}) - :ajax-opts ajax-opts - :backoff-ms-fn backoff-ms-fn})))) + {:client-id client-id + :chs private-chs + :params params + :packer packer + :url (if-let [f (:chsk-url-fn opts)] + (f path win-location :ajax) ; Deprecated + (get-chsk-url win-protocol host path :ajax)) + + :state_ (atom {:type :ajax :open? false}) + :timeout-ms lp-timeout-ms + :ajax-opts ajax-opts + :curr-xhr_ (atom nil) + :backoff-ms-fn backoff-ms-fn + :active-retry-id_ (atom "pending")})))) _ (assert chsk "Failed to create channel socket") send-fn (partial chsk-send! chsk)