Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a caching layer #15

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
:aliases {:test {:extra-paths ["test"]
:extra-deps {org.clojure/clojure {:mvn/version "1.10.3"}
com.clojure-goes-fast/clj-async-profiler {:mvn/version "0.5.1"}
org.clojure/core.cache {:mvn/version "1.0.225"}
com.github.ben-manes.caffeine/caffeine {:mvn/version "3.0.5"}
funcool/promesa {:mvn/version "2.0.1"}
manifold/manifold {:mvn/version "0.1.8"}
com.h2database/h2 {:mvn/version "1.4.199"}
Expand Down
2 changes: 2 additions & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
:profiles {:dev {:global-vars {*warn-on-reflection* true}
:dependencies [[org.clojure/clojure "1.10.3"]
[com.clojure-goes-fast/clj-async-profiler "0.5.1"]
[org.clojure/core.cache "1.0.225"]
[com.github.ben-manes.caffeine/caffeine "3.0.5"]
[funcool/promesa "2.0.1"]
[manifold "0.1.8"]
[com.h2database/h2 "1.4.199"]
Expand Down
33 changes: 16 additions & 17 deletions src/porsas/async.clj
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
(ns porsas.async
(:require [porsas.core :as p])
(:require [porsas.cache :as cache]
[porsas.core :as p])
(:import (io.vertx.pgclient PgPool PgConnectOptions)
(io.vertx.sqlclient PoolOptions Tuple RowSet)
io.vertx.sqlclient.impl.ArrayTuple
io.vertx.pgclient.impl.RowImpl
(io.vertx.core Vertx Handler AsyncResult VertxOptions)
(java.util Collection HashMap Map)
java.util.Collection
(clojure.lang PersistentVector)
(java.util.concurrent CompletableFuture Executor CompletionStage)
(java.util.function Function)))
Expand Down Expand Up @@ -100,21 +101,19 @@
| key | description |
| --------------|-------------|
| `:row` | Optional function of `tuple->value` or a [[RowCompiler]] to convert rows into values
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row cache] :or {cache (HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql ^RowSet rs]
(let [cols (col-map rs)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row cache]}]
(let [cache (or cache ((requiring-resolve 'porsas.cache.caffeine/create-cache)))
->row (fn [_sql ^RowSet rs]
(let [cols (col-map rs)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ pool sqlvec]
(let [sql (-get-sql sqlvec)
Expand All @@ -130,7 +129,7 @@
it (.iterator rs)]
(if-not (.hasNext it)
(.complete cf nil)
(let [row (or (.get ^Map cache sql) (->row sql rs))]
(let [row (cache/lookup-or-set cache sql #(->row %1 rs))]
(.complete cf (row (.next it))))))
(.completeExceptionally cf (.cause ^AsyncResult res)))))))
cf))
Expand All @@ -146,7 +145,7 @@
(if (.succeeded ^AsyncResult res)
(let [rs ^RowSet (.result ^AsyncResult res)
it (.iterator rs)
row (or (.get ^Map cache sql) (->row sql rs))]
row (cache/lookup-or-set cache sql #(->row %1 rs))]
(loop [res []]
(if (.hasNext it)
(recur (conj res (row (.next it))))
Expand Down
9 changes: 9 additions & 0 deletions src/porsas/cache.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
(ns porsas.cache)

(defprotocol Cache
(lookup-or-set [this k value-fn] "Lookups a value in the cache based on `k` and if not found sets its value based on `value-fn` and returns it.")
(elements [this] "Returns the actual state of the cache."))

(defprotocol Cached
(cache [this]))

20 changes: 20 additions & 0 deletions src/porsas/cache/caffeine.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(ns porsas.cache.caffeine
(:require [porsas.cache :as pc])
(:import [com.github.benmanes.caffeine.cache Caffeine Cache]
java.util.function.Function))

(defn ->function ^Function [f]
(reify Function
(apply [_this t]
(f t))))

(defrecord CaffeineCache [c]
pc/Cache
(lookup-or-set [this k value-fn]
(.get ^Cache (:c this) k (->function value-fn)))
(elements [this] (into {} (.asMap ^Cache (:c this)))))

(defn create-cache []
(->CaffeineCache (-> (Caffeine/newBuilder)
(.maximumSize 10000)
(.build))))
12 changes: 12 additions & 0 deletions src/porsas/cache/core.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
(ns porsas.cache.core
(:require [clojure.core.cache.wrapped :as c]
[porsas.cache :as pc]))

(defrecord CoreCache [c]
pc/Cache
(lookup-or-set [this k value-fn]
(c/lookup-or-miss (:c this) k value-fn))
(elements [this] (into {} @(:c this))))

(defn create-cache []
(->CoreCache (c/lru-cache-factory {})))
3 changes: 0 additions & 3 deletions src/porsas/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@
(defprotocol GetValue
(get-value [this i]))

(defprotocol Cached
(cache [this]))

;;
;; Implementation
;;
Expand Down
48 changes: 23 additions & 25 deletions src/porsas/jdbc.clj
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
(ns porsas.jdbc
(:require [porsas.core :as p])
(:require [porsas.cache :as cache]
[porsas.core :as p])
(:import (java.sql Connection PreparedStatement ResultSet ResultSetMetaData)
(javax.sql DataSource)
(java.util Iterator Map)
java.util.Iterator
(clojure.lang PersistentVector)))

;;
Expand Down Expand Up @@ -118,42 +119,39 @@
| --------------|-------------|
| `:row` | Optional function of `rs->value` or a [[RowCompiler]] to convert rows into values
| `:key` | Optional function of `rs-meta i->key` to create key for map-results
| `:cache` | Optional [[java.util.Map]] instance to hold the compiled rowmappers"
| `:cache` | Optional [[porsas.cache/Cache]] instance to hold the compiled rowmappers"
([] (context {}))
([{:keys [row key cache] :or {key (unqualified-key)
cache (java.util.HashMap.)}}]
(let [cache (or cache (reify Map (get [_ _]) (put [_ _ _]) (entrySet [_])))
->row (fn [sql rs]
(let [cols (col-map rs key)
row (cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))]
(.put ^Map cache sql row)
row))]
([{:keys [row key cache] :or {key (unqualified-key)}}]
(let [c (or cache ((requiring-resolve 'porsas.cache.caffeine/create-cache)))
->row (fn [_sql rs]
(let [cols (col-map rs key)]
(cond
(satisfies? p/RowCompiler row) (p/compile-row row (map last cols))
row row
:else (p/rs->map-of-cols cols))))]
(reify
p/Cached
(cache [_] (into {} cache))
cache/Cached
(cache [_] (cache/elements cache))
Context
(-query-one [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
(let [sql (-get-sql sqlvec)
params (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps params)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(if (.next rs) (row rs)))
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(when (.next rs) (row rs)))
(finally
(.close ps)))))
(-query [_ connection sqlvec]
(let [sql (-get-sql sqlvec)
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
it (-get-parameter-iterator sqlvec)
ps (.prepareStatement ^Connection connection sql)]
(try
(prepare! ps it)
(let [rs (.executeQuery ps)
row (or (.get ^Map cache sql) (->row sql rs))]
(let [rs (.executeQuery ps)
row (cache/lookup-or-set c sql #(->row %1 rs))]
(loop [res []]
(if (.next rs)
(recur (conj res (row rs)))
Expand Down
21 changes: 9 additions & 12 deletions src/porsas/next.clj
Original file line number Diff line number Diff line change
@@ -1,24 +1,21 @@
(ns porsas.next
(:require [next.jdbc.result-set :as rs]
[porsas.jdbc :as pj]
[porsas.core :as p])
(:import (java.sql ResultSet)
(java.util HashMap)))
[porsas.cache :as cache]
[porsas.core :as p]
[porsas.jdbc :as pj])
(:import (java.sql ResultSet)))

(defn caching-row-builder
"A [[next.jdbc.result-set/RowBuilder]] implementation using porsas. WIP."
([]
(caching-row-builder (pj/qualified-key)))
([key]
(let [cache (HashMap.)] ;; TODO: make bounded
([key & {:keys [cache]}]
(let [cache (or cache ((requiring-resolve 'porsas.cache.caffeine/create-cache)))]
(fn [^ResultSet rs opts]
(let [sql (:next.jdbc/sql-string opts)
->row (or (.get cache sql)
(let [->row (p/rs-> 1 nil (map last (pj/col-map rs key)))]
(.put cache sql ->row)
->row))]
(let [sql (:next.jdbc/sql-params opts)
->row (cache/lookup-or-set cache sql (fn [_] (p/rs-> 1 nil (map last (pj/col-map rs key)))))]
(reify
p/Cached
cache/Cached
(cache [_] (into {} cache))
rs/RowBuilder
(->row [_] (->row rs))
Expand Down
7 changes: 3 additions & 4 deletions test/porsas/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@
[manifold.deferred :as d])
(:import io.vertx.pgclient.PgPool))

(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "user"
:password "password"})
(def pool-opts {:uri "postgresql://localhost:5432/porsas"
:user "postgres"})

(t/deftest async
(let [pool (pa/pool pool-opts)]
Expand All @@ -17,7 +16,7 @@
(pa/then :name)
(deref))))

(t/is (= "io.reactiverse.pgclient.PgException: relation \"non_existing\" does not exist"
(t/is (= "io.vertx.pgclient.PgException: ERROR: relation \"non_existing\" does not exist (42P01)"
(-> (pa/query-one pool ["SELECT * from non_existing where id=$1" 1])
(pa/then :name)
(pa/catch #(-> % .getMessage))
Expand Down
20 changes: 20 additions & 0 deletions test/porsas/cache_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
(ns porsas.cache-test
(:require [clojure.test :as t]
[clojure.template :refer [do-template]]
[porsas.cache :as sut]
[porsas.cache.core :as core]
[porsas.cache.caffeine :as caffeine]))

(t/deftest cache
(do-template
[t cache]
(let [c cache]
(t/testing t
(t/is (= {} (sut/elements c)))

(t/is (= ::a (sut/lookup-or-set c :a (constantly ::a))))
(t/is (= ::a (sut/lookup-or-set c :a (constantly ::b))))

(t/is (= {:a ::a} (sut/elements c)))))
"core cache" (core/create-cache)
"caffeine" (caffeine/create-cache)))