Skip to content

Commit

Permalink
Merge d92459a into e61e4ff
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Apr 22, 2024
2 parents e61e4ff + d92459a commit aafb959
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 25 deletions.
71 changes: 71 additions & 0 deletions migrations/message_store_postgres/content_script_version_4.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const ContentScriptVersion_4* =
"""
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
meta VARCHAR,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
) PARTITION BY RANGE (storedAt);
DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;
SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;
min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;
INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;
DROP TABLE messages_backup;
UPDATE version SET version = 4 WHERE version = 3;
"""
5 changes: 4 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import content_script_version_1, content_script_version_2, content_script_version_3
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4

type MigrationScript* = object
version*: int
Expand All @@ -12,6 +14,7 @@ const PgMigrationScripts* =
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_4),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
1 change: 1 addition & 0 deletions waku/factory/validator_signed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
ctx.update(
if msg.ephemeral:
@[1.byte]
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 3 # increase this when there is an update in the database schema
const SchemaVersion* = 4 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
35 changes: 25 additions & 10 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ type PostgresDriver* = ref object of ArchiveDriver
const InsertRowStmtName = "InsertRow"
const InsertRowStmtDefinition = # TODO: get the sql queries from a file
"""INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic,
version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;"""
version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT DO NOTHING;"""

const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc"
const SelectNoCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
Expand All @@ -44,7 +44,7 @@ const SelectNoCursorAscStmtDef =

const SelectNoCursorDescStmtName = "SelectWithoutCursorDesc"
const SelectNoCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
storedAt >= $3 AND
Expand All @@ -53,7 +53,7 @@ const SelectNoCursorDescStmtDef =

const SelectWithCursorDescStmtName = "SelectWithCursorDesc"
const SelectWithCursorDescStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) < ($3,$4) AND
Expand All @@ -63,7 +63,7 @@ const SelectWithCursorDescStmtDef =

const SelectWithCursorAscStmtName = "SelectWithCursorAsc"
const SelectWithCursorAscStmtDef =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages
WHERE contentTopic IN ($1) AND
pubsubTopic = $2 AND
(storedAt, id) > ($3,$4) AND
Expand Down Expand Up @@ -119,7 +119,7 @@ proc rowCallbackImpl(
## outRows - seq of Store-rows. This is populated from the info contained in pqResult

let numFields = pqResult.pqnfields()
if numFields != 8:
if numFields != 9:
error "Wrong number of fields"
return

Expand All @@ -134,6 +134,7 @@ proc rowCallbackImpl(
var payload: string
var hashHex: string
var msgHash: WakuMessageHash
var meta: string

try:
storedAt = parseInt($(pqgetvalue(pqResult, iRow, 0)))
Expand All @@ -144,6 +145,7 @@ proc rowCallbackImpl(
timestamp = parseInt($(pqgetvalue(pqResult, iRow, 5)))
digest = parseHexStr($(pqgetvalue(pqResult, iRow, 6)))
hashHex = parseHexStr($(pqgetvalue(pqResult, iRow, 7)))
meta = parseHexStr($(pqgetvalue(pqResult, iRow, 8)))
msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31))
except ValueError:
error "could not parse correctly", error = getCurrentExceptionMsg()
Expand All @@ -152,6 +154,7 @@ proc rowCallbackImpl(
wakuMessage.version = uint32(version)
wakuMessage.contentTopic = contentTopic
wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high))
wakuMessage.meta = @(meta.toOpenArrayByte(0, meta.high))

outRows.add(
(
Expand All @@ -178,6 +181,7 @@ method put*(
let payload = toHex(message.payload)
let version = $message.version
let timestamp = $message.timestamp
let meta = toHex(message.meta)

debug "put PostgresDriver", timestamp = timestamp

Expand All @@ -186,7 +190,7 @@ method put*(
InsertRowStmtDefinition,
@[
digest, messageHash, rxTime, contentTopic, payload, pubsubTopic, version,
timestamp,
timestamp, meta,
],
@[
int32(digest.len),
Expand All @@ -197,8 +201,19 @@ method put*(
int32(pubsubTopic.len),
int32(version.len),
int32(timestamp.len),
int32(meta.len),
],
@[
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
int32(0),
],
@[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)],
)

method getAllMessages*(
Expand All @@ -214,7 +229,7 @@ method getAllMessages*(
await s.readConnPool.pgQuery(
"""SELECT storedAt, contentTopic,
payload, pubsubTopic, version, timestamp,
id, messageHash FROM messages ORDER BY storedAt ASC""",
id, messageHash, meta FROM messages ORDER BY storedAt ASC""",
newSeq[string](0),
rowCallback,
)
Expand Down Expand Up @@ -269,7 +284,7 @@ proc getMessagesArbitraryQuery(
## This proc allows to handle atypical queries. We don't use prepared statements for those.

var query =
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages"""
"""SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages"""
var statements: seq[string]
var args: seq[string]

Expand Down
49 changes: 36 additions & 13 deletions waku/waku_archive/driver/sqlite_driver/queries.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,29 @@ type SqlQueryStr = string

proc queryRowWakuMessageCallback(
s: ptr sqlite3_stmt,
contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint,
contentTopicCol, payloadCol, versionCol, senderTimestampCol, metaCol: cint,
): WakuMessage =
let
topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol))
topicLength = sqlite3_column_bytes(s, contentTopicCol)
contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1)))

p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol))
m = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, metaCol))

length = sqlite3_column_bytes(s, payloadCol)
payload = @(toOpenArray(p, 0, length - 1))
payloadLength = sqlite3_column_bytes(s, payloadCol)
metaLength = sqlite3_column_bytes(s, metaCol)
payload = @(toOpenArray(p, 0, payloadLength - 1))
version = sqlite3_column_int64(s, versionCol)
senderTimestamp = sqlite3_column_int64(s, senderTimestampCol)
meta = @(toOpenArray(m, 0, metaLength - 1))

return WakuMessage(
contentTopic: ContentTopic(contentTopic),
payload: payload,
version: uint32(version),
timestamp: Timestamp(senderTimestamp),
meta: meta,
)

proc queryRowReceiverTimestampCallback(
Expand Down Expand Up @@ -83,8 +87,8 @@ proc createTableQuery(table: string): SqlQueryStr =
"CREATE TABLE IF NOT EXISTS " & table & " (" & " pubsubTopic BLOB NOT NULL," &
" contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," &
" timestamp INTEGER NOT NULL," & " id BLOB," & " messageHash BLOB," &
" storedAt INTEGER NOT NULL," & " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" &
") WITHOUT ROWID;"
" storedAt INTEGER NOT NULL," & " meta BLOB," &
" CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ") WITHOUT ROWID;"

proc createTable*(db: SqliteDatabase): DatabaseResult[void] =
let query = createTableQuery(DbTable)
Expand Down Expand Up @@ -129,14 +133,23 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] =
return ok()

## Insert message
type InsertMessageParams* =
(seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp)
type InsertMessageParams* = (
seq[byte],
seq[byte],
Timestamp,
seq[byte],
seq[byte],
seq[byte],
int64,
Timestamp,
seq[byte],
)

proc insertMessageQuery(table: string): SqlQueryStr =
return
"INSERT INTO " & table &
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?);"
"(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp, meta)" &
" VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);"

proc prepareInsertMessageStmt*(
db: SqliteDatabase
Expand Down Expand Up @@ -244,7 +257,7 @@ proc deleteOldestMessagesNotWithinLimit*(

proc selectAllMessagesQuery(table: string): SqlQueryStr =
return
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" &
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" &
" FROM " & table & " ORDER BY storedAt ASC"

proc selectAllMessages*(
Expand All @@ -258,7 +271,12 @@ proc selectAllMessages*(
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
wakuMessage = queryRowWakuMessageCallback(
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
s,
contentTopicCol = 1,
payloadCol = 2,
versionCol = 4,
senderTimestampCol = 5,
metaCol = 8,
)
digest = queryRowDigestCallback(s, digestCol = 6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
Expand Down Expand Up @@ -354,7 +372,7 @@ proc selectMessagesWithLimitQuery(
var query: string

query =
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash"
"SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta"
query &= " FROM " & table

if where.isSome():
Expand Down Expand Up @@ -457,7 +475,12 @@ proc selectMessagesByHistoryQueryWithLimit*(
let
pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3)
message = queryRowWakuMessageCallback(
s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5
s,
contentTopicCol = 1,
payloadCol = 2,
versionCol = 4,
senderTimestampCol = 5,
metaCol = 8,
)
digest = queryRowDigestCallback(s, digestCol = 6)
storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0)
Expand Down
1 change: 1 addition & 0 deletions waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ method put*(
toBytes(pubsubTopic), # pubsubTopic
int64(message.version), # version
message.timestamp, # senderTimestamp
message.meta, # meta
)
)

Expand Down

0 comments on commit aafb959

Please sign in to comment.