Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

268 fix connection pool #269

Merged
merged 17 commits into from
Dec 20, 2023
39 changes: 21 additions & 18 deletions example/benchmark.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import ../src/allographer/query_builder

randomize()
let rdb = dbOpen(PostgreSQL, "database", "user", "pass", "postgres", 5432, 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(MariaDB, "database", "user", "pass", "mariadb", 3306, 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(SQLite3, "db.sqlite3", 95, 30, shouldDisplayLog=false)
# let rdb = dbOpen(SurrealDB, "test", "test", "user", "pass", "http://surreal", 8000, 500, 30, shouldDisplayLog=false).waitFor()
let stdRdb = open("postgres:5432", "user", "pass", "database")
const range1_10000 = 1..10000

Expand All @@ -27,23 +30,25 @@ proc migrate() {.async.} =
rdb.create(
table("World", [
Column.increments("id"),
# Column.increments("index"), # for surreal
Column.integer("randomNumber").default(0)
]),
table("Fortune", [
Column.increments("id"),
# Column.increments("index"), # for surreal
Column.string("message")
])
)

seeder rdb, "World":
seeder(rdb, "World"):
var data = newSeq[JsonNode]()
for i in 1..10000:
for i in range1_10000:
data.add(
%*{"randomNumber": rand(1..10000)}
%*{"randomNumber": rand(range1_10000)}
)
await rdb.table("World").insert(data)

seeder rdb, "Fortune":
seeder(rdb, "Fortune"):
data = @[
%*{"id": 1, "message": "fortune: No such file or directory"},
%*{"id": 2, "message": "A computer scientist is someone who fixes things that aren''t broken."},
Expand All @@ -65,12 +70,13 @@ proc migrate() {.async.} =
let getFirstPrepare = stdRdb.prepare("getFirst", sql""" SELECT * FROM "World" WHERE id = $1 LIMIT 1 """, 1)
let updatePrepare = stdRdb.prepare("updatePrepare", sql""" UPDATE "World" SET "randomNumber" = $1 WHERE id = $2 """, 2)

const countNum = 500

proc query():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var futures = newSeq[Future[seq[string]]](countNum)
for i in 1..countNum:
let n = rand(range1_10000)
futures[i-1] = rdb.table("World").findPlain(n)
futures[i-1] = rdb.select().table("World").findPlain(n)
let resp = all(futures).await
let response = resp.map(
proc(x:seq[string]):JsonNode =
Expand All @@ -80,7 +86,6 @@ proc query():Future[seq[JsonNode]] {.async.} =
return response

proc queryRaw():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var futures = newSeq[Future[seq[string]]](countNum)
for i in 1..countNum:
let n = rand(range1_10000)
Expand All @@ -95,7 +100,6 @@ proc queryRaw():Future[seq[JsonNode]] {.async.} =


proc queryStd():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var resp:seq[Row]
for i in 1..countNum:
resp.add(stdRdb.getRow(getFirstPrepare, i))
Expand All @@ -107,23 +111,25 @@ proc queryStd():Future[seq[JsonNode]] {.async.} =


proc update():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
let index = rand(range1_10000)
let number = rand(range1_10000)
futures[i-1] = (proc():Future[void] =
discard rdb.select("id", "randomNumber").table("World").findPlain(index)
rdb.table("World").where("id", "=", index).update(%*{"randomNumber": number})
futures[i-1] = (proc():Future[void] {.async.} =
discard rdb.select("id", "randomNumber").table("World").findPlain(index).await
rdb.table("World").where("id", "=", index).update(%*{"randomNumber": number}).await

# for surreal
# discard rdb.select("id", "randomNumber").table("World").where("index", "=", index).first().await
# rdb.table("World").where("index", "=", index).update(%*{"randomNumber": number}).await
)()
response[i-1] = %*{"id":index, "randomNumber": number}
await all(futures)
return response


proc updateRaw():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
Expand All @@ -139,7 +145,6 @@ proc updateRaw():Future[seq[JsonNode]] {.async.} =


proc updateRawStd():Future[seq[JsonNode]] {.async.} =
const countNum = 500
var response = newSeq[JsonNode](countNum)
var futures = newSeq[Future[void]](countNum)
for i in 1..countNum:
Expand All @@ -162,10 +167,9 @@ proc timeProcess[T](name:string, cb:proc():Future[T]) {.async.}=
var resultStr = ""

for i in 1..times:
sleep(500)
sleep(100)
start = cpuTime()
for _ in 1..20:
discard cb().await
discard cb().await
eachTime = cpuTime() - start
sumTime += eachTime
if i > 1: resultStr.add("\n")
Expand All @@ -182,7 +186,6 @@ proc main() =
migrate().waitFor

timeProcess("query", query).waitFor
# timeProcess("queryTime", queryTime).waitFor
timeProcess("queryRaw", queryRaw).waitFor
timeProcess("queryStd", queryStd).waitFor
timeProcess("update", update).waitFor
Expand Down
20 changes: 10 additions & 10 deletions src/allographer/query_builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,61 +6,61 @@ when NimMajor == 2:

when isExistsSqlite:
import ./v2/query_builder/models/sqlite/sqlite_types; export sqlite_types
import ./v2/query_builder/models/sqlite/sqlite_connections; export sqlite_connections
import ./v2/query_builder/models/sqlite/sqlite_query; export sqlite_query
import ./v2/query_builder/models/sqlite/sqlite_exec; export sqlite_exec
import ./v2/query_builder/models/sqlite/sqlite_transaction; export sqlite_transaction

when isExistsPostgres:
import ./v2/query_builder/models/postgres/postgres_types; export postgres_types
import ./v2/query_builder/models/postgres/postgres_connections; export postgres_connections
import ./v2/query_builder/models/postgres/postgres_query; export postgres_query
import ./v2/query_builder/models/postgres/postgres_exec; export postgres_exec
import ./v2/query_builder/models/postgres/poatgres_transaction; export poatgres_transaction

when isExistsMariadb:
import ./v2/query_builder/models/mariadb/mariadb_types; export mariadb_types
import ./v2/query_builder/models/mariadb/mariadb_connections; export mariadb_connections
import ./v2/query_builder/models/mariadb/mariadb_query; export mariadb_query
import ./v2/query_builder/models/mariadb/mariadb_exec; export mariadb_exec
import ./v2/query_builder/models/mariadb/mariadb_transaction; export mariadb_transaction

when isExistsMysql:
import ./v2/query_builder/models/mysql/mysql_types; export mysql_types
import ./v2/query_builder/models/mysql/mysql_connections; export mysql_connections
import ./v2/query_builder/models/mysql/mysql_query; export mysql_query
import ./v2/query_builder/models/mysql/mysql_exec; export mysql_exec
import ./v2/query_builder/models/mysql/mysql_transaction; export mysql_transaction

when isExistsSurrealdb:
import ./v2/query_builder/models/surreal/surreal_types; export surreal_types
import ./v2/query_builder/models/surreal/surreal_connections; export surreal_connections
import ./v2/query_builder/models/surreal/surreal_query; export surreal_query
import ./v2/query_builder/models/surreal/surreal_exec; export surreal_exec
elif NimMajor == 1:
import ./v1/query_builder/enums; export enums
import ./v1/query_builder/error; export error

when isExistsSqlite:
import ./v1/query_builder/models/sqlite/sqlite_types; export sqlite_types
import ./v1/query_builder/models/sqlite/sqlite_connections; export sqlite_connections
import ./v1/query_builder/models/sqlite/sqlite_query; export sqlite_query
import ./v1/query_builder/models/sqlite/sqlite_exec; export sqlite_exec
import ./v1/query_builder/models/sqlite/sqlite_transaction; export sqlite_transaction

when isExistsPostgres:
import ./v1/query_builder/models/postgres/postgres_types; export postgres_types
import ./v1/query_builder/models/postgres/postgres_connections; export postgres_connections
import ./v1/query_builder/models/postgres/postgres_query; export postgres_query
import ./v1/query_builder/models/postgres/postgres_exec; export postgres_exec
import ./v1/query_builder/models/postgres/poatgres_transaction; export poatgres_transaction

when isExistsMariadb:
import ./v1/query_builder/models/mariadb/mariadb_types; export mariadb_types
import ./v1/query_builder/models/mariadb/mariadb_connections; export mariadb_connections
import ./v1/query_builder/models/mariadb/mariadb_query; export mariadb_query
import ./v1/query_builder/models/mariadb/mariadb_exec; export mariadb_exec
import ./v1/query_builder/models/mariadb/mariadb_transaction; export mariadb_transaction

when isExistsMysql:
import ./v1/query_builder/models/mysql/mysql_types; export mysql_types
import ./v1/query_builder/models/mysql/mysql_connections; export mysql_connections
import ./v1/query_builder/models/mysql/mysql_query; export mysql_query
import ./v1/query_builder/models/mysql/mysql_exec; export mysql_exec
import ./v1/query_builder/models/mysql/mysql_transaction; export mysql_transaction

when isExistsSurrealdb:
import ./v1/query_builder/models/surreal/surreal_types; export surreal_types
import ./v1/query_builder/models/surreal/surreal_connections; export surreal_connections
import ./v1/query_builder/models/surreal/surreal_query; export surreal_query
import ./v1/query_builder/models/surreal/surreal_exec; export surreal_exec
9 changes: 9 additions & 0 deletions src/allographer/v1/query_builder/libs/sqlite/sqlite_impl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ./sqlite_lib

proc query*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[(seq[Row], DbRows)] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
var rows = newSeq[seq[string]]()
for row in db.instantRows(dbRows, query, args):
Expand All @@ -20,6 +21,7 @@ proc query*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[(se

proc queryPlain*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[seq[Row]] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var rows = newSeq[seq[string]]()
for row in db.instantRowsPlain(query, args):
var columns = newSeq[string](row.len)
Expand All @@ -30,6 +32,7 @@ proc queryPlain*(db:PSqlite3, query:string, args:seq[string], timeout:int):Futur


proc getColumnTypes*(db:PSqlite3, query: string):Future[seq[(string, string)]] {.async.} =
sleepAsync(0).await
var dbRows: DbRows
var columns = newSeq[(string, string)]()
for row in db.instantRows(dbRows, query, newSeq[string]()):
Expand All @@ -40,6 +43,7 @@ proc getColumnTypes*(db:PSqlite3, query: string):Future[seq[(string, string)]] {
proc exec*(db:PSqlite3, query: string, args: JsonNode, columns:seq[(string, string)], timeout:int) {.async.} =
## args is `JArray`
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
# var q = dbFormat(query, strArges)
var stmt: PStmt
var res:bool
Expand Down Expand Up @@ -93,6 +97,7 @@ proc exec*(db:PSqlite3, query: string, args: JsonNode, timeout:int) {.async.} =
## used for rdb.raw().exec()
## args are `JArray`
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var stmt: PStmt
var res:bool
if prepare_v2(db, query.cstring, query.len.cint, stmt, nil) == SQLITE_OK:
Expand Down Expand Up @@ -132,6 +137,7 @@ proc exec*(db:PSqlite3, query: string, args: JsonNode, timeout:int) {.async.} =
proc exec*(db:PSqlite3, query: string, args: seq[string], timeout:int) {.async.} =
## Not used anymore
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var q = dbFormat(query, args)
var stmt: PStmt
var res:bool
Expand All @@ -148,11 +154,13 @@ proc exec*(db:PSqlite3, query: string, args: seq[string], timeout:int) {.async.}

proc getColumns*(db:PSqlite3, query:string, args:seq[string], timeout:int):Future[seq[string]] {.async.} =
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
return db.getColumns(dbRows, query, args)


proc prepare*(db:PSqlite3, query:string, timeout:int):Future[PStmt] {.async.} =
sleepAsync(0).await
if prepare_v2(db, query, query.len.cint, result, nil) != SQLITE_OK:
discard finalize(result)
dbError(db)
Expand All @@ -164,6 +172,7 @@ proc preparedQuery*(db:PSqlite3, args:seq[string] = @[], sqliteStmt:PStmt):Futur
sqliteStmt.bindParam(i+1, row)
# run query
assert(not db.isNil, "Database not connected.")
sleepAsync(0).await
var dbRows: DbRows
var rows = newSeq[seq[string]]()
for row in db.instantRows(dbRows, sqliteStmt):
Expand Down
36 changes: 0 additions & 36 deletions src/allographer/v1/query_builder/libs/surreal/surreal_impl.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,42 +7,6 @@ import ../../error
import ./surreal_rdb
import ./surreal_lib

type SurrealImpl* = ref object

# proc open*(_:type SurrealImpl, namespace="", database="",user="", password="",
# host="", port:int32 = 0, maxConnections=1, timeout=30):Future[SurrealConnections] {.async.} =
# var pools = newSeq[SurrealConn](maxConnections)
# for i in 0..<maxConnections:
# let client = newAsyncHttpClient()
# var headers = newHttpHeaders(true)
# headers["NS"] = namespace
# headers["DB"] = database
# headers["Accept"] = "application/json"
# headers["Authorization"] = "Basic " & base64.encode(user & ":" & password)
# client.headers = headers

# var url = &"{host}:{port}/status"
# var resp = client.get(url).await
# if(resp.status != $Http200):
# dbError(&"Cannot connect to SurrealDb {host}:{port}")

# url = &"{host}:{port}/sql"
# resp = client.post(url, &"DEFINE NAMESPACE `{namespace}`; USE NS `{namespace}`; DEFINE DATABASE `{database}`").await
# if(resp.status != $Http200):
# dbError(&"Cannot connect to SurrealDb {host}:{port}")

# pools[i] = SurrealConn(
# conn: client,
# host:host,
# port:port,
# isBusy: false,
# createdAt: getTime().toUnix(),
# )
# return SurrealConnections(
# pools: pools,
# timeout: timeout
# )


proc query*(db:SurrealConn, query: string, args: seq[string], timeout:int):Future[JsonNode] {.async.} =
## return JArray
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,35 +15,33 @@ proc select*(self:MariadbConnections, columnsArg:varargs[string]):MariadbQuery =
else:
query["select"] = %columnsArg

let MariadbQuery = MariadbQuery(
let mariadbQuery = MariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
info: self.info,
query: query,
queryString: "",
placeHolder: newJArray(),
isInTransaction: self.isInTransaction,
transactionConn: self.transactionConn,
)
return MariadbQuery
return mariadbQuery


proc table*(self:MariadbConnections, tableArg: string): MariadbQuery =
let query = newJObject()
query["table"] = %tableArg

let MariadbQuery = MariadbQuery(
let mariadbQuery = MariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
query: query,
queryString: "",
placeHolder: newJArray(),
isInTransaction: self.isInTransaction,
transactionConn: self.transactionConn,
)
return MariadbQuery
return mariadbQuery


proc raw*(self:MariadbConnections, sql:string, arges=newJArray()): RawMariadbQuery =
Expand All @@ -53,7 +51,6 @@ proc raw*(self:MariadbConnections, sql:string, arges=newJArray()): RawMariadbQue
let rawQueryRdb = RawMariadbQuery(
log: self.log,
pools: self.pools,
timeout: self.timeout,
info: self.info,
query: newJObject(),
queryString: sql.strip(),
Expand Down
Loading