From 2b4ca4d0ff5ccae3f6713d84708496f7c14c5228 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Tue, 31 Oct 2023 14:46:46 +0100 Subject: [PATCH] chore: Optimize postgres - use of rowCallback approach (#2171) * db_postgres, postgres_driver: better performance by using callback. There were a bunch of milliseconds being lost due to multiple-row processing. This commit aims to have the minimum possible row process time. * pgasyncpool: clarifying logic around pool conn management. * db_postgres: removing duplicate code and more searchable proc names. --- waku/common/databases/db_postgres/dbconn.nim | 47 +++-- .../databases/db_postgres/pgasyncpool.nim | 79 ++++---- .../postgres_driver/postgres_driver.nim | 173 ++++++++++-------- .../postgres_driver/postgres_healthcheck.nim | 4 +- 4 files changed, 160 insertions(+), 143 deletions(-) diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim index 43464120d8..ce9e54c2b8 100644 --- a/waku/common/databases/db_postgres/dbconn.nim +++ b/waku/common/databases/db_postgres/dbconn.nim @@ -9,6 +9,8 @@ import include db_postgres +type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe.} + ## Connection management proc check*(db: DbConn): Result[void, string] = @@ -43,11 +45,11 @@ proc open*(connString: string): ok(conn) -proc rows*(db: DbConn, - query: SqlQuery, - args: seq[string]): - Future[Result[seq[Row], string]] {.async.} = - ## Runs the SQL getting results. +proc sendQuery(db: DbConn, + query: SqlQuery, + args: seq[string]): + Future[Result[void, string]] {.async.} = + ## This proc can be used directly for queries that don't retrieve values back. if db.status != CONNECTION_OK: let checkRes = db.check() @@ -71,7 +73,13 @@ proc rows*(db: DbConn, return err("failed pqsendQuery: unknown reason") - var ret = newSeq[Row](0) + return ok() + +proc waitQueryToFinish(db: DbConn, + rowCallback: DataProc = nil): + Future[Result[void, string]] {.async.} = + ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.) + ## For other queries, like "INSERT", 'rowCallback' should be nil. while true: @@ -84,22 +92,33 @@ proc rows*(db: DbConn, return err("failed pqconsumeInput: unknown reason") if db.pqisBusy() == 1: - await sleepAsync(0.milliseconds) # Do not block the async runtime + await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime continue - var pqResult = db.pqgetResult() + let pqResult = db.pqgetResult() if pqResult == nil: # Check if its a real error or just end of results let checkRes = db.check() if checkRes.isErr(): return err("error in rows: " & checkRes.error) - return ok(ret) # reached the end of the results + return ok() # reached the end of the results - var cols = pqResult.pqnfields() - var row = cols.newRow() - for i in 0'i32 .. pqResult.pqNtuples() - 1: - pqResult.setRow(row, i, cols) # puts the value in the row - ret.add(row) + if not rowCallback.isNil(): + rowCallback(pqResult) pqclear(pqResult) + +proc dbConnQuery*(db: DbConn, + query: SqlQuery, + args: seq[string], + rowCallback: DataProc): + Future[Result[void, string]] {.async, gcsafe.} = + + (await db.sendQuery(query, args)).isOkOr: + return err("error in dbConnQuery calling sendQuery: " & $error) + + (await db.waitQueryToFinish(rowCallback)).isOkOr: + return err("error in dbConnQuery calling waitQueryToFinish: " & $error) + + return ok() diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim index 2e5fa4e079..0757c31f34 100644 --- a/waku/common/databases/db_postgres/pgasyncpool.nim +++ b/waku/common/databases/db_postgres/pgasyncpool.nim @@ -8,15 +8,11 @@ else: import std/[sequtils,nre, strformat], stew/results, - chronicles, chronos import ./dbconn, ../common -logScope: - topics = "postgres asyncpool" - type PgAsyncPoolState {.pure.} = enum Closed, Live, @@ -107,6 +103,16 @@ proc close*(pool: PgAsyncPool): return ok() +proc getFirstFreeConnIndex(pool: PgAsyncPool): + DatabaseResult[int] = + for index in 0..