Skip to content

Commit

Permalink
Revert "postgres_driver.nim: rename table's name from "messages" to "…
Browse files Browse the repository at this point in the history
…message" (#2110)" (#2115)

This reverts commit 71cfbbc.
  • Loading branch information
alrevuelta authored Oct 6, 2023
1 parent 71cfbbc commit a0033df
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ type PostgresDriver* = ref object of ArchiveDriver
connPool: PgAsyncPool

proc dropTableQuery(): string =
"DROP TABLE message"
"DROP TABLE messages"

proc createTableQuery(): string =
"CREATE TABLE IF NOT EXISTS message (" &
"CREATE TABLE IF NOT EXISTS messages (" &
" pubsubTopic VARCHAR NOT NULL," &
" contentTopic VARCHAR NOT NULL," &
" payload VARCHAR," &
Expand All @@ -37,7 +37,7 @@ proc createTableQuery(): string =

proc insertRow(): string =
# TODO: get the sql queries from a file
"""INSERT INTO message (id, storedAt, contentTopic, payload, pubsubTopic,
"""INSERT INTO messages (id, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7);"""

const MaxNumConns = 5 #TODO: we may need to set that from app args (maybe?)
Expand Down Expand Up @@ -142,11 +142,11 @@ proc toArchiveRow(r: Row): ArchiveDriverResult[ArchiveRow] =

method getAllMessages*(s: PostgresDriver):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
## Retrieve all message from the store.
## Retrieve all messages from the store.

let rowsRes = await s.connPool.query("""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id FROM message ORDER BY storedAt ASC""",
id FROM messages ORDER BY storedAt ASC""",
newSeq[string](0))

if rowsRes.isErr():
Expand All @@ -172,7 +172,7 @@ method getMessages*(s: PostgresDriver,
ascendingOrder = true):
Future[ArchiveDriverResult[seq[ArchiveRow]]] {.async.} =
var query = """SELECT storedAt, contentTopic, payload,
pubsubTopic, version, timestamp, id FROM message"""
pubsubTopic, version, timestamp, id FROM messages"""
var statements: seq[string]
var args: seq[string]

Expand Down Expand Up @@ -257,7 +257,7 @@ proc getInt(s: PostgresDriver,
method getMessagesCount*(s: PostgresDriver):
Future[ArchiveDriverResult[int64]] {.async.} =

let intRes = await s.getInt("SELECT COUNT(1) FROM message")
let intRes = await s.getInt("SELECT COUNT(1) FROM messages")
if intRes.isErr():
return err("error in getMessagesCount: " & intRes.error)

Expand All @@ -266,7 +266,7 @@ method getMessagesCount*(s: PostgresDriver):
method getOldestMessageTimestamp*(s: PostgresDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =

let intRes = await s.getInt("SELECT MIN(storedAt) FROM message")
let intRes = await s.getInt("SELECT MIN(storedAt) FROM messages")
if intRes.isErr():
return err("error in getOldestMessageTimestamp: " & intRes.error)

Expand All @@ -275,7 +275,7 @@ method getOldestMessageTimestamp*(s: PostgresDriver):
method getNewestMessageTimestamp*(s: PostgresDriver):
Future[ArchiveDriverResult[Timestamp]] {.async.} =

let intRes = await s.getInt("SELECT MAX(storedAt) FROM message")
let intRes = await s.getInt("SELECT MAX(storedAt) FROM messages")
if intRes.isErr():
return err("error in getNewestMessageTimestamp: " & intRes.error)

Expand All @@ -287,7 +287,7 @@ method deleteMessagesOlderThanTimestamp*(
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
"DELETE FROM message WHERE storedAt < " & $ts)
"DELETE FROM messages WHERE storedAt < " & $ts)
if execRes.isErr():
return err("error in deleteMessagesOlderThanTimestamp: " & execRes.error)

Expand All @@ -299,9 +299,9 @@ method deleteOldestMessagesNotWithinLimit*(
Future[ArchiveDriverResult[void]] {.async.} =

let execRes = await s.connPool.exec(
"""DELETE FROM message WHERE id NOT IN
"""DELETE FROM messages WHERE id NOT IN
(
SELECT id FROM message ORDER BY storedAt DESC LIMIT ?
SELECT id FROM messages ORDER BY storedAt DESC LIMIT ?
);""",
@[$limit])
if execRes.isErr():
Expand Down

0 comments on commit a0033df

Please sign in to comment.