From e34d7f8a491f313c7857f6fd3f981cb6d77534ec Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Tue, 12 Apr 2016 16:17:21 +0700 Subject: [PATCH] v1.9.0 SNAPSHOT v1.9.0 is a significant, non-breaking upgrade focused on addressing a couple minor issues that were recently identified (#201 and #230). While in the code, also took the opportunity to refactor some implementation details. - @ptaoussanis This is a squashed commit that includes: - Extend ref example to incl. a pair of buttons to excercise async push features. - Drop (experimental) flexi packer. - [#161] Clojure-side Transit optimizations Specifically: 1. Now cache read-handlers and write-handlers. Can't tell if this actually realistically helps perf since I've got no handler maps to test against; feedback welcome. 2. Now cache (thread-local) writer (allows baos reuse). Doesn't seem to actually realistically help perf from what I can tell. Might nix this later since this does add some complexity to the impl. - [#201] Add support for more flexible conn-type upgrade/downgrade Initial downgrade strategy here is simple, may or may not turn out to be useful; still need to check. Point was to get a more flexible base that we can build on in future. - [#230] Server-side ping to help GC non-terminating WebSocket conns. If a WebSocket connection is closed without normal termination (e.g. as caused by sudden loss of power, airplane mode, etc.) - it could hang around in conns_ for an extended period of time until the underlying TCP connection was identified as dead. This mods Sente's keep-alive ping from client->server to server->client, allowing the server to identify (and auto-gc) dead but abnormally terminated WebSocket connections. Big thanks to @altV for helping to catch + diagnose this issue. This change should also help to suppress possible "idle connection" Heroku warnings, Ref. https://goo.gl/zLR0Gk - [#150 #159] Allow server to gc lp conns Using the infrastructure in place for #230, have decided to now initiate Ajax timeouts from the server side instead of the client side. As with #230, this'll help the underlying http server gc any abnormal connections. In particular, this should (?) resolve #159 (for Immutant) w/o the need for the earlier Immutant-side workaround introduced for that issue. --- example-project/project.clj | 20 +- example-project/src/example/client.cljs | 20 +- example-project/src/example/server.clj | 91 +- project.clj | 25 +- src/taoensso/sente.cljx | 1200 ++++++++++------- src/taoensso/sente/interfaces.cljx | 24 +- src/taoensso/sente/packers/transit.cljx | 248 ++-- .../sente/server_adapters/immutant.clj | 24 +- 8 files changed, 887 insertions(+), 765 deletions(-) diff --git a/example-project/project.clj b/example-project/project.clj index 16ebd6a..1017272 100644 --- a/example-project/project.clj +++ b/example-project/project.clj @@ -1,4 +1,4 @@ -(defproject com.taoensso.examples/sente "1.8.2-alpha1" +(defproject com.taoensso.examples/sente "1.9.0-SNAPSHOT" :description "Sente, reference web-app example project" :url "https://github.com/ptaoussanis/sente" :license {:name "Eclipse Public License" @@ -10,23 +10,21 @@ *assert* true} :dependencies - [;; [org.clojure/clojure "1.7.0"] - [org.clojure/clojure "1.8.0"] - - [org.clojure/clojurescript "1.7.170"] + [[org.clojure/clojure "1.8.0"] + [org.clojure/clojurescript "1.9.36"] [org.clojure/core.async "0.2.374"] [org.clojure/tools.nrepl "0.2.12"] ; Optional, for Cider - [com.taoensso/sente "1.8.2-alpha1"] ; <--- Sente + [com.taoensso/sente "1.9.0-SNAPSHOT"] ; <--- Sente [com.taoensso/timbre "4.3.1"] ;;; ---> Choose (uncomment) a supported web server <--- - [http-kit "2.2.0-alpha1"] - ;; [org.immutant/web "2.1.0"] ; v2.1+ recommended + [http-kit "2.2.0-alpha2"] + ;; [org.immutant/web "2.1.4"] ;; [nginx-clojure/nginx-clojure-embed "0.4.2"] ; Needs v0.4.2+ - [ring "1.4.0"] - [ring/ring-defaults "0.2.0"] ; Includes `ring-anti-forgery`, etc. + [ring "1.5.0"] + [ring/ring-defaults "0.2.1"] ; Includes `ring-anti-forgery`, etc. ;; [ring-anti-forgery "1.0.0"] [compojure "1.5.0"] ; Or routing lib of your choice @@ -42,7 +40,7 @@ [lein-ancient "0.6.10"] [com.cemerick/austin "0.1.6"] [lein-cljsbuild "1.1.3"] - [cider/cider-nrepl "0.11.0"] ; Optional, for use with Emacs + [cider/cider-nrepl "0.12.0"] ; Optional, for use with Emacs ] :cljsbuild diff --git a/example-project/src/example/client.cljs b/example-project/src/example/client.cljs index 1df9c61..6d66a92 100644 --- a/example-project/src/example/client.cljs +++ b/example-project/src/example/client.cljs @@ -36,7 +36,7 @@ ;; Serializtion format, must use same val for client + server: packer :edn ; Default packer, a good choice in most cases - ;; (sente-transit/get-flexi-packer :edn) ; Experimental, needs Transit dep + ;; (sente-transit/get-transit-packer) ; Needs Transit dep {:keys [chsk ch-recv send-fn state]} (sente/make-channel-socket-client! @@ -109,6 +109,24 @@ (chsk-send! [:example/button2 {:had-a-callback? "indeed"}] 5000 (fn [cb-reply] (->output! "Callback reply: %s" cb-reply)))))) +(when-let [target-el (.getElementById js/document "btn3")] + (.addEventListener target-el "click" + (fn [ev] + (->output! "Button 3 was clicked (will ask server to test rapid async push)") + (chsk-send! [:example/test-rapid-push])))) + +(when-let [target-el (.getElementById js/document "btn4")] + (.addEventListener target-el "click" + (fn [ev] + (->output! "Button 4 was clicked (will toggle async broadcast loop)") + (chsk-send! [:example/toggle-broadcast] 5000 + (fn [cb-reply] + (when (cb-success? cb-reply) + (let [loop-enabled? cb-reply] + (if loop-enabled? + (->output! "Async broadcast loop now enabled") + (->output! "Async broadcast loop now disabled"))))))))) + (when-let [target-el (.getElementById js/document "btn-login")] (.addEventListener target-el "click" (fn [ev] diff --git a/example-project/src/example/server.clj b/example-project/src/example/server.clj index f592124..858fbec 100644 --- a/example-project/src/example/server.clj +++ b/example-project/src/example/server.clj @@ -55,7 +55,7 @@ (let [;; Serializtion format, must use same val for client + server: packer :edn ; Default packer, a good choice in most cases - ;; (sente-transit/get-flexi-packer :edn) ; Experimental, needs Transit dep + ;; (sente-transit/get-transit-packer) ; Needs Transit dep {:keys [ch-recv send-fn ajax-post-fn ajax-get-or-ws-handshake-fn connected-uids]} @@ -69,6 +69,12 @@ (def connected-uids connected-uids) ; Watchable, read-only atom ) +;; We can watch this atom for changes if we like +(add-watch connected-uids :connected-uids + (fn [_ _ old new] + (when (not= old new) + (infof "Connected uids change: %s" new)))) + ;;;; Ring handlers (defn landing-pg-handler [ring-req] @@ -77,8 +83,12 @@ [:p "An Ajax/WebSocket" [:strong " (random choice!)"] " has been configured for this example"] [:hr] [:p [:strong "Step 1: "] " try hitting the buttons:"] - [:button#btn1 {:type "button"} "chsk-send! (w/o reply)"] - [:button#btn2 {:type "button"} "chsk-send! (with reply)"] + [:p + [:button#btn1 {:type "button"} "chsk-send! (w/o reply)"] + [:button#btn2 {:type "button"} "chsk-send! (with reply)"]] + [:p + [:button#btn3 {:type "button"} "Test rapid server>user async pushes"] + [:button#btn4 {:type "button"} "Toggle server>user async broadcast push loop"]] ;; [:p [:strong "Step 2: "] " observe std-out (for server output) and below (for client output):"] [:textarea#output {:style "width: 100%; height: 200px;"}] @@ -122,6 +132,41 @@ (ring.middleware.defaults/wrap-defaults ring-routes ring.middleware.defaults/site-defaults)) +;;;; Some server>user async push examples + +(defn test-fast-server>user-pushes + "Quickly pushes 100 events to all connected users. Note that this'll be + fast+reliable even over Ajax!" + [] + (doseq [uid (:any @connected-uids)] + (doseq [i (range 100)] + (chsk-send! uid [:fast-push/is-fast (str "hello " i "!!")])))) + +(comment (test-fast-server>user-pushes)) + +(def broadcast-enabled?_ (atom true)) + +(defn start-example-broadcaster! + "As an example of server>user async pushes, setup a loop to broadcast an + event to all connected users every 10 seconds" + [] + (let [broadcast! + (fn [i] + (let [uids (:any @connected-uids)] + (debugf "Broadcasting server>user: %s uids" (count uids)) + (doseq [uid uids] + (chsk-send! uid + [:some/broadcast + {:what-is-this "An async broadcast pushed from server" + :how-often "Every 10 seconds" + :to-whom uid + :i i}]))))] + + (go-loop [i 0] + (user-pushes)) + +(defmethod -event-msg-handler :example/toggle-broadcast + [{:as ev-msg :keys [?reply-fn]}] + (let [loop-enabled? (swap! broadcast-enabled?_ not)] + (?reply-fn loop-enabled?))) + ;; TODO Add your (defmethod -event-msg-handler [ev-msg] )s here... ;;;; Sente event router (our `event-msg-handler` loop) @@ -157,38 +210,6 @@ (sente/start-server-chsk-router! ch-chsk event-msg-handler))) -;;;; Some server>user async push examples - -(defn start-example-broadcaster! - "As an example of server>user async pushes, setup a loop to broadcast an - event to all connected users every 10 seconds" - [] - (let [broadcast! - (fn [i] - (debugf "Broadcasting server>user: %s" @connected-uids) - (doseq [uid (:any @connected-uids)] - (chsk-send! uid - [:some/broadcast - {:what-is-this "An async broadcast pushed from server" - :how-often "Every 10 seconds" - :to-whom uid - :i i}])))] - - (go-loop [i 0] - (user-pushes - "Quickly pushes 100 events to all connected users. Note that this'll be - fast+reliable even over Ajax!" - [] - (doseq [uid (:any @connected-uids)] - (doseq [i (range 100)] - (chsk-send! uid [:fast-push/is-fast (str "hello " i "!!")])))) - -(comment (test-fast-server>user-pushes)) - ;;;; Init stuff (defonce web-server_ (atom nil)) ; {:server _ :port _ :stop-fn (fn [])} diff --git a/project.clj b/project.clj index 98100e1..392f216 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject com.taoensso/sente "1.8.2-alpha1" +(defproject com.taoensso/sente "1.9.0-SNAPSHOT" :author "Peter Taoussanis " :description "Realtime web comms for Clojure/Script" :url "https://github.com/ptaoussanis/sente" @@ -13,7 +13,7 @@ :dependencies [[org.clojure/clojure "1.5.1"] [org.clojure/core.async "0.2.374"] - [com.taoensso/encore "2.49.0"] + [com.taoensso/encore "2.53.1"] [org.clojure/tools.reader "0.10.0"] [com.taoensso/timbre "4.3.1"]] @@ -24,18 +24,19 @@ :1.6 {:dependencies [[org.clojure/clojure "1.6.0"]]} :1.7 {:dependencies [[org.clojure/clojure "1.7.0"]]} :1.8 {:dependencies [[org.clojure/clojure "1.8.0"]]} + :1.9 {:dependencies [[org.clojure/clojure "1.9.0-alpha5"]]} :test {:dependencies [[com.cognitect/transit-clj "0.8.285"] [com.cognitect/transit-cljs "0.8.237"] [org.clojure/test.check "0.9.0"]] :plugins []} - :provided {:dependencies [[org.clojure/clojurescript "1.7.170"]]} + :provided {:dependencies [[org.clojure/clojurescript "1.9.36"]]} :dev - [:1.7 :test + [:1.9 :test :server-jvm {:dependencies - [[http-kit "2.2.0-alpha1"] - [org.immutant/web "2.1.3"] + [[http-kit "2.2.0-alpha2"] + [org.immutant/web "2.1.4"] [nginx-clojure "0.4.4"]] :plugins [;;; These must be in :dev, Ref. https://github.com/lynaghk/cljx/issues/47: @@ -46,7 +47,7 @@ [lein-ancient "0.6.10"] ;; [com.cemerick/austin "0.1.4"] [com.cemerick/clojurescript.test "0.3.3"] - [lein-codox "0.9.4"]]}]} + [lein-codox "0.9.5"]]}]} :cljx {:builds @@ -69,19 +70,17 @@ {:language :clojure ; [:clojure :clojurescript] ; No support? :source-paths ["target/classes"] :source-uri - {#"target/classes" - "https://github.com/ptaoussanis/sente/blob/master/src/{classpath}x#L{line}" - #".*" - "https://github.com/ptaoussanis/sente/blob/master/{filepath}#L{line}"}} + {#"target/classes" "https://github.com/ptaoussanis/sente/blob/master/src/{classpath}x#L{line}" + #".*" "https://github.com/ptaoussanis/sente/blob/master/{filepath}#L{line}"}} :aliases {"test-all" ["do" "clean," "cljx" "once," - "with-profile" "+1.6:+1.7:+1.8" "test," + "with-profile" "+1.9:+1.8:+1.7:+1.6" "test," ;; "with-profile" "+test" "cljsbuild" "test" ] "build-once" ["do" "cljx" "once," "cljsbuild" "once"] "deploy-lib" ["do" "build-once," "deploy" "clojars," "install"] - "start-dev" ["with-profile" "+server-jvm" "repl" ":headless"]} + "start-dev" ["with-profile" "+dev" "repl" ":headless"]} :repositories {"sonatype-oss-public" "https://oss.sonatype.org/content/groups/public/"}) diff --git a/src/taoensso/sente.cljx b/src/taoensso/sente.cljx index 491e0ec..661f93b 100644 --- a/src/taoensso/sente.cljx +++ b/src/taoensso/sente.cljx @@ -1,5 +1,5 @@ (ns taoensso.sente - "Channel sockets. Otherwise known as The Shiz. + "Channel sockets for Clojure/Script. Protocol | client>server | client>server ?+ ack/reply | server>user push * WebSockets: ✓ [1] ✓ @@ -11,8 +11,8 @@ Abbreviations: * chsk - Channel socket (Sente's own pseudo \"socket\") - * server-ch - Underlying web server's async channel that implement Sente's - server channel interface + * server-ch - Underlying web server's async channel that implement + Sente's server channel interface * sch - server-ch alias * uid - User-id. An application-level user identifier used for async push. May have semantic meaning (e.g. username, email address), @@ -20,35 +20,36 @@ * cb - Callback * tout - Timeout * ws - WebSocket/s - * pstr - Packed string. Arbitrary Clojure data serialized as a string - (e.g. edn) for client<->server comms. + * pstr - Packed string. Arbitrary Clojure data serialized as a + string (e.g. edn) for client<->server comms + * udt - Unix timestamp (datetime long) Special messages: - * Callback wrapping: [ ] for [1],[2] + * Callback wrapping: [ ] for [1], [2] * Callback replies: :chsk/closed, :chsk/timeout, :chsk/error + * Client-side events: - [:chsk/handshake [ ]] - [:chsk/state ] - [:chsk/recv <[buffered-evs]>] ; server>user push - [:chsk/ws-error ] ; Experimental, subject to change + [:chsk/handshake [ ]] + [:chsk/state ] + [:chsk/recv ] ; Server>user push + [:chsk/ws-error ] ; Alpha, subject to change * Server-side events: - [:chsk/ws-ping] [:chsk/bad-package ] - [:chsk/bad-event ] - [:chsk/uidport-open] - [:chsk/uidport-close] + [:chsk/bad-event ] + [:chsk/uidport-open ] + [:chsk/uidport-close ] Notable implementation details: * core.async is used liberally where brute-force core.async allows for - significant implementation simplifications. We lean on core.async's strong + significant implementation simplifications. We lean on core.async's efficiency here. * For WebSocket fallback we use long-polling rather than HTTP 1.1 streaming (chunked transfer encoding). Http-kit _does_ support chunked transfer encoding but a small minority of browsers &/or proxies do not. Instead of implementing all 3 modes (WebSockets, streaming, long-polling) - it seemed - reasonable to focus on the two extremes (performance + compatibility). In - any case client support for WebSockets is growing rapidly so fallback + reasonable to focus on the two extremes (performance + compatibility). + In any case client support for WebSockets is growing rapidly so fallback modes will become increasingly irrelevant while the extra simplicity will continue to pay dividends. @@ -82,12 +83,10 @@ [cljs.core.async.macros :as asyncm :refer (go go-loop)])) (if (vector? taoensso.encore/encore-version) - (enc/assert-min-encore-version [2 11 0]) - (enc/assert-min-encore-version 2.11)) - -(def sente-version "Handy for debugging build issues" [1 8 2 "alpha1"]) + (enc/assert-min-encore-version [2 53 1]) + (enc/assert-min-encore-version 2.53)) -;; (timbre/set-level! :trace) ; For debugging +;; (timbre/set-level! :trace) ; Uncomment for debugging ;;;; Events ;; Clients & server both send `event`s and receive (i.e. route) `event-msg`s: @@ -99,7 +98,7 @@ (not (vector? x)) :wrong-type (not (#{1 2} (count x))) :wrong-length :else (let [[ev-id _] x] - (cond (not (keyword? ev-id)) :wrong-id-type + (cond (not (keyword? ev-id)) :wrong-id-type (not (namespace ev-id)) :unnamespaced-id :else nil)))) @@ -109,16 +108,16 @@ (defn assert-event [x] (when-let [?err (validate-event x)] - (let [err-fmt + (let [err-msg (str - (case ?err - :wrong-type "Malformed event (wrong type)." - :wrong-length "Malformed event (wrong length)." - (:wrong-id-type :unnamespaced-id) - "Malformed event (`ev-id` should be a namespaced keyword)." - :else "Malformed event (unknown error).") - " Event should be of `[ev-id ?ev-data]` form: %s")] - (throw (ex-info (format err-fmt (str x)) {:malformed-event x}))))) + (case ?err + :wrong-type "Malformed event (wrong type)." + :wrong-length "Malformed event (wrong length)." + (:wrong-id-type :unnamespaced-id) + "Malformed event (`ev-id` should be a namespaced keyword)." + :else "Malformed event (unknown error).") + " Event should be of `[ev-id ?ev-data]` form: " x)] + (throw (ex-info err-msg {:malformed-event x}))))) (defn client-event-msg? [x] (and @@ -143,7 +142,6 @@ (enc/chan? ch-recv) (ifn? send-fn) (enc/atom? connected-uids) - ;; (map? ring-req) (enc/nblank-str? client-id) (event? event) @@ -171,58 +169,75 @@ ;; * Payloads are packed for client<->server transit. ;; * Packing includes ->str encoding, and may incl. wrapping to carry cb info. -(defn- unpack* "pstr->clj" [packer pstr] - (try - (interfaces/unpack packer (have string? pstr)) - (catch #+clj Throwable #+cljs :default t - (debugf "Bad package: %s (%s)" pstr t) - [:chsk/bad-package pstr]))) - -(defn- with-?meta [x ?m] (if (seq ?m) (with-meta x ?m) x)) -(defn- pack* "clj->prefixed-pstr" - ([packer ?packer-meta clj] - (str "-" ; => Unwrapped (no cb metadata) - (interfaces/pack packer (with-?meta clj ?packer-meta)))) - - ([packer ?packer-meta clj ?cb-uuid] - (let [;;; Keep wrapping as light as possible: - ?cb-uuid (if (= ?cb-uuid :ajax-cb) 0 ?cb-uuid) - wrapped-clj (if ?cb-uuid [clj ?cb-uuid] [clj])] - (str "+" ; => Wrapped (cb metadata) - (interfaces/pack packer (with-?meta wrapped-clj ?packer-meta)))))) - -(defn- pack [& args] - (let [pstr (apply pack* args)] - (tracef "Packing: %s -> %s" args pstr) - pstr)) - (defn- unpack "prefixed-pstr->[clj ?cb-uuid]" [packer prefixed-pstr] (have? string? prefixed-pstr) - (let [prefix (enc/substr prefixed-pstr 0 1) - pstr (enc/substr prefixed-pstr 1) - clj (unpack* packer pstr) ; May be un/wrapped - wrapped? (case prefix "-" false "+" true) + (let [wrapped? (enc/str-starts-with? prefixed-pstr "+") + pstr (subs prefixed-pstr 1) + clj + (try + (interfaces/unpack packer pstr) + (catch #+clj Throwable #+cljs :default t + (debugf "Bad package: %s (%s)" pstr t) + [:chsk/bad-package pstr])) + [clj ?cb-uuid] (if wrapped? clj [clj nil]) ?cb-uuid (if (= 0 ?cb-uuid) :ajax-cb ?cb-uuid)] + (tracef "Unpacking: %s -> %s" prefixed-pstr [clj ?cb-uuid]) [clj ?cb-uuid])) +(defn- with-?meta [x ?m] (if (seq ?m) (with-meta x ?m) x)) + +(defn- pack "clj->prefixed-pstr" + ([packer ?packer-meta clj] + (let [pstr + (str "-" ; => Unwrapped (no cb metadata) + (interfaces/pack packer (with-?meta clj ?packer-meta)))] + (tracef "Packing (unwrapped): %s -> %s" [?packer-meta clj] pstr) + pstr)) + + ([packer ?packer-meta clj ?cb-uuid] + (let [;;; Keep wrapping as light as possible: + ?cb-uuid (if (= ?cb-uuid :ajax-cb) 0 ?cb-uuid) + wrapped-clj (if ?cb-uuid [clj ?cb-uuid] [clj]) + pstr + (str "+" ; => Wrapped (cb metadata) + (interfaces/pack packer (with-?meta wrapped-clj ?packer-meta)))] + (tracef "Packing (wrapped): %s -> %s" [?packer-meta clj ?cb-uuid] pstr) + pstr))) + +(deftype EdnPacker [] + interfaces/IPacker + (pack [_ x] (enc/pr-edn x)) + (unpack [_ s] (enc/read-edn s))) + +(def ^:private default-edn-packer (EdnPacker.)) + +(defn- coerce-packer [x] + (if (= x :edn) + default-edn-packer + (have #(satisfies? interfaces/IPacker %) x))) + (comment - (do (require '[taoensso.sente.packers.transit :as transit]) - (def edn-packer interfaces/edn-packer) - (def flexi-packer (transit/get-flexi-packer))) - (unpack edn-packer (pack edn-packer nil "hello")) - (unpack flexi-packer (pack flexi-packer nil "hello")) - (unpack flexi-packer (pack flexi-packer {} [:foo/bar {}] "my-cb-uuid")) - (unpack flexi-packer (pack flexi-packer {:json true} [:foo/bar {}] "my-cb-uuid")) - (unpack flexi-packer (pack flexi-packer {} [:foo/bar {}] :ajax-cb))) + (do + (require '[taoensso.sente.packers.transit :as transit]) + (def ^:private default-transit-json-packer (transit/get-transit-packer))) + + (let [pack interfaces/pack + unpack interfaces/unpack + data {:a :A :b :B :c "hello world"}] + + (enc/qb 10000 + (let [pk default-edn-packer] (unpack pk (pack pk data))) + (let [pk default-transit-json-packer] (unpack pk (pack pk data)))))) ;;;; Server API (declare ^:private send-buffered-server-evs>ws-clients! - ^:private send-buffered-server-evs>ajax-clients!) + ^:private send-buffered-server-evs>ajax-clients! + ^:private default-client-side-ajax-timeout-ms) (defn make-channel-socket-server! "Takes a web server adapter[1] and returns a map with keys: @@ -236,9 +251,12 @@ :user-id-fn ; (fn [ring-req]) -> unique user-id for server>user push. :csrf-token-fn ; (fn [ring-req]) -> CSRF token for Ajax POSTs. :handshake-data-fn ; (fn [ring-req]) -> arb user data to append to handshake evs. + :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity + ; w/in given msecs. + :lp-timeout-ms ; Timeout (repoll) long-polling Ajax conns after given msecs. :send-buf-ms-ajax ; [2] :send-buf-ms-ws ; [2] - :packer ; :edn (default), or an IPacker implementation (experimental). + :packer ; :edn (default), or an IPacker implementation. [1] e.g. `taoensso.sente.server-adapters.http-kit/http-kit-adapter` or `taoensso.sente.server-adapters.immutant/immutant-adapter`. @@ -251,9 +269,12 @@ after send call (larger values => larger batch windows)." [web-server-adapter ; Actually a server-ch-adapter, but that may be confusing - & [{:keys [recv-buf-or-n send-buf-ms-ajax send-buf-ms-ws + & [{:keys [recv-buf-or-n ws-kalive-ms lp-timeout-ms + send-buf-ms-ajax send-buf-ms-ws user-id-fn csrf-token-fn handshake-data-fn packer] - :or {recv-buf-or-n (async/sliding-buffer 1000) + :or {recv-buf-or-n (async/sliding-buffer 1000) + ws-kalive-ms (enc/ms :secs 25) ; < Heroku 55s timeout + lp-timeout-ms (enc/ms :secs 20) ; < Heroku 30s timeout send-buf-ms-ajax 100 send-buf-ms-ws 30 user-id-fn (fn [ring-req] (get-in ring-req [:session :uid])) @@ -264,17 +285,18 @@ handshake-data-fn (fn [ring-req] nil) packer :edn}}]] - {:pre [(have? enc/pos-int? send-buf-ms-ajax send-buf-ms-ws) - (have? #(satisfies? interfaces/IServerChanAdapter %) - web-server-adapter)]} + (have? enc/pos-int? send-buf-ms-ajax send-buf-ms-ws) + (have? #(satisfies? interfaces/IServerChanAdapter %) web-server-adapter) - (let [packer (interfaces/coerce-packer packer) + (let [max-ms default-client-side-ajax-timeout-ms] + (when (>= lp-timeout-ms max-ms) + (throw + (ex-info (str ":lp-timeout-ms must be < " max-ms) + {:lp-timeout-ms lp-timeout-ms + :default-client-side-ajax-timeout-ms max-ms})))) + + (let [packer (coerce-packer packer) ch-recv (chan recv-buf-or-n) - conns_ (atom {:ws {} ; { { }} - :ajax {} ; { { [ ]}} - }) - connected-uids_ (atom {:ws #{} :ajax #{} :any #{}}) - send-buffers_ (atom {:ws {} :ajax {}}) ; { [ <#{ev-uuids}>]} user-id-fn (fn [ring-req client-id] @@ -282,13 +304,38 @@ ;; of security implications. (or (user-id-fn (assoc ring-req :client-id client-id)) ::nil-uid)) + ;; :ws udts used for ws-kalive (to check for activity in window period) + ;; :ajax udts used for lp-timeout (as a way to check active conn identity) + conns_ (atom {:ws {} :ajax {}}) ; { { [ ]}} + send-buffers_ (atom {:ws {} :ajax {}}) ; { [ <#{ev-uuids}>]} + connected-uids_ (atom {:ws #{} :ajax #{} :any #{}}) ; Public + + upd-conn! + (fn + ([conn-type uid client-id] ; Update udt + (swap-in! conns_ [conn-type uid client-id] + (fn [?v] + (let [[?sch _udt] ?v + new-udt (enc/now-udt)] + (enc/swapped + [?sch new-udt] + {:init? (nil? ?v) :udt new-udt :?sch ?sch}))))) + + ([conn-type uid client-id new-?sch] ; Update sch + udt + (swap-in! conns_ [conn-type uid client-id] + (fn [?v] + (let [new-udt (enc/now-udt)] + (enc/swapped + [new-?sch new-udt] + {:init? (nil? ?v) :udt new-udt :?sch new-?sch})))))) + connect-uid! - (fn [type uid] {:pre [(have? uid)]} + (fn [conn-type uid] {:pre [(have? uid)]} (let [newly-connected? (swap-in! connected-uids_ [] (fn [{:keys [ws ajax any] :as old-m}] (let [new-m - (case type + (case conn-type :ws {:ws (conj ws uid) :ajax ajax :any (conj any uid)} :ajax {:ws ws :ajax (conj ajax uid) :any (conj any uid)})] (swapped new-m @@ -299,7 +346,7 @@ :newly-connected))))))] newly-connected?)) - upd-connected-uid! ; Useful for atomic disconnects + upd-connected-uid! (fn [uid] {:pre [(have? uid)]} (let [newly-disconnected? (swap-in! connected-uids_ [] @@ -313,41 +360,45 @@ {:ws (if any-ws-clients? (conj ws uid) (disj ws uid)) :ajax (if any-ajax-clients? (conj ajax uid) (disj ajax uid)) :any (if any-clients? (conj any uid) (disj any uid))}] + (swapped new-m (let [old-any (:any old-m) new-any (:any new-m)] (when (and (contains? old-any uid) (not (contains? new-any uid))) :newly-disconnected))))))] + newly-disconnected?)) send-fn ; server>user (by uid) push (fn [user-id ev & [{:as opts :keys [flush?]}]] - (let [uid (if (= user-id :sente/all-users-without-uid) ::nil-uid user-id) - _ (tracef "Chsk send: (->uid %s) %s" uid ev) - _ (assert uid - (str "Support for sending to `nil` user-ids has been REMOVED. " - "Please send to `:sente/all-users-without-uid` instead.")) - _ (assert-event ev) + (let [uid (if (= user-id :sente/all-users-without-uid) ::nil-uid user-id) + _ (tracef "Chsk send: (->uid %s) %s" uid ev) + _ (assert uid + (str "Support for sending to `nil` user-ids has been REMOVED. " + "Please send to `:sente/all-users-without-uid` instead.")) + _ (assert-event ev) + ev-uuid (enc/uuid-str) flush-buffer! - (fn [type] + (fn [conn-type] (when-let [pulled - (swap-in! send-buffers_ [type] + (swap-in! send-buffers_ [conn-type] (fn [m] ;; Don't actually flush unless the event buffered ;; with _this_ send call is still buffered (awaiting ;; flush). This means that we'll have many (go ;; block) buffer flush calls that'll noop. They're - ;; cheap, and this approach is preferable to + ;; cheap, and this approach is preferable to ;; alternatives like flush workers. (let [[_ ev-uuids] (get m uid)] (if (contains? ev-uuids ev-uuid) (swapped (dissoc m uid) (get m uid)) (swapped m nil)))))] + (let [[buffered-evs ev-uuids] pulled] (have? vector? buffered-evs) (have? set? ev-uuids) @@ -357,11 +408,13 @@ buffered-evs-ppstr (pack packer combined-packer-meta buffered-evs)] + (tracef "buffered-evs-ppstr: %s (with meta %s)" buffered-evs-ppstr combined-packer-meta) - (case type - :ws (send-buffered-server-evs>ws-clients! conns_ - uid buffered-evs-ppstr) + + (case conn-type + :ws (send-buffered-server-evs>ws-clients! conns_ + uid buffered-evs-ppstr upd-conn!) :ajax (send-buffered-server-evs>ajax-clients! conns_ uid buffered-evs-ppstr))))))] @@ -369,20 +422,19 @@ (do (debugf "Chsk closing (client may reconnect): %s" uid) (when flush? - (doseq [type [:ws :ajax]] - (flush-buffer! type))) + (flush-buffer! :ws) + (flush-buffer! :ajax)) - (doseq [server-ch (vals (get-in @conns_ [:ws uid]))] - (interfaces/sch-close! server-ch)) + (doseq [[?sch _udt] (vals (get-in @conns_ [:ws uid]))] + (when-let [sch ?sch] (interfaces/sch-close! sch))) - (doseq [[?server-ch _] (vals (get-in @conns_ [:ajax uid]))] - (when-let [server-ch ?server-ch] - (interfaces/sch-close! server-ch)))) + (doseq [[?sch _udt] (vals (get-in @conns_ [:ajax uid]))] + (when-let [sch ?sch] (interfaces/sch-close! sch)))) (do ;; Buffer event - (doseq [type [:ws :ajax]] - (swap-in! send-buffers_ [type uid] + (doseq [conn-type [:ws :ajax]] + (swap-in! send-buffers_ [conn-type uid] (fn [?v] (if-not ?v [[ev] #{ev-uuid}] @@ -393,20 +445,30 @@ ;;; Flush event buffers after relevant timeouts: ;; * May actually flush earlier due to another timeout. ;; * We send to _all_ of a uid's connections. - ;; * Broadcasting is possible but I'd suggest doing it rarely, and - ;; only to users we know/expect are actually online. - (go (when-not flush? (ch-recv! ch-recv (merge ev-msg-const @@ -428,24 +498,17 @@ ;; user's convenience. non-lp-POSTs don't actually need a ;; client-id for Sente's own implementation: :client-id client-id #_"unnecessary-for-non-lp-POSTs" - :ring-req ring-req :event clj :uid (user-id-fn ring-req client-id) - :?reply-fn - (when has-cb? - (fn reply-fn [resp-clj] ; Any clj form - (tracef "Chsk send (ajax reply): %s" resp-clj) - ;; true iff apparent success: - (interfaces/sch-send! server-ch - (pack packer (meta resp-clj) resp-clj) - :close-after-send)))})) - - (when-not has-cb? - (tracef "Chsk send (ajax reply): dummy-cb-200") - (interfaces/sch-send! server-ch - (pack packer nil :chsk/dummy-cb-200) - :close-after-send))))})) + :?reply-fn (when has-cb? reply-fn)})) + + (if has-cb? + (when-let [ms lp-timeout-ms] + (go + (ch-recv! ch-recv - (merge ev-msg-const - {:client-id client-id - :ring-req ring-req - :event event - :?reply-fn ?reply-fn - :uid uid}))) - - handshake! + (fn self + ([event ] (self event nil)) + ([event ?reply-fn] + (put-server-event-msg>ch-recv! ch-recv + (merge ev-msg-const + {:client-id client-id + :ring-req ring-req + :event event + :?reply-fn ?reply-fn + :uid uid})))) + + send-handshake! (fn [server-ch] - (tracef "Handshake!") + (tracef "send-handshake!") (let [?handshake-data (handshake-data-fn ring-req) handshake-ev - (if-not (nil? ?handshake-data) ; Micro optimization - [:chsk/handshake [uid csrf-token ?handshake-data]] - [:chsk/handshake [uid csrf-token]])] - (interfaces/sch-send! server-ch + (if (nil? ?handshake-data) ; Micro optimization + [:chsk/handshake [uid csrf-token]] + [:chsk/handshake [uid csrf-token ?handshake-data]])] + (interfaces/-sch-send! server-ch (pack packer nil handshake-ev) (not websocket?))))] (if (str/blank? client-id) (let [err-msg "Client's Ring request doesn't have a client id. Does your server have the necessary keyword Ring middleware (`wrap-params` & `wrap-keyword-params`)?"] - (errorf (str err-msg ": %s") ring-req) + (errorf (str err-msg ": %s") ring-req) ; Careful re: % in req (throw (ex-info err-msg {:ring-req ring-req}))) (interfaces/ring-req->server-ch-resp web-server-adapter ring-req {:on-open (fn [server-ch] (if websocket? - (do ; WebSocket handshake - (tracef "New WebSocket channel: %s (%s)" - uid (str server-ch)) ; _Must_ call `str` on server-ch - (reset-in! conns_ [:ws uid client-id] server-ch) - (when (connect-uid! :ws uid) - (receive-event-msg! [:chsk/uidport-open])) - (handshake! server-ch)) - ;; Ajax handshake/poll connection: - (let [initial-conn-from-client? - (swap-in! conns_ [:ajax uid client-id] - (fn [?v] (swapped [server-ch (enc/now-udt)] (nil? ?v)))) + ;; WebSocket handshake + (let [_ (tracef "New WebSocket channel: %s (%s)" uid sch-uuid) + updated-conn (upd-conn! :ws uid client-id server-ch) + udt-open (:udt updated-conn)] - handshake? (or initial-conn-from-client? - (:handshake? params))] + (when (connect-uid! :ws uid) + (receive-event-msg! [:chsk/uidport-open uid])) + + (send-handshake! server-ch) + + ;; Start ws-kalive loop + ;; This also works to gc ws conns that were suddenly + ;; terminated (e.g. by turning on airplane mode) + (when-let [ms ws-kalive-ms] + (go-loop [udt-t0 udt-open] + (client (should auto-close conn if it's + ;; gone dead) + (interfaces/-sch-send! server-ch + (pack packer nil :chsk/ws-ping) + (not :close-after-send))) + (recur udt-t1)))))) + + ;; Ajax handshake/poll + (let [_ (tracef "New Ajax handshake/poll: %s (%s)" uid sch-uuid) + updated-conn (upd-conn! :ajax uid client-id server-ch) + udt-open (:udt updated-conn) + handshake? (or (:init? updated-conn) (:handshake? params))] (when (connect-uid! :ajax uid) - (receive-event-msg! [:chsk/uidport-open])) - - ;; Client will immediately repoll: - (when handshake? (handshake! server-ch))))) + (receive-event-msg! [:chsk/uidport-open uid])) + + (if handshake? + ; Client will immediately repoll + (send-handshake! server-ch) + + (when-let [ms lp-timeout-ms] + (go + (= udt-disconnected - ?udt-last-connected))] - (if-not disconnected? - (swapped ?m (not :disconnected)) - (let [new-m (dissoc ?m client-id)] - (swapped - (if (empty? new-m) :swap/dissoc new-m) - :disconnected))))))] - (when disconnected? (when (upd-connected-uid! uid) - (receive-event-msg! [:chsk/uidport-close])))))))))}))))})) + (receive-event-msg! [:chsk/uidport-close uid])))))))))}))))})) (defn- send-buffered-server-evs>ws-clients! "Actually pushes buffered events (as packed-str) to all uid's WebSocket conns." - [conns_ uid buffered-evs-pstr] + [conns_ uid buffered-evs-pstr upd-conn!] (tracef "send-buffered-server-evs>ws-clients!: %s" buffered-evs-pstr) - (doseq [server-ch (vals (get-in @conns_ [:ws uid]))] - (interfaces/sch-send! server-ch buffered-evs-pstr))) + (doseq [[client-id [?sch _udt]] (get-in @conns_ [:ws uid])] + (when-let [sch ?sch] + (upd-conn! :ws uid client-id) + (interfaces/-sch-send! sch buffered-evs-pstr + (not :close-after-send))))) (defn- send-buffered-server-evs>ajax-clients! "Actually pushes buffered events (as packed-str) to all uid's Ajax conns. Allows some time for possible Ajax poller reconnects." - [conns_ uid buffered-evs-pstr & [{:keys [nmax-attempts ms-base ms-rand] - ;; <= 7 attempts at ~135ms ea = 945ms - :or {nmax-attempts 7 - ms-base 90 - ms-rand 90}}]] - (comment (* 7 (+ 90 (/ 90 2.0)))) + [conns_ uid buffered-evs-pstr] (tracef "send-buffered-server-evs>ajax-clients!: %s" buffered-evs-pstr) - (let [;; All connected/possibly-reconnecting client uuids: + (let [nmax-attempts 7 + ms-base 90 + ms-rand 90 + ;; (* 7 (+ 90 (/ 90 2.0))) ~= 945ms + + ;; All connected/possibly-reconnecting client uuids: client-ids-unsatisfied (keys (get-in @conns_ [:ajax uid]))] + (when-not (empty? client-ids-unsatisfied) ;; (tracef "client-ids-unsatisfied: %s" client-ids-unsatisfied) (go-loop [n 0 client-ids-satisfied #{}] - (let [?pulled ; nil or { [ ]} + (let [?pulled ; nil or { [ ]} (swap-in! conns_ [:ajax uid] - (fn [m] ; { [ ]} + (fn [m] ; { [ ]} (let [ks-to-pull (remove client-ids-satisfied (keys m))] ;; (tracef "ks-to-pull: %s" ks-to-pull) (if (empty? ks-to-pull) @@ -605,22 +688,29 @@ (swapped (reduce (fn [m k] - (let [[?server-ch udt-last-connected] (get m k)] - (assoc m k [nil udt-last-connected]))) + (let [[?sch _udt] (get m k)] + (assoc m k [nil #_udt (enc/now-udt)]))) + m ks-to-pull) (select-keys m ks-to-pull))))))] + (have? [:or nil? map?] ?pulled) + (let [?newly-satisfied (when ?pulled (reduce-kv - (fn [s client-id [?server-ch _]] - (if (or (nil? ?server-ch) - ;; server-ch may have closed already (`send!` will noop): - (not (interfaces/sch-send! ?server-ch buffered-evs-pstr - :close-after-send))) - s - (conj s client-id))) #{} ?pulled)) + (fn [s client-id [?sch _udt]] + (let [sent? + (when-let [sch ?sch] + ;; Will noop + return false if sch already closed: + (interfaces/sch-send! ?sch buffered-evs-pstr + :close-after-send))] + + (if sent? (conj s client-id) s))) + #{} ?pulled)) + now-satisfied (into client-ids-satisfied ?newly-satisfied)] + ;; (tracef "now-satisfied: %s" now-satisfied) (when (and (< n nmax-attempts) (some (complement now-satisfied) client-ids-unsatisfied)) @@ -633,11 +723,17 @@ #+cljs (def ajax-lite "Alias of `taoensso.encore/ajax-lite`" enc/ajax-lite) #+cljs (defprotocol IChSocket - (-chsk-connect! [chsk] "Implementation detail") - (-chsk-send! [chsk ev opts] "Implementation detail") - (chsk-destroy! [chsk] "DEPRECATED: Please use `chsk-disconnect!` instead") - (chsk-disconnect! [chsk] "Disconnects channel socket") - (chsk-reconnect! [chsk] "Reconnects channel socket (useful for reauthenticating after login/logout, etc.)")) + (-chsk-connect! [chsk]) + (-chsk-disconnect! [chsk reconn?]) + (-chsk-reconnect! [chsk]) + (-chsk-send! [chsk ev opts])) + +#+cljs (defn chsk-connect! [chsk] (-chsk-connect! chsk)) +#+cljs (defn chsk-destroy! "Deprecated" [chsk] (-chsk-disconnect! chsk false)) +#+cljs (defn chsk-disconnect! [chsk] (-chsk-disconnect! chsk false)) +#+cljs +(defn chsk-reconnect! "Useful for reauthenticating after login/logout, etc." + [chsk] (-chsk-reconnect! chsk)) #+cljs (defn chsk-send! @@ -646,18 +742,23 @@ ([chsk ev ?timeout-ms ?cb] (chsk-send! chsk ev {:timeout-ms ?timeout-ms :cb ?cb})) ([chsk ev opts] - (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) - (-chsk-send! chsk ev opts))) + (tracef "Chsk send: (%s) %s" (assoc opts :cb (boolean (:cb opts))) ev) + (-chsk-send! chsk ev opts))) + +#+cljs +(defn- chsk-send->closed! [?cb-fn] + (warnf "Chsk send against closed chsk.") + (when ?cb-fn (?cb-fn :chsk/closed)) + false) #+cljs (defn- assert-send-args [x ?timeout-ms ?cb] (assert-event x) (assert (or (and (nil? ?timeout-ms) (nil? ?cb)) (and (enc/nneg-int? ?timeout-ms))) - (format "cb requires a timeout; timeout-ms should be a +ive integer: %s" - ?timeout-ms)) + (str "cb requires a timeout; timeout-ms should be a +ive integer: " ?timeout-ms)) (assert (or (nil? ?cb) (ifn? ?cb) (enc/chan? ?cb)) - (format "cb should be nil, an ifn, or a channel: %s" (type ?cb)))) + (str "cb should be nil, an ifn, or a channel: " (type ?cb)))) #+cljs (defn- pull-unused-cb-fn! [cbs-waiting_ ?cb-uuid] @@ -671,19 +772,27 @@ (swap-in! state_ [] (fn [old-state] (let [new-state (merge old-state merge-state) + ;; Is this a reasonable way of helping client distinguish - ;; cause of an auto reconnect? Didn't give it much thought... - new-state (if-not (and (:requested-reconnect-pending? old-state) - (:open? new-state) - (not (:open? old-state))) - new-state - (-> new-state - (dissoc :requested-reconnect-pending?) - (assoc :requested-reconnect? true)))] + ;; cause of an auto reconnect? Didn't give it much + ;; thought... + requested-reconnect? + (and (:requested-reconnect-pending? old-state) + (do (:open? new-state)) + (not (:open? old-state))) + + new-state + (if requested-reconnect? + (-> new-state + (dissoc :requested-reconnect-pending?) + (assoc :requested-reconnect? true)) + (dissoc new-state :requested-reconnect?))] + (swapped new-state [old-state new-state]))))] + (when (not= old-state new-state) ;; (debugf "Chsk state change: %s" new-state) - (put! (:state chs) new-state) + (put! (:state chs) [:chsk/state new-state]) new-state))) #+cljs @@ -692,14 +801,17 @@ instead of a cb-fn. The channel will receive values of form [.cb ]." [?cb ev] - (if (or (nil? ?cb) (ifn? ?cb)) ?cb - (do (have? enc/chan? ?cb) - (assert-event ev) - (let [[ev-id _] ev - cb-ch ?cb] - (fn [reply] - (put! cb-ch [(keyword (str (enc/fq-name ev-id) ".cb")) - reply])))))) + (if (or (nil? ?cb) (ifn? ?cb)) + ?cb + (do + (have? enc/chan? ?cb) + (assert-event ev) + (let [[ev-id _] ev + cb-ch ?cb] + (fn [reply] + (put! cb-ch + [(keyword (str (enc/fq-name ev-id) ".cb")) + reply])))))) #+cljs (defn- receive-buffered-evs! [chs clj] @@ -707,51 +819,75 @@ (let [buffered-evs (have vector? clj)] (doseq [ev buffered-evs] (assert-event ev) + ;; Should never receive :chsk/* events from server here: + (let [[id] ev] (assert (not= (namespace id) "chsk"))) (put! (:chsk-state! chsk - {:open? true - :uid ?uid - :csrf-token ?csrf-token +#+cljs +(defn- receive-handshake! [chsk-type chsk clj] + (have? [:el #{:ws :ajax}] chsk-type) + (have? handshake? clj) + (tracef "receive-handshake! (%s): %s" chsk-type clj) + (let [[_ [?uid ?csrf-token ?handshake-data]] clj + {:keys [chs ever-opened?_]} chsk + first-handshake? (compare-and-set! ever-opened?_ false true) + new-state + {:type chsk-type ; :auto -> e/o #{:ws :ajax}, etc. + :open? true + :ever-opened? true + :uid ?uid + :csrf-token ?csrf-token + :handshake-data ?handshake-data + :first-open? first-handshake?} + + handshake-ev + [:chsk/handshake + [?uid ?csrf-token ?handshake-data first-handshake?]]] + + (assert-event handshake-ev) + (when (str/blank? ?csrf-token) + (warnf "SECURITY WARNING: no CSRF token available for use by Sente")) + + (merge>chsk-state! chsk new-state) + (put! (:internal chs) handshake-ev) + + :handled)) - ;; Could also just merge ?handshake-data into chsk state here, but - ;; it seems preferable (?) to instead register a unique - ;; :chsk/handshake event - }) +#+cljs +(defrecord ChWebSocket + ;; WebSocket-only IChSocket implementation + ;; Handles (re)connections, cbs, etc. - (assert-event handshake-ev) - (put! (:internal chs) handshake-ev) + [client-id chs params packer url + state_ ; {:type _ :open? _ :uid _ :csrf-token _} + active-retry-id_ retry-count_ ever-opened?_ + backoff-ms-fn ; (fn [nattempt]) -> msecs + cbs-waiting_ ; { ...} + socket_ err-fn] - :handled)))) + IChSocket + (-chsk-disconnect! [chsk reconn?] + (reset! active-retry-id_ "_disable-auto-retry") + (if reconn? + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (merge>chsk-state! chsk {:open? false})) + (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) -#+cljs -(defrecord ChWebSocket ; Handles (re)connections, keep-alives, cbs, etc. - [client-id chs params packer url - state_ ; {:type _ :open? _ :uid _ :csrf-token _} - cbs-waiting_ ; { ...} - socket_ kalive-ms kalive-timer_ kalive-due?_ - backoff-ms-fn ; (fn [nattempt]) -> msecs - active-retry-id_ retry-count_] + (-chsk-reconnect! [chsk] + (-chsk-disconnect! chsk :reconn) + (-chsk-connect! chsk)) - IChSocket - (-chsk-send! [chsk ev {:as opts ?timeout-ms :timeout-ms ?cb :cb :keys [flush?]}] - (assert-send-args ev ?timeout-ms ?cb) - (let [?cb-fn (cb-chan-as-fn ?cb ev)] + (-chsk-send! [chsk ev opts] + (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts + _ (assert-send-args ev ?timeout-ms ?cb) + ?cb-fn (cb-chan-as-fn ?cb ev)] (if-not (:open? @state_) ; Definitely closed - (do (warnf "Chsk send against closed chsk.") - (when ?cb-fn (?cb-fn :chsk/closed))) + (chsk-send->closed! ?cb-fn) ;; TODO Buffer before sending (but honor `:flush?`) (let [?cb-uuid (when ?cb-fn (enc/uuid-str 6)) @@ -766,7 +902,6 @@ (try (.send @socket_ ppstr) - (reset! kalive-due?_ false) :apparent-success (catch js/Error e (errorf e "Chsk send error") @@ -776,19 +911,6 @@ (cb-fn* :chsk/error))) false)))))) - (chsk-destroy! [chsk] (chsk-disconnect! chsk)) - (chsk-disconnect! [chsk] - (reset! active-retry-id_ "disconnected") - (when-let [t @kalive-timer_] (.clearInterval js/window t)) - (merge>chsk-state! chsk {:open? false}) - (when-let [s @socket_] (.close s 1000 "CLOSE_NORMAL"))) - - (chsk-reconnect! [chsk] - (reset! active-retry-id_ "reconnecting") - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) - (when-let [s @socket_] (.close s 3000 "SENTE_RECONNECT")) - (-chsk-connect! chsk)) - (-chsk-connect! [chsk] (when-let [WebSocket (or (enc/oget js/window "WebSocket") (enc/oget js/window "MozWebSocket"))] @@ -800,9 +922,7 @@ (when (= @active-retry-id_ retry-id) (let [retry-count* (swap! retry-count_ inc) backoff-ms (backoff-ms-fn retry-count*)] - (.clearInterval js/window @kalive-timer_) - (warnf "Chsk is closed: will try reconnect (%s)" - retry-count*) + (warnf "Chsk is closed: will try reconnect (%s)" retry-count*) (.setTimeout js/window connect-fn backoff-ms)))) ?socket @@ -823,22 +943,30 @@ (aset "onerror" (fn [ws-ev] (errorf "WebSocket error: %s" ws-ev) - ;; Experimental, for #214; (put! (:internal chs) [:chsk/ws-error ws-ev]) + (when-let [ef err-fn] (ef chsk)) nil)) (aset "onmessage" ; Nb receives both push & cb evs! (fn [ws-ev] - (let [;; Nb may or may NOT satisfy `event?` since we also - ;; receive cb replies here! This is actually why - ;; we prefix our pstrs to indicate whether they're - ;; wrapped or not. - ppstr (enc/oget ws-ev "data") + (let [ppstr (enc/oget ws-ev "data") [clj ?cb-uuid] (unpack packer ppstr)] + + ;; Nb may or may NOT satisfy `event?` since we + ;; also receive cb replies here! This is actually + ;; why we prefix our pstrs to indicate whether + ;; they're wrapped or not. ;; (assert-event clj) ;; NO! + (or - (and (handle-when-handshake! chsk chs clj) - (reset! retry-count_ 0)) + (when (handshake? clj) + (receive-handshake! :ws chsk clj) + (reset! retry-count_ 0)) + + (when (= clj :chsk/ws-ping) + #_(receive-buffered-evs! chs [[:debug/ws-ping]]) + :noop) + (if-let [cb-uuid ?cb-uuid] (if-let [cb-fn (pull-unused-cb-fn! cbs-waiting_ cb-uuid)] @@ -847,16 +975,9 @@ (let [buffered-evs clj] (receive-buffered-evs! chs buffered-evs))))))) - (aset "onopen" + #_(aset "onopen" (fn [_ws-ev] - (reset! kalive-timer_ - (.setInterval js/window - (fn [] - (when @kalive-due?_ ; Don't ping unnecessarily - (chsk-send! chsk [:chsk/ws-ping])) - (reset! kalive-due?_ true)) - kalive-ms)) - ;; NO, better for server to send a handshake!: + ;; NO, better for server to send a handshake: ;; (merge>chsk-state! chsk {:open? true}) )) @@ -870,7 +991,7 @@ ;; Firefox calls "onclose" while unloading, ;; Ref. http://goo.gl/G5BYbn: - (if (and clean? (not= reason "SENTE_RECONNECT")) + (if clean? (debugf "Clean WebSocket close, will not attempt reconnect") (do (merge>chsk-state! chsk {:open? false}) @@ -882,25 +1003,59 @@ chsk)))) #+cljs -(defrecord ChAjaxSocket ; Handles (re)polling, etc. - [client-id chs params packer url state_ - timeout-ms ajax-opts curr-xhr_ - active-retry-id_ - backoff-ms-fn] +(defn- new-ChWebSocket [opts] + (map->ChWebSocket + (merge + {:state_ (atom {:type :ws :open? false}) + :active-retry-id_ (atom "_pending") + :retry-count_ (atom 0) + :ever-opened?_ (atom false) + :cbs-waiting_ (atom {}) + :socket_ (atom nil)} + opts))) + +(def ^:private default-client-side-ajax-timeout-ms + "We must set *some* client-side timeout otherwise an unpredictable (and + probably too short) browser default will be used. Must be > server's + :lp-timeout-ms." + (enc/ms :secs 60)) + +#+cljs +(defrecord ChAjaxSocket + ;; Ajax-only IChSocket implementation + ;; Handles (re)polling, etc. + + [client-id chs params packer url state_ + active-retry-id_ ever-opened?_ + backoff-ms-fn + ajax-opts curr-xhr_] IChSocket - (-chsk-send! [chsk ev {:as opts ?timeout-ms :timeout-ms ?cb :cb :keys [flush?]}] - (assert-send-args ev ?timeout-ms ?cb) - (let [?cb-fn (cb-chan-as-fn ?cb ev)] + (-chsk-disconnect! [chsk reconn?] + (reset! active-retry-id_ "_disable-auto-retry") + (if reconn? + (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) + (merge>chsk-state! chsk {:open? false})) + (when-let [x @curr-xhr_] (.abort x))) + + (-chsk-reconnect! [chsk] + (-chsk-disconnect! chsk :reconn) + (-chsk-connect! chsk)) + + (-chsk-send! [chsk ev opts] + (let [{?timeout-ms :timeout-ms ?cb :cb :keys [flush?]} opts + _ (assert-send-args ev ?timeout-ms ?cb) + ?cb-fn (cb-chan-as-fn ?cb ev)] (if-not (:open? @state_) ; Definitely closed - (do (warnf "Chsk send against closed chsk.") - (when ?cb-fn (?cb-fn :chsk/closed))) + (chsk-send->closed! ?cb-fn) ;; TODO Buffer before sending (but honor `:flush?`) (let [csrf-token (:csrf-token @state_)] (ajax-lite url (merge ajax-opts - {:method :post :timeout-ms ?timeout-ms + {:method :post + :timeout-ms (or ?timeout-ms (:timeout-ms ajax-opts) + default-client-side-ajax-timeout-ms) :resp-type :text ; We'll do our own pstr decoding :headers (merge (:headers ajax-opts) ; 1st (don't clobber impl.): @@ -931,25 +1086,14 @@ (let [content ?content resp-ppstr content [resp-clj _] (unpack packer resp-ppstr)] - (if ?cb-fn (?cb-fn resp-clj) + (if ?cb-fn + (?cb-fn resp-clj) (when (not= resp-clj :chsk/dummy-cb-200) (warnf "Cb reply w/o local cb-fn: %s" resp-clj))) (merge>chsk-state! chsk {:open? true}))))) :apparent-success)))) - (chsk-destroy! [chsk] (chsk-disconnect! chsk)) - (chsk-disconnect! [chsk] - (reset! active-retry-id_ "disconnected") - (merge>chsk-state! chsk {:open? false}) - (when-let [x @curr-xhr_] (.abort x))) - - (chsk-reconnect! [chsk] - (reset! active-retry-id_ "reconnecting") - (merge>chsk-state! chsk {:open? false :requested-reconnect-pending? true}) - (when-let [x @curr-xhr_] (.abort x)) - (-chsk-connect! chsk)) - (-chsk-connect! [chsk] (let [retry-id (enc/uuid-str) poll-fn ; async-poll-for-update-fn @@ -960,16 +1104,15 @@ (when (= @active-retry-id_ retry-id) (let [retry-count* (inc retry-count) backoff-ms (backoff-ms-fn retry-count*)] - (warnf "Chsk is closed: will try reconnect (%s)" - retry-count*) - (.setTimeout js/window - (fn [] (poll-fn retry-count*)) - backoff-ms))))] + (warnf "Chsk is closed: will try reconnect (%s)" retry-count*) + (.setTimeout js/window (fn [] (poll-fn retry-count*)) backoff-ms))))] (reset! curr-xhr_ (ajax-lite url (merge ajax-opts - {:method :get :timeout-ms timeout-ms + {:method :get ; :timeout-ms timeout-ms + :timeout-ms (or (:timeout-ms ajax-opts) + default-client-side-ajax-timeout-ms) :resp-type :text ; Prefer to do our own pstr reading :params (merge @@ -998,178 +1141,222 @@ (retry-fn))) ;; The Ajax long-poller is used only for events, never cbs: - (let [content ?content - ppstr content - [clj _] (unpack packer ppstr)] - (or - (handle-when-handshake! chsk chs clj) - ;; Actually poll for an application reply: - (let [buffered-evs clj] - (receive-buffered-evs! chs buffered-evs) - (merge>chsk-state! chsk {:open? true}))) - - (poll-fn 0))))))))] + (let [content ?content + ppstr content + [clj] (unpack packer ppstr) + handshake? (handshake? clj)] + + (when handshake? (receive-handshake! :ajax chsk clj)) + + (merge>chsk-state! chsk {:open? true}) + (poll-fn 0) ; Repoll asap + + (when-not handshake? + (or + (when (= clj :chsk/timeout) + #_(receive-buffered-evs! chs [[:debug/timeout]]) + :noop) + + (let [buffered-evs clj] ; An application reply + (receive-buffered-evs! chs buffered-evs)))))))))))] (reset! active-retry-id_ retry-id) (poll-fn 0) chsk))) #+cljs -(defn- get-chsk-url [protocol chsk-host chsk-path type] - (let [protocol (case type :ajax protocol - :ws (if (= protocol "https:") "wss:" "ws:"))] - (str protocol "//" (enc/path chsk-host chsk-path)))) +(defn- new-ChAjaxSocket [opts] + (map->ChAjaxSocket + (merge + {:state_ (atom {:type :ajax :open? false}) + :active-retry-id_ (atom "_pending") + :ever-opened?_ (atom false) + :curr-xhr_ (atom nil)} + opts))) + +#+cljs +(defrecord ChAutoSocket + ;; Dynamic WebSocket/Ajax IChSocket implementation + ;; Wraps a swappable ChWebSocket/ChAjaxSocket + + [ws-chsk-opts ajax-chsk-opts + state_ ; {:type _ :open? _ :uid _ :csrf-token _} + impl_ ; ChWebSocket or ChAjaxSocket + ] + + IChSocket + (-chsk-disconnect! [chsk reconn?] + (when-let [impl @impl_] + (-chsk-disconnect! impl reconn?))) + + ;; Possibly reset impl type: + (-chsk-reconnect! [chsk] + (when-let [impl @impl_] + (-chsk-disconnect! impl :reconn) + (-chsk-connect! chsk))) + + (-chsk-send! [chsk ev opts] + (if-let [impl @impl_] + (-chsk-send! impl ev opts) + (let [{?cb :cb} opts + ?cb-fn (cb-chan-as-fn ?cb ev)] + (chsk-send->closed! ?cb-fn)))) + + (-chsk-connect! [chsk] + (let [ajax-chsk-opts (assoc ajax-chsk-opts :state_ state_) + ajax-conn! (fn [] (-chsk-connect! (new-ChAjaxSocket ajax-chsk-opts))) + + ws-err-fn ; Called on WebSocket's onerror + (fn [impl] + ;; Starting with something simple here as a proof of concept; + ;; TODO Consider smarter downgrade/upgrade strategies here later + (when-let [ever-opened?_ (:ever-opened?_ impl)] + (when-not @ever-opened?_ + (warnf "Permanently downgrading :auto chsk -> :ajax") + (-chsk-disconnect! impl false) + (reset! impl_ (ajax-conn!))))) + + ws-chsk-opts (assoc ws-chsk-opts :state_ state_ :err-fn ws-err-fn) + ws-conn! (fn [] (-chsk-connect! (new-ChWebSocket ws-chsk-opts)))] + + (reset! impl_ (or (ws-conn!) (ajax-conn!))) + chsk))) + +#+cljs +(defn- new-ChAutoSocket [opts] + (map->ChAutoSocket + (merge + {:state_ (atom {:type :auto :open? false}) + :impl_ (atom nil)} + opts))) + +#+cljs +(defn- get-chsk-url [protocol host path type] + (let [protocol (case type + :ajax protocol + :ws (if (= protocol "https:") "wss:" "ws:"))] + (str protocol "//" (enc/path host path)))) #+cljs (defn make-channel-socket-client! "Returns nil on failure, or a map with keys: - :ch-recv ; core.async channel to receive `event-msg`s (internal or from clients). - ; May `put!` (inject) arbitrary `event`s to this channel. + :ch-recv ; core.async channel to receive `event-msg`s (internal or from + ; clients). May `put!` (inject) arbitrary `event`s to this channel. :send-fn ; (fn [event & [?timeout-ms ?cb-fn]]) for client>server send. :state ; Watchable, read-only (atom {:type _ :open? _ :uid _ :csrf-token _}). :chsk ; IChSocket implementer. You can usu. ignore this. Common options: - :type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto) - :host ; Server host (defaults to current page's host) - :params ; Map of any params to incl. in chsk Ring requests (handy for - ; application-level auth, etc.) - :ws-kalive-ms ; Ping to keep a WebSocket conn alive if no activity w/in given - ; number of milliseconds - :lp-timeout-ms ; Ping to keep a long-polling (Ajax) conn alive '' [1] - :packer ; :edn (default), or an IPacker implementation (experimental) - :ajax-opts ; Base opts map provided to `taoensso.encore/ajax-lite` - :wrap-recv-evs? ; Should events from server be wrapped in [:chsk/recv _]? - - [1] If you're using Immutant and override the default :lp-timeout-ms, you'll - need to provide the same timeout value to - `taoensso.sente.server-adapters.immutant/make-immutant-adapter` and use - the result of that function as the web server adapter to your server-side - `make-channel-socket-server!`." + :type ; e/o #{:auto :ws :ajax}. You'll usually want the default (:auto). + :host ; Server host (defaults to current page's host). + :params ; Map of any params to incl. in chsk Ring requests (handy + ; for application-level auth, etc.). + :packer ; :edn (default), or an IPacker implementation. + :ajax-opts ; Base opts map provided to `taoensso.encore/ajax-lite`. + :wrap-recv-evs? ; Should events from server be wrapped in [:chsk/recv _]?" + [path & - [{:keys [type host params recv-buf-or-n ws-kalive-ms lp-timeout-ms packer + [{:keys [type host params recv-buf-or-n packer client-id ajax-opts wrap-recv-evs? backoff-ms-fn] :as opts :or {type :auto recv-buf-or-n (async/sliding-buffer 2048) ; Mostly for buffered-evs - ws-kalive-ms 25000 ; < Heroku 30s conn timeout - lp-timeout-ms 25000 ; '' packer :edn client-id (or (:client-uuid opts) ; Backwards compatibility - (enc/uuid-str)) + (enc/uuid-str)) ;; TODO Deprecated. Default to false later, then eventually just ;; drop this option altogether? - here now for back compatibility: wrap-recv-evs? true backoff-ms-fn enc/exp-backoff}} + _deprecated-more-opts]] - {:pre [(have? [:in #{:ajax :ws :auto}] type) - (have? enc/nblank-str? client-id)]} - - (when (not (nil? _deprecated-more-opts)) - (warnf "`make-channel-socket-client!` fn signature CHANGED with Sente v0.10.0.")) - (when (contains? opts :lp-timeout) - (warnf ":lp-timeout opt has CHANGED; please use :lp-timout-ms.")) - - (let [packer (interfaces/coerce-packer packer) - - win-location (enc/get-window-location) - win-protocol (:protocol win-location) - host (or host (:host win-location)) - path (or path (:pathname win-location)) - - private-chs {:state (chan (async/sliding-buffer 10)) - :internal (chan (async/sliding-buffer 10)) - := max expected buffered-evs size: + :ChWebSocket - {:client-id client-id - :chs private-chs - :params params - :packer packer - :url (if-let [f (:chsk-url-fn opts)] - (f path win-location :ws) ; Deprecated - (get-chsk-url win-protocol host path :ws)) - - :state_ (atom {:type :ws :open? false}) - :cbs-waiting_ (atom {}) - :socket_ (atom nil) - :kalive-ms ws-kalive-ms - :kalive-timer_ (atom nil) - :kalive-due?_ (atom true) - :backoff-ms-fn backoff-ms-fn - :active-retry-id_ (atom "pending") - :retry-count_ (atom 0)}))) - - (and (not= type :ws) - (-chsk-connect! - (map->ChAjaxSocket - {:client-id client-id - :chs private-chs - :params params - :packer packer - :url (if-let [f (:chsk-url-fn opts)] - (f path win-location :ajax) ; Deprecated - (get-chsk-url win-protocol host path :ajax)) - - :state_ (atom {:type :ajax :open? false}) - :timeout-ms lp-timeout-ms - :ajax-opts ajax-opts - :curr-xhr_ (atom nil) - :backoff-ms-fn backoff-ms-fn - :active-retry-id_ (atom "pending")}))))] + (-chsk-connect! + (case type + :ws (new-ChWebSocket ws-chsk-opts) + :ajax (new-ChAjaxSocket ajax-chsk-opts) + :auto (new-ChAutoSocket auto-chsk-opts)))] (if-let [chsk ?chsk] (let [send-fn (partial chsk-send! chsk) - public-ch-recv + + ;; TODO map< is deprecated, prefer transducers (needs clj 1.7+) + + ev-ch + (async/merge + [(do (:internal private-chs)) + (do (:state private-chs)) + (let [ev-msg [ev] (let [[ev-id ev-?data :as ev] (as-event ev)] - {:ch-recv public-ch-recv - :send-fn send-fn - :state (:state_ chsk) - :event ev - :id ev-id - :?data ev-?data})) - public-ch-recv)] + {:ch-recv ev-ch + :send-fn send-fn + :state (:state_ chsk) + :event ev + :id ev-id + :?data ev-?data})) + ev-ch)] {:chsk chsk - :ch-recv public-ch-recv ; `ev`s->`ev-msg`s ch + :ch-recv ev-msg-ch ; Public `ev`s->`ev-msg`s ch :send-fn send-fn :state (:state_ chsk)}) @@ -1184,7 +1371,7 @@ (go-loop [] (let [[v p] (async/alts! [ch-recv ch-ctrl]) - stop? (enc/kw-identical? p ch-ctrl)] + stop? (= p ch-ctrl)] (when-not stop? (let [{:as event-msg :keys [event]} v @@ -1251,7 +1438,7 @@ #+clj (defn start-chsk-router-loop! - "DEPRECATED: Please use `start-chsk-router!` instead." + "DEPRECATED: Please use `start-chsk-router!` instead" [event-msg-handler ch-recv] (start-server-chsk-router! ch-recv ;; Old handler form: (fn [ev-msg ch-recv]) @@ -1259,21 +1446,20 @@ #+cljs (defn start-chsk-router-loop! - "DEPRECATED: Please use `start-chsk-router!` instead." + "DEPRECATED: Please use `start-chsk-router!` instead" [event-handler ch-recv] (start-client-chsk-router! ch-recv ;; Old handler form: (fn [ev ch-recv]) (fn [ev-msg] (event-handler (:event ev-msg) (:ch-recv ev-msg))))) -(defn set-logging-level! - "DEPRECATED. Please use `timbre/set-level!` instead." - [level] (timbre/set-level! level)) - -#+cljs -(def ajax-call "DEPRECATED: Please use `ajax-lite` instead." - enc/ajax-lite) +(def set-logging-level! "DEPRECATED. Please use `timbre/set-level!` instead" timbre/set-level!) +#+cljs (def ajax-call "DEPRECATED: Please use `ajax-lite` instead" enc/ajax-lite) #+cljs (def default-chsk-url-fn "DEPRECATED" - (fn [path {:as location :keys [adjusted-protocol host pathname]} websocket?] - (str adjusted-protocol "//" host (or path pathname)))) + (fn [path {:as location :keys [protocol host pathname]} websocket?] + (let [protocol + (if websocket? + (if (= protocol "https:") "wss:" "ws:") + protocol)] + (str protocol "//" host (or path pathname))))) diff --git a/src/taoensso/sente/interfaces.cljx b/src/taoensso/sente/interfaces.cljx index e0837a7..4c97b5f 100644 --- a/src/taoensso/sente/interfaces.cljx +++ b/src/taoensso/sente/interfaces.cljx @@ -20,13 +20,13 @@ (defprotocol IServerChan ;; Wraps a web server's own async channel/comms interface to abstract away ;; implementation differences - (sch-open? [server-ch] "Returns true iff the server channel is currently open") + (sch-open? [server-ch] "Returns true iff the channel is currently open.") (sch-close! [server-ch] - "Closes the server channel and returns true iff the channel was open when - called.") + "If the channel is open when called: closes the channel and returns true. + Otherwise noops and returns false.") (-sch-send! [server-ch msg close-after-send?] - "Sends a message to server channel. Returns true iff the channel was open - when called.")) + "If the channel is open when called: sends a message over channel and + returns true. Otherwise noops and returns false.")) (defn sch-send! "Sends a message to server channel. Returns true iff the channel was open @@ -57,17 +57,3 @@ arbitrary Clojure data <-> serialized strings." (pack [_ x]) (unpack [_ x])) - -(deftype EdnPacker [] - IPacker - (pack [_ x] (enc/pr-edn x)) - (unpack [_ s] (enc/read-edn s))) - -(def edn-packer "Default Edn packer" (->EdnPacker)) - -(defn coerce-packer [x] - (if (= x :edn) - edn-packer - (do (assert (satisfies? IPacker x) - (str "Given packer doesn't satisfy IPacker protocol?")) - x))) diff --git a/src/taoensso/sente/packers/transit.cljx b/src/taoensso/sente/packers/transit.cljx index 6ec886c..736d295 100644 --- a/src/taoensso/sente/packers/transit.cljx +++ b/src/taoensso/sente/packers/transit.cljx @@ -1,182 +1,112 @@ (ns taoensso.sente.packers.transit - "Experimental - subject to change! + "Alpha - subject to change! Optional Transit-format[1] IPacker implementation for use with Sente. [1] https://github.com/cognitect/transit-format." {:author "Peter Taoussanis, @ckarlsen84"} #+clj - (:require [clojure.string :as str] - [clojure.tools.reader.edn :as edn] - [taoensso.encore :as encore] - [taoensso.timbre :as timbre] - [cognitect.transit :as transit] - [taoensso.sente.interfaces :as interfaces :refer (pack unpack)]) + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer (have have! have?)] + [taoensso.timbre :as timbre] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)]) #+clj (:import [java.io ByteArrayInputStream ByteArrayOutputStream]) #+cljs - (:require [clojure.string :as str] - [cljs.reader :as edn] - [taoensso.encore :as encore :refer (format)] - [cognitect.transit :as transit] - [taoensso.sente.interfaces :as interfaces :refer (pack unpack)])) - -;; TODO Nb note that Transit-cljs doesn't seem to actually have msgpack support -;; for the moment + (:require + [clojure.string :as str] + [taoensso.encore :as enc :refer-macros (have have! have?)] + [cognitect.transit :as transit] + [taoensso.sente.interfaces :as interfaces :refer (pack unpack)])) +#+clj (defn- get-charset [transit-fmt] ;; :msgpack appears to need ISO-8859-1 to retain binary data correctly when ;; string-encoded, all other (non-binary) formats can get UTF-8: - (if (= transit-fmt :msgpack) "ISO-8859-1" "UTF-8")) + (if (enc/kw-identical? transit-fmt :msgpack) "ISO-8859-1" "UTF-8")) + +#+clj +(def ^:private cache-read-handlers + "reader-opts -> reader-opts with cached read handler map" + (let [cache (enc/memoize_ (fn [m] (transit/read-handler-map m)))] + (fn [reader-opts] + (if-let [m (:handlers reader-opts)] + (assoc reader-opts :handlers (cache m)) + reader-opts)))) + +#+clj +(def ^:private cache-write-handlers + "writer-opts -> writer-opts with cached write handler map" + (let [cache (enc/memoize_ (fn [m] (transit/write-handler-map m)))] + (fn [writer-opts] + (if-let [m (:handlers writer-opts)] + (assoc writer-opts :handlers (cache m)) + writer-opts)))) + +#+clj +(def ^:private transit-writer-fn-proxy + (enc/thread-local-proxy + (fn [fmt opts] + (let [^String charset (get-charset fmt) + opts (cache-write-handlers opts) + ^ByteArrayOutputStream baos (ByteArrayOutputStream. 512) + writer (transit/writer baos fmt opts)] + (fn [x] + (transit/write writer x) + (let [result (.toString baos charset)] + (.reset baos) + result)))))) + +(def ^:private get-transit-writer-fn + "Returns thread-safe (fn [x-to-write])" + #+cljs + (enc/memoize_ + (fn [fmt opts] + (let [writer (transit/writer fmt opts)] + (fn [x] (transit/write writer x))))) -;;; TODO -;; * Invesigate the actual cost of cljs+clj side writer/reader construction - -;; is it worth caching these? -;; * Is it worth considering a cljs-side baos/bais pool? + #+clj + (fn [fmt opts] + (let [thread-local-transit-writer-fn (.get ^ThreadLocal transit-writer-fn-proxy)] + (thread-local-transit-writer-fn fmt opts)))) -(def ^:private transit-writer* - #+cljs (encore/memoize_ (fn [ fmt opts] (transit/writer fmt opts))) - #+clj (fn [baos fmt opts] (transit/writer baos fmt opts))) +(def ^:private get-transit-reader-fn + "Returns thread-safe (fn [str-to-read])" + #+cljs + (enc/memoize_ + (fn [fmt opts] + (let [reader (transit/reader fmt opts)] + (fn [s] (transit/read reader s))))) -(def ^:private transit-reader* - #+cljs (encore/memoize_ (fn [ fmt opts] (transit/reader fmt opts))) - #+clj (fn [bais fmt opts] (transit/reader bais fmt opts))) + #+clj + (fn [fmt opts] + (let [^String charset (get-charset fmt) + opts (cache-read-handlers opts)] + (fn [s] + (let [ba (.getBytes ^String s ^String charset) + ^ByteArrayInputStream bais (ByteArrayInputStream. ba) + reader (transit/reader bais fmt opts)] + (transit/read reader)))))) (deftype TransitPacker [transit-fmt writer-opts reader-opts] taoensso.sente.interfaces/IPacker - (pack [_ x] - #+cljs (transit/write (transit-writer* transit-fmt writer-opts) x) - #+clj (let [charset (get-charset transit-fmt) - ^ByteArrayOutputStream baos (ByteArrayOutputStream. 512)] - (transit/write (transit-writer* baos transit-fmt writer-opts) x) - (.toString baos ^String charset))) - - (unpack [_ s] - #+cljs (transit/read (transit-reader* transit-fmt reader-opts) s) - #+clj (let [charset (get-charset transit-fmt) - ba (.getBytes ^String s ^String charset) - ^ByteArrayInputStream bais (ByteArrayInputStream. ba)] - (transit/read (transit-reader* bais transit-fmt reader-opts))))) - -(def ^:private default-edn-packer interfaces/edn-packer) ; Alias -(def ^:private default-json-packer (->TransitPacker :json {} {})) -;; (def ^:private default-msgpack-packer (->TransitPacker :msgpack {} {})) - -;;;; FlexiPacker ; EXPERIMENTAL - -(defn- max-flexi-format? [fmt] (= fmt :json #_:msgpack)) -(def ^:private max-flexi-format - (let [ordered-formats [nil :edn :json #_:msgpack] - scored-formats (zipmap ordered-formats (next (range)))] - (fn [xs] (apply max-key scored-formats xs)))) - -(comment (max-flexi-format [#_:msgpack :json :edn])) - -(defn- auto-flexi-format [x] - (cond - (string? x) ; Large strings are common for HTML, etc. - (let [c (count x)] - (cond ;; (> c 500) :msgpack - (> c 300) :json)) - - (and (sequential? x) (counted? x)) - (let [c (count x)] - (cond ;; (> c 50) :msgpack - (> c 20) :json - ;; TODO Try heuristically? (check random sample, etc.) - )))) - -(comment (auto-flexi-format (take 100 (range)))) - -(deftype FlexiPacker [default-fmt edn-packer json-packer] - taoensso.sente.interfaces/IPacker - (pack [_ x] - (let [?meta-format (when-let [m (meta x)] - (max-flexi-format (filter m (keys m)))) - ?auto-format (when-not ?meta-format (auto-flexi-format x)) - ;; ?auto-format (when-not (max-flexi-format? ?meta-format) - ;; (auto-flexi-format x)) - fmt (max-flexi-format [?auto-format ?meta-format default-fmt])] - (case fmt - ;; :msgpack (str "m" (pack msgpack-packer x)) - :json (str "j" (pack json-packer x)) - :edn (str "e" (pack edn-packer x))))) - - (unpack [_ s] - (let [prefix (encore/substr s 0 1) - s* (encore/substr s 1)] - (case prefix - ;; "m" (unpack msgpack-packer s*) - "j" (unpack json-packer s*) - "e" (unpack edn-packer s*) - (throw (ex-info (str "Malformed FlexiPacker data: " s) - {:s s})))))) - -(defn get-flexi-packer - "Experimental (pre-alpha): subject to change. - Returns an IPacker implementation that un/packs data with a variable format - determined by the data's size, metadata, or the provided `default-fmt` when no - metadata is present. - - (def fpack (partial pack (get-flexi-packer :edn))) - (fpack ^:edn {:a :A :b :B}) => \"e{:a :A, :b :B}\" - (fpack ^:json {:a :A :b :B}) => \"j[\"^ \",\"~:a\",\"~:A\",\"~:b\",\"~:B\"]\" - (fpack ^:msgpack {:a :A :b :B} => \"m\202£~:a£~:A£~:b£~:B\"" - - [& [default-fmt edn-packer json-packer]] - (let [default-fmt (or default-fmt :edn) - edn-packer (or edn-packer default-edn-packer) - json-packer (or json-packer default-json-packer)] - (assert (#{:edn ; Not a transit format - ;; Transit formats: - :json :json-verbose #_:msgpack} default-fmt)) - (->FlexiPacker default-fmt edn-packer json-packer))) - -(def default-flexi-packer (get-flexi-packer :edn)) + (pack [_ x] ((get-transit-writer-fn transit-fmt writer-opts) x)) + (unpack [_ s] ((get-transit-reader-fn transit-fmt reader-opts) s))) + +(defn get-transit-packer "Returns a new TransitPacker" + ([ ] (get-transit-packer :json {} {})) + ([transit-fmt] (get-transit-packer transit-fmt {} {})) + ([transit-fmt writer-opts reader-opts] + ;; No transit-cljs support for msgpack atm + (have? [:el #{:json #_:msgpack}] transit-fmt) + (have? map? writer-opts reader-opts) + (TransitPacker. transit-fmt writer-opts reader-opts))) (comment - (let [fpacker (get-flexi-packer)] - (def fpack (partial pack fpacker)) - (def funpack (partial unpack fpacker))) - (count (fpack ^:edn {:a :A :b :B})) - (count (fpack ^:json {:a :A :b :B})) - (count (fpack ^:msgpack {:a :A :b :B})) - (funpack (fpack ^:msgpack {:a :A :b :B :utf8 "ಬಾ ಇಲ್ಲಿ ಸಂಭವಿಸ"}))) - -(comment ; Packer benchmarks - (let [data - {:sm "Hello this is just a small string" - :md {:a :A :b :B :c :C :d "This is a slightly larger datum, yo"} - :lg ^:json ; ^:msgpack - {:a "ahjkhfkjdhfkjdhfjkhdfjkhdjkfhdfkjhdfkjsdsfsifsuifuiosudfd" - :b "fdjhfkjdhfjkdhfjkdhfjkhdjfkhdjkfhdfjkhsfsfiueiuiuiufiuiid" - :c "fdjhfs[pdopoeiroejlkjfdklfjdkjfkdjfkldsfdfueiuiyqqqhahdhf" - :d "fdkjoiwueuoiuwdm,sn,mndfdifdiofudfuoidfdfdfdfe3iuqiiuausj" - :e "ejhfiurhiuui2ureoiuoieuroiueoirueioureisfdfddjghiuyiuyeyu" - :f [1 383 398498 2 9 3389 893 9 309 290349 3782 1273 4447 933] - :g #{:a :b :c :d :e :f :g :h :hello/foo :hello/bar :hello/baz}}} - - data (:lg data) ; <-- Tweak input data size here - size (fn [packer] (count (pack packer data))) - bench (fn [packer] (encore/round (encore/qbench 10000 - (unpack packer (pack packer data))))) - - edn-packer default-edn-packer - json-packer default-json-packer] - - {:size {:edn (size edn-packer) - :json (size json-packer) - ;; :msgpack (size msgpack-packer) - :flexi (size (get-flexi-packer))} - :time {:edn (bench edn-packer) - :json (bench json-packer) - ;; :msgpack (bench msgpack-packer) - :flexi (bench (get-flexi-packer))}}) - - {:size {:edn 35, :json 43, :msgpack 41}, - :time {:edn 81, :json 316, :msgpack 515}} - {:size {:edn 63, :json 86, :msgpack 67}, - :time {:edn 228, :json 284, :msgpack 613}} - {:size {:edn 448, :json 510, :msgpack 444, :flexi 445}, - :time {:edn 3027, :json 1054, :msgpack 2054, :flexi 2213}}) + (def tp (get-transit-packer)) + (enc/qb 10000 + (unpack tp (pack tp [:chsk/ws-ping "foo"])) + (enc/read-edn (enc/pr-edn [:chsk/ws-ping "foo"])))) diff --git a/src/taoensso/sente/server_adapters/immutant.clj b/src/taoensso/sente/server_adapters/immutant.clj index 933624e..625b915 100644 --- a/src/taoensso/sente/server_adapters/immutant.clj +++ b/src/taoensso/sente/server_adapters/immutant.clj @@ -11,9 +11,7 @@ (-sch-send! [im-ch msg close-after-send?] (immutant/send! im-ch msg {:close? close-after-send?}))) -(deftype ImmutantServerChanAdapter - [lp-timeout-ms ; Nb Ref. https://goo.gl/t4RolO - ] +(deftype ImmutantServerChanAdapter [] i/IServerChanAdapter (ring-req->server-ch-resp [server-ch-adapter ring-req callbacks-map] (let [{:keys [on-open on-msg on-close]} callbacks-map] @@ -25,24 +23,10 @@ (fn [im-ch {:keys [code reason] :as status-map}] (on-close im-ch status-map))) :on-message (when on-msg (fn [im-ch message] (on-msg im-ch message))) - :timeout (if (:websocket? ring-req) 0 lp-timeout-ms))))) + :timeout 0 ; Deprecated, Ref. https://goo.gl/t4RolO + )))) -(defn make-immutant-adapter - "Returns a new Immutant adapter. Useful for overriding the default - :lp-timeout-ms option that specifies server-side timeout for - Ajax (long-polling) connections. - - NB: if you override the :lp-timeout-ms option in your client-side call - to `make-channel-socket!`, you'll need to provide that same value here. - - If you aren't customizing the client-side :lp-timeout-ms, you - can safely use the default Immutant adapter (`immutant-adapter` or - `sente-web-server-adapter`)." - - [{:keys [lp-timeout-ms] - :or {lp-timeout-ms 25000}}] - - (ImmutantServerChanAdapter. lp-timeout-ms)) +(defn make-immutant-adapter [_opts] (ImmutantServerChanAdapter.)) (def immutant-adapter (make-immutant-adapter nil)) (def sente-web-server-adapter