diff --git a/src/taoensso/sente.cljc b/src/taoensso/sente.cljc index 42bfbbf..6d1b5d4 100644 --- a/src/taoensso/sente.cljc +++ b/src/taoensso/sente.cljc @@ -305,8 +305,7 @@ (def ^:private next-idx! (enc/counter)) (declare - ^:private send-buffered-server-evs>ws-clients! - ^:private send-buffered-server-evs>ajax-clients! + ^:private send-buffered-server-evs>clients! ^:private default-client-side-ajax-timeout-ms) (defn allow-origin? @@ -572,11 +571,8 @@ (let [buffered-evs-ppstr (pack packer buffered-evs)] (tracef "buffered-evs-ppstr: %s" buffered-evs-ppstr) - (case conn-type - :ws (send-buffered-server-evs>ws-clients! conns_ - uid buffered-evs-ppstr upd-conn!) - :ajax (send-buffered-server-evs>ajax-clients! conns_ - uid buffered-evs-ppstr))))))] + (send-buffered-server-evs>clients! conn-type + conns_ uid buffered-evs-ppstr)))))] (if (= ev [:chsk/close]) ; Currently undocumented (do @@ -884,29 +880,23 @@ (errorf "ring-req->server-ch-resp error: %s (%s)" error uid sch-uuid))}))))})) -(defn- send-buffered-server-evs>ws-clients! - "Actually pushes buffered events (as packed-str) to all uid's WebSocket conns." - [conns_ uid buffered-evs-pstr upd-conn!] - (tracef "send-buffered-server-evs>ws-clients!: %s" buffered-evs-pstr) - (doseq [[client-id [?sch _udt]] (get-in @conns_ [:ws uid])] - (when-let [sch ?sch] - (upd-conn! :ws uid client-id) - (interfaces/sch-send! sch :websocket buffered-evs-pstr)))) - -(defn- send-buffered-server-evs>ajax-clients! - "Actually pushes buffered events (as packed-str) to all uid's Ajax conns. - Allows some time for possible Ajax poller reconnects." - [conns_ uid buffered-evs-pstr] - (tracef "send-buffered-server-evs>ajax-clients!: %s" buffered-evs-pstr) +(defn- send-buffered-server-evs>clients! + "Actually pushes buffered events (as packed-str) to all uid's conns. + Allows some time for possible reconnects." + [conn-type conns_ uid buffered-evs-pstr] + (tracef "send-buffered-server-evs>clients!: %s %s" conn-type buffered-evs-pstr) + (have? [:el #{:ajax :ws}] conn-type) + (let [ms-backoffs [90 180 360 720 1440] ; Mean 2790s ;; All connected/possibly-reconnecting client uuids: - client-ids-unsatisfied (keys (get-in @conns_ [:ajax uid]))] + client-ids-unsatisfied (keys (get-in @conns_ [conn-type uid])) + websocket? (= conn-type :ws)] (when-not (empty? client-ids-unsatisfied) ;; (tracef "client-ids-unsatisfied: %s" client-ids-unsatisfied) (go-loop [n 0 client-ids-satisfied #{}] (let [?pulled ; nil or { [ ]} - (swap-in! conns_ [:ajax uid] + (swap-in! conns_ [conn-type uid] (fn [m] ; { [ ]} (let [ks-to-pull (remove client-ids-satisfied (keys m))] ;; (tracef "ks-to-pull: %s" ks-to-pull) @@ -915,12 +905,12 @@ (swapped (reduce (fn [m k] - (let [[?sch udt] (get m k)] - ;; Nb don't change udt; for Ajax conns_ we only - ;; want udt updated on poll or close, not on - ;; activity (as with ws conns_) - (assoc m k [nil udt #_(enc/now-udt)]))) - + (let [[?sch udt] (get m k) + new-entry + (if websocket? + [?sch (enc/now-udt)] + [nil udt])] + (assoc m k new-entry))) m ks-to-pull) (select-keys m ks-to-pull))))))] @@ -933,7 +923,7 @@ (let [sent? (when-let [sch ?sch] ;; Will noop + return false if sch already closed: - (interfaces/sch-send! ?sch (not :websocket) + (interfaces/sch-send! sch websocket? buffered-evs-pstr))] (if sent? (conj s client-id) s)))