From e1e05afb02b319b5cbacea57dba122ededc619b0 Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Wed, 4 Sep 2024 10:17:28 -0400 Subject: [PATCH] chore: per limit split of PostgreSQL queries (#3008) --- .../postgres_driver/postgres_driver.nim | 81 +++++++++++++------ 1 file changed, 57 insertions(+), 24 deletions(-) diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index 6c0160805e..6c8ba28d6a 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[nre, options, sequtils, strutils, strformat, times], + std/[nre, options, sequtils, strutils, strformat, times, sugar], stew/[byteutils, arrayops], results, chronos, @@ -128,7 +128,9 @@ const SelectCursorByHashDef = """SELECT timestamp FROM messages WHERE messageHash = $1""" -const DefaultMaxNumConns = 50 +const + DefaultMaxNumConns = 50 + MaxHashesPerQuery = 100 proc new*( T: type PostgresDriver, @@ -815,30 +817,27 @@ proc getMessagesByMessageHashes( debug "end of getMessagesByMessageHashes" return ok(rows) -method getMessages*( - s: PostgresDriver, - includeData = true, - contentTopics = newSeq[ContentTopic](0), - pubsubTopic = none(PubsubTopic), - cursor = none(ArchiveCursor), - startTime = none(Timestamp), - endTime = none(Timestamp), - hashes = newSeq[WakuMessageHash](0), - maxPageSize = DefaultPageSize, - ascendingOrder = true, - requestId = "", +proc getMessagesWithinLimits( + self: PostgresDriver, + includeData: bool, + contentTopics: seq[ContentTopic], + pubsubTopic: Option[PubsubTopic], + cursor: Option[ArchiveCursor], + startTime: Option[Timestamp], + endTime: Option[Timestamp], + hashes: seq[WakuMessageHash], + maxPageSize: uint, + ascendingOrder: bool, + requestId: string, ): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = - debug "beginning of getMessages" - - const MAX_ALLOWED_HASHES = 100 - if hashes.len > MAX_ALLOWED_HASHES: - return err(fmt"can not attend queries with more than {MAX_ALLOWED_HASHES} hashes") + if hashes.len > MaxHashesPerQuery: + return err(fmt"can not attend queries with more than {MaxHashesPerQuery} hashes") let hexHashes = hashes.mapIt(toHex(it)) if cursor.isNone() and pubsubTopic.isNone() and contentTopics.len == 0 and startTime.isNone() and endTime.isNone() and hexHashes.len > 0: - return await s.getMessagesByMessageHashes( + return await self.getMessagesByMessageHashes( "'" & hexHashes.join("','") & "'", maxPageSize, requestId ) @@ -846,7 +845,7 @@ method getMessages*( startTime.isSome() and endTime.isSome(): ## Considered the most common query. Therefore, we use prepared statements to optimize it. if includeData: - return await s.getMessagesPreparedStmt( + return await self.getMessagesPreparedStmt( contentTopics.join(","), PubsubTopic(pubsubTopic.get()), cursor, @@ -858,7 +857,7 @@ method getMessages*( requestId, ) else: - return await s.getMessageHashesPreparedStmt( + return await self.getMessageHashesPreparedStmt( contentTopics.join(","), PubsubTopic(pubsubTopic.get()), cursor, @@ -872,16 +871,50 @@ method getMessages*( else: if includeData: ## We will run atypical query. In this case we don't use prepared statemets - return await s.getMessagesArbitraryQuery( + return await self.getMessagesArbitraryQuery( contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize, ascendingOrder, requestId, ) else: - return await s.getMessageHashesArbitraryQuery( + return await self.getMessageHashesArbitraryQuery( contentTopics, pubsubTopic, cursor, startTime, endTime, hexHashes, maxPageSize, ascendingOrder, requestId, ) +method getMessages*( + s: PostgresDriver, + includeData = true, + contentTopics = newSeq[ContentTopic](0), + pubsubTopic = none(PubsubTopic), + cursor = none(ArchiveCursor), + startTime = none(Timestamp), + endTime = none(Timestamp), + hashes = newSeq[WakuMessageHash](0), + maxPageSize = DefaultPageSize, + ascendingOrder = true, + requestId = "", +): Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} = + debug "beginning of getMessages" + + let rows = collect(newSeq): + for i in countup(0, hashes.len, MaxHashesPerQuery): + let stop = min(i + MaxHashesPerQuery, hashes.len) + + let splittedHashes = hashes[i ..< stop] + + let subRows = + ?await s.getMessagesWithinLimits( + includeData, contentTopics, pubsubTopic, cursor, startTime, endTime, + splittedHashes, maxPageSize, ascendingOrder, requestId, + ) + + for row in subRows: + row + + debug "end of getMessages" + + return ok(rows) + proc getStr( s: PostgresDriver, query: string ): Future[ArchiveDriverResult[string]] {.async.} =