diff --git a/src/taoensso/sente.cljc b/src/taoensso/sente.cljc index de51cf9..10b4e6c 100644 --- a/src/taoensso/sente.cljc +++ b/src/taoensso/sente.cljc @@ -30,8 +30,8 @@ * Client-side events: [:chsk/handshake [ nil[4] ]] - [:chsk/state [ ]] - [:chsk/recv ] ; Server>user push + [:chsk/state [ ]] + [:chsk/recv ] ; Server>user push [:chsk/ws-ping] * Server-side events: @@ -52,8 +52,9 @@ :last-ws-close - ?{:udt _ :ev :clean? _ :code _ :reason _} :last-close - ?{:udt _ :reason _}, with reason e/o - #{nil :requested-disconnect :requested-reconnect - :downgrading-ws-to-ajax :unexpected} + #{nil :clean :unexpected :requested-disconnect + :requested-reconnect :downgrading-ws-to-ajax + :ws-ping-timeout} :udt-next-reconnect - Approximate udt of next scheduled auto-reconnect attempt Notable implementation details: @@ -984,41 +985,57 @@ (defn- swap-chsk-state! "Atomically swaps the value of chk's :state_ atom." [chsk f] - (let [[old-state new-state] + (let [[old-state new-state] #_(swap-vals! (:state_ chsk) f) ; Clj 1.9+ (swap-in! (:state_ chsk) (fn [old-state] - (let [new-state (f old-state) - new-state - (if (:first-open? old-state) - (assoc new-state :first-open? false) - new-state) - - new-state - (if (:open? new-state) - (dissoc new-state :udt-next-reconnect) - new-state)] - - (swapped new-state [old-state new-state]))))] + (let [new-state (f old-state)] + (enc/swapped new-state [old-state new-state]))))] (when (not= old-state new-state) - (let [output [old-state new-state]] - ;; (debugf "Chsk state change: %s" output) - (put! (get-in chsk [:chs :state]) [:chsk/state output]) - output)))) + (let [old-open? (boolean (:open? old-state)) + new-open? (boolean (:open? new-state)) + + open-changed? (not= new-open? old-open? ) + opened? (and new-open? (not old-open?)) + closed? (and (not new-open?) old-open?) + first-open? (and opened? (not (:ever-opened? old-state))) + + new-state ; Add transient state transitions, not in @state_ + (if-not open-changed? + (do new-state) + (enc/assoc-when new-state + :open-changed? true + :opened? opened? + :closed? closed? + :first-open? first-open?))] + + (cond + opened? (timbre/infof "Client chsk now open") + closed? (timbre/warnf "Client chsk now closed, reason: %s" + (get-in new-state [:last-close :reason] "unknown"))) + + (let [output [old-state new-state open-changed?]] + (put! (get-in chsk [:chs :state]) [:chsk/state output]) + open-changed?))))) (defn- chsk-state->closed [state reason] (have? map? state) - (have? [:el #{:requested-disconnect - :requested-reconnect - :downgrading-ws-to-ajax - :unexpected}] reason) - (if (or (:open? state) (not= reason :unexpected)) - (-> state - (dissoc :udt-next-reconnect) - (assoc - :open? false - :last-close {:udt (enc/now-udt) :reason reason})) - state)) + (have? + [:el #{:clean :unexpected + :requested-disconnect + :requested-reconnect + :downgrading-ws-to-ajax + :ws-ping-timeout}] + reason) + + (let [closing? (:open? state) + m state + m (dissoc m :udt-next-reconnect) + m (assoc m :open? false)] + + (if closing? + (assoc m :last-close {:udt (enc/now-udt) :reason reason}) + (do m)))) (defn- cb-chan-as-fn "Experimental, undocumented. Allows a core.async channel to be provided @@ -1062,8 +1079,7 @@ :open? true :ever-opened? true :uid ?uid - :handshake-data ?handshake-data - :first-open? first-handshake?} + :handshake-data ?handshake-data} handshake-ev [:chsk/handshake @@ -1075,9 +1091,13 @@ :first-handshake? first-handshake?}]] (assert-event handshake-ev) - (swap-chsk-state! chsk #(merge % new-state)) - (put! (:internal chs) handshake-ev) + (swap-chsk-state! chsk + (fn [m] + (-> m + (dissoc :udt-next-reconnect) + (merge new-state)))) + (put! (:internal chs) handshake-ev) :handled)) #?(:clj @@ -1185,6 +1205,11 @@ (comment (get-client-csrf-token-str false "token")) +(def client-unloading?_ (atom false)) +#?(:cljs + (.addEventListener goog/global "beforeunload" + (fn [event] (reset! client-unloading?_ true) nil))) + (defrecord ChWebSocket ;; WebSocket-only IChSocket implementation ;; Handles (re)connections, cbs, etc. @@ -1195,17 +1220,18 @@ ws-kalive-ms ws-kalive-ping-timeout-ms ws-opts backoff-ms-fn ; (fn [nattempt]) -> msecs cbs-waiting_ ; { ...} - socket_ + socket_ ; ?[ ] udt-last-comms_ ws-constructor] IChSocket (-chsk-disconnect! [chsk reason] (reset! instance-handle_ nil) ; Disable auto retry - (swap-chsk-state! chsk #(chsk-state->closed % reason)) - (when-let [s @socket_] - #?(:clj (.closeBlocking ^WebSocketClient s) - :cljs (.close s 1000 "CLOSE_NORMAL")))) + (let [closed? (swap-chsk-state! chsk #(chsk-state->closed % reason))] + (when-let [[s _sid] @socket_] + #?(:clj (.close ^WebSocketClient s 1000 "CLOSE_NORMAL") + :cljs (.close s 1000 "CLOSE_NORMAL"))) + closed?)) (-chsk-reconnect! [chsk] (-chsk-disconnect! chsk :requested-reconnect) @@ -1230,14 +1256,19 @@ (when-let [cb-fn* (pull-unused-cb-fn! cbs-waiting_ ?cb-uuid)] (cb-fn* :chsk/timeout))))) - (try - #?(:cljs (.send @socket_ ppstr) - :clj (.send ^WebSocketClient @socket_ ^String ppstr)) + (or + (when-let [[s _sid] @socket_] + (try + #?(:cljs (.send s ppstr) + :clj (.send ^WebSocketClient s ^String ppstr)) - (reset! udt-last-comms_ (enc/now-udt)) - :apparent-success - (catch #?(:clj Throwable :cljs :default) t - (errorf t "Chsk send error") + (reset! udt-last-comms_ (enc/now-udt)) + :apparent-success + (catch #?(:clj Throwable :cljs :default) t + (errorf t "Chsk send error") + nil))) + + (do (when-let [cb-uuid ?cb-uuid] (let [cb-fn* (or (pull-unused-cb-fn! cbs-waiting_ cb-uuid) (have ?cb-fn))] @@ -1250,9 +1281,17 @@ connect-fn (fn connect-fn [] (when (have-handle?) - (let [retry-fn + (let [;; ID for the particular candidate socket to be returned from + ;; this particular connect-fn call + this-socket-id (enc/uuid-str) + own-socket? + (fn [] + (when-let [[_s sid] @socket_] + (= sid this-socket-id))) + + retry-fn (fn [] ; Backoff then recur - (when (have-handle?) + (when (and (have-handle?) (not @client-unloading?_)) (let [retry-count* (swap! retry-count_ inc) backoff-ms (backoff-ms-fn retry-count*) udt-next-reconnect (+ (enc/now-udt) backoff-ms)] @@ -1269,22 +1308,24 @@ on-error #?(:cljs (fn [ws-ev] - (errorf ; ^:meta {:raw-console? true} - "WebSocket error: %s" - (try - (js->clj ws-ev) - (catch :default _ ws-ev))) + (when (own-socket?) + (errorf ; ^:meta {:raw-console? true} + "WebSocket error: %s" + (try + (js->clj ws-ev) + (catch :default _ ws-ev))) - (swap-chsk-state! chsk - #(assoc % :last-ws-error - {:udt (enc/now-udt), :ev ws-ev}))) + (swap-chsk-state! chsk + #(assoc % :last-ws-error + {:udt (enc/now-udt), :ev ws-ev})))) :clj (fn [ex] - (errorf ex "WebSocket error") - (swap-chsk-state! chsk - #(assoc % :last-ws-error - {:udt (enc/now-udt), :ex ex})))) + (when (own-socket?) + (errorf ex "WebSocket error") + (swap-chsk-state! chsk + #(assoc % :last-ws-error + {:udt (enc/now-udt), :ex ex}))))) on-message ; Nb receives both push & cb evs! (fn #?(:cljs [ws-ev] :clj [ppstr]) @@ -1300,7 +1341,7 @@ (reset! udt-last-comms_ (enc/now-udt)) (or - (when (handshake? clj) + (when (and (own-socket?) (handshake? clj)) (receive-handshake! :ws chsk clj) (reset! retry-count_ 0) :handshake) @@ -1317,38 +1358,34 @@ (let [buffered-evs clj] (receive-buffered-evs! chs buffered-evs)))))) - ;; Fires repeatedly (on each connection attempt) while - ;; server is down: on-close - (fn #?(:cljs [ws-ev] :clj [code reason remote]) - (let [last-ws-close - #?(:clj - {:udt (enc/now-udt) - :clean? (= code org.java_websocket.framing.CloseFrame/NORMAL) - :code code - :reason reason} - - :cljs - {:udt (enc/now-udt) - :ev ws-ev - :clean? (enc/oget ws-ev "wasClean") - :code (enc/oget ws-ev "code") - :reason (enc/oget ws-ev "reason")})] - - ;; Firefox calls "onclose" while unloading, - ;; Ref. http://goo.gl/G5BYbn: - (if (:clean? last-ws-close) - (do - (debugf "Clean WebSocket close, will not attempt reconnect") - (swap-chsk-state! chsk - #(assoc % :last-ws-close last-ws-close))) - (do - (swap-chsk-state! chsk - #(assoc (chsk-state->closed % :unexpected) - :last-ws-close last-ws-close)) - (retry-fn))))) - - ?socket + ;; Fires repeatedly (on each connection attempt) while server down + (fn #?(:cljs [ws-ev] :clj [code reason _remote?]) + (when (own-socket?) + (let [;; For codes, Ref. https://www.rfc-editor.org/rfc/rfc6455.html#section-7.1.5 + last-ws-close ; For advanced debugging, etc. + #?(:clj + {:udt (enc/now-udt) + :code code + :reason reason + :clean? (= code 1000)} + + :cljs + {:udt (enc/now-udt) + :code (enc/oget ws-ev "code") + :reason (enc/oget ws-ev "reason") + :clean? (boolean (enc/oget ws-ev "wasClean")) + :ev ws-ev}) + + reason* (if (:clean? last-ws-close) :clean :unexpected)] + + (swap-chsk-state! chsk + #(assoc (chsk-state->closed % reason*) + :last-ws-close last-ws-close)) + + (retry-fn)))) + + ?new-socket (try (ws-constructor (merge ws-opts @@ -1367,15 +1404,20 @@ (errorf t "WebSocket error") nil))] - (if-not ?socket - (retry-fn) ; Couldn't even get a socket + (if-let [new-socket ?new-socket] (do - ;; Clean up the old socket if any exists - (when-let [old-socket @socket_] - #?(:clj (.close ^WebSocketClient old-socket) - :cljs (.close old-socket))) + (when-let [[old-s _old-sid] + (reset-in! socket_ + [new-socket this-socket-id])] + + ;; Close old socket if one exists + (timbre/tracef "Old client WebSocket will be closed") + #?(:clj (.close ^WebSocketClient old-s 1000 "CLOSE_NORMAL") + :cljs (.close old-s 1000 "CLOSE_NORMAL"))) + new-socket) - (reset! socket_ ?socket))))))] + ;; Couldn't create a socket + (retry-fn)))))] (reset! retry-count_ 0) (connect-fn) @@ -1436,8 +1478,9 @@ IChSocket (-chsk-disconnect! [chsk reason] (reset! instance-handle_ nil) ; Disable auto retry - (swap-chsk-state! chsk #(chsk-state->closed % reason)) - (when-let [x @curr-xhr_] (.abort x))) + (let [closed? (swap-chsk-state! chsk #(chsk-state->closed % reason))] + (when-let [x @curr-xhr_] (.abort x)) + closed?)) (-chsk-reconnect! [chsk] (-chsk-disconnect! chsk :requested-reconnect) @@ -1484,8 +1527,7 @@ (if (= ?error :timeout) (when ?cb-fn (?cb-fn :chsk/timeout)) (do - (swap-chsk-state! chsk - #(chsk-state->closed % :unexpected)) + (swap-chsk-state! chsk #(chsk-state->closed % :unexpected)) (when ?cb-fn (?cb-fn :chsk/error)))) (let [content ?content @@ -1508,7 +1550,7 @@ (when (have-handle?) (let [retry-fn (fn [] ; Backoff then recur - (when (have-handle?) + (when (and (have-handle?) (not @client-unloading?_)) (let [retry-count* (inc retry-count) backoff-ms (backoff-ms-fn retry-count*) udt-next-reconnect (+ (enc/now-udt) backoff-ms)] @@ -1558,8 +1600,7 @@ ;; (= ?error :abort) ; Abort => intentional, not an error :else (do - (swap-chsk-state! chsk - #(chsk-state->closed % :unexpected)) + (swap-chsk-state! chsk #(chsk-state->closed % :unexpected)) (retry-fn))) ;; The Ajax long-poller is used only for events, never cbs: