diff --git a/migrations/message_store_postgres/content_script_version_4.nim b/migrations/message_store_postgres/content_script_version_4.nim new file mode 100644 index 0000000000..d3a6ea7cb3 --- /dev/null +++ b/migrations/message_store_postgres/content_script_version_4.nim @@ -0,0 +1,71 @@ +const ContentScriptVersion_4* = + """ +ALTER TABLE IF EXISTS messages_backup RENAME TO messages; +ALTER TABLE messages RENAME TO messages_backup; +ALTER TABLE messages_backup DROP CONSTRAINT messageIndex; + +CREATE TABLE IF NOT EXISTS messages ( + pubsubTopic VARCHAR NOT NULL, + contentTopic VARCHAR NOT NULL, + payload VARCHAR, + version INTEGER NOT NULL, + timestamp BIGINT NOT NULL, + id VARCHAR NOT NULL, + messageHash VARCHAR NOT NULL, + storedAt BIGINT NOT NULL, + meta VARCHAR, + CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt) + ) PARTITION BY RANGE (storedAt); + +DO $$ +DECLARE + min_storedAt numeric; + max_storedAt numeric; + min_storedAtSeconds integer = 0; + max_storedAtSeconds integer = 0; + partition_name TEXT; + create_partition_stmt TEXT; +BEGIN + SELECT MIN(storedAt) into min_storedAt + FROM messages_backup; + + SELECT MAX(storedAt) into max_storedAt + FROM messages_backup; + + min_storedAtSeconds := min_storedAt / 1000000000; + max_storedAtSeconds := max_storedAt / 1000000000; + + partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds; + create_partition_stmt := 'CREATE TABLE ' || partition_name || + ' PARTITION OF messages FOR VALUES FROM (' || + min_storedAt || ') TO (' || (max_storedAt + 1) || ')'; + IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN + EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt; + END IF; +END $$; + +INSERT INTO messages ( + pubsubTopic, + contentTopic, + payload, + version, + timestamp, + id, + messageHash, + storedAt + ) + SELECT pubsubTopic, + contentTopic, + payload, + version, + timestamp, + id, + messageHash, + storedAt + FROM messages_backup; + +DROP TABLE messages_backup; + +UPDATE version SET version = 4 WHERE version = 3; + +""" diff --git a/migrations/message_store_postgres/pg_migration_manager.nim b/migrations/message_store_postgres/pg_migration_manager.nim index d4de18f6d9..2293f4027f 100644 --- a/migrations/message_store_postgres/pg_migration_manager.nim +++ b/migrations/message_store_postgres/pg_migration_manager.nim @@ -1,4 +1,6 @@ -import content_script_version_1, content_script_version_2, content_script_version_3 +import + content_script_version_1, content_script_version_2, content_script_version_3, + content_script_version_4 type MigrationScript* = object version*: int @@ -12,6 +14,7 @@ const PgMigrationScripts* = MigrationScript(version: 1, scriptContent: ContentScriptVersion_1), MigrationScript(version: 2, scriptContent: ContentScriptVersion_2), MigrationScript(version: 3, scriptContent: ContentScriptVersion_3), + MigrationScript(version: 4, scriptContent: ContentScriptVersion_4), ] proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] = diff --git a/tests/testlib/wakucore.nim b/tests/testlib/wakucore.nim index 0e1665697f..1001a7116b 100644 --- a/tests/testlib/wakucore.nim +++ b/tests/testlib/wakucore.nim @@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic proc fakeWakuMessage*( payload: string | seq[byte] = "TEST-PAYLOAD", contentTopic = DefaultContentTopic, - meta = newSeq[byte](), + meta: string | seq[byte] = newSeq[byte](), ts = now(), ephemeral = false, ): WakuMessage = var payloadBytes: seq[byte] + var metaBytes: seq[byte] + when payload is string: payloadBytes = toBytes(payload) else: payloadBytes = payload + when meta is string: + metaBytes = toBytes(meta) + else: + metaBytes = meta + WakuMessage( payload: payloadBytes, contentTopic: contentTopic, - meta: meta, + meta: metaBytes, version: 2, timestamp: ts, ephemeral: ephemeral, diff --git a/tests/waku_archive/test_driver_postgres.nim b/tests/waku_archive/test_driver_postgres.nim index a42cf1fb12..75afb226fa 100644 --- a/tests/waku_archive/test_driver_postgres.nim +++ b/tests/waku_archive/test_driver_postgres.nim @@ -54,8 +54,9 @@ suite "Postgres driver": asyncTest "Insert a message": const contentTopic = "test-content-topic" + const meta = "test meta" - let msg = fakeWakuMessage(contentTopic = contentTopic) + let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta) let computedDigest = computeDigest(msg) let computedHash = computeMessageHash(DefaultPubsubTopic, msg) @@ -75,6 +76,7 @@ suite "Postgres driver": assert toHex(computedDigest.data) == toHex(digest) assert toHex(actualMsg.payload) == toHex(msg.payload) assert toHex(computedHash) == toHex(hash) + assert toHex(actualMsg.meta) == toHex(msg.meta) asyncTest "Insert and query message": const contentTopic1 = "test-content-topic-1" diff --git a/tests/waku_archive/test_driver_postgres_query.nim b/tests/waku_archive/test_driver_postgres_query.nim index e4eab0a3dd..b9e33dac6e 100644 --- a/tests/waku_archive/test_driver_postgres_query.nim +++ b/tests/waku_archive/test_driver_postgres_query.nim @@ -133,6 +133,61 @@ suite "Postgres driver - queries": check: filteredMessages == expected[2 .. 3] + asyncTest "single content topic with meta field": + ## Given + const contentTopic = "test-content-topic" + + let expected = + @[ + fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"), + fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"), + fakeWakuMessage( + @[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2" + ), + fakeWakuMessage( + @[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3" + ), + fakeWakuMessage( + @[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4" + ), + fakeWakuMessage( + @[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5" + ), + fakeWakuMessage( + @[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6" + ), + fakeWakuMessage( + @[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7" + ), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload) + + for msg in messages: + require ( + await driver.put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + ).isOk() + + ## When + let res = await driver.getMessages( + contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true + ) + + ## Then + assert res.isOk(), res.error + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2 .. 3] + asyncTest "single content topic - descending order": ## Given const contentTopic = "test-content-topic" diff --git a/tests/waku_archive/test_driver_sqlite.nim b/tests/waku_archive/test_driver_sqlite.nim index c53d7f2ea5..72f560ca9a 100644 --- a/tests/waku_archive/test_driver_sqlite.nim +++ b/tests/waku_archive/test_driver_sqlite.nim @@ -32,10 +32,11 @@ suite "SQLite driver": test "insert a message": ## Given const contentTopic = "test-content-topic" + const meta = "test meta" let driver = newSqliteArchiveDriver() - let msg = fakeWakuMessage(contentTopic = contentTopic) + let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta) let msgHash = computeMessageHash(DefaultPubsubTopic, msg) ## When @@ -51,9 +52,9 @@ suite "SQLite driver": check: storedMsg.len == 1 storedMsg.all do(item: auto) -> bool: - let (pubsubTopic, msg, _, _, hash) = item - msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and - hash == msgHash + let (pubsubTopic, actualMsg, _, _, hash) = item + actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and + hash == msgHash and msg.meta == actualMsg.meta ## Cleanup (waitFor driver.close()).expect("driver to close") diff --git a/tests/waku_archive/test_driver_sqlite_query.nim b/tests/waku_archive/test_driver_sqlite_query.nim index f9d6a27465..6ae2ce4149 100644 --- a/tests/waku_archive/test_driver_sqlite_query.nim +++ b/tests/waku_archive/test_driver_sqlite_query.nim @@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic": ## Cleanup (await driver.close()).expect("driver to close") + asyncTest "single content topic with meta field": + ## Given + const contentTopic = "test-content-topic" + + let driver = newSqliteArchiveDriver() + + let expected = + @[ + fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"), + fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"), + fakeWakuMessage( + @[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2" + ), + fakeWakuMessage( + @[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3" + ), + fakeWakuMessage( + @[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4" + ), + fakeWakuMessage( + @[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5" + ), + fakeWakuMessage( + @[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6" + ), + fakeWakuMessage( + @[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7" + ), + ] + var messages = expected + + shuffle(messages) + debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload) + + for msg in messages: + require ( + await driver.put( + DefaultPubsubTopic, + msg, + computeDigest(msg), + computeMessageHash(DefaultPubsubTopic, msg), + msg.timestamp, + ) + ).isOk() + + ## When + let res = await driver.getMessages( + contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true + ) + + ## Then + check: + res.isOk() + + let filteredMessages = res.tryGet().mapIt(it[1]) + check: + filteredMessages == expected[2 .. 3] + + ## Cleanup + (await driver.close()).expect("driver to close") + asyncTest "single content topic - descending order": ## Given const contentTopic = "test-content-topic" diff --git a/waku/factory/validator_signed.nim b/waku/factory/validator_signed.nim index ddb45102f0..f33d362dee 100644 --- a/waku/factory/validator_signed.nim +++ b/waku/factory/validator_signed.nim @@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] = ctx.update(msg.payload) ctx.update(msg.contentTopic.toBytes()) ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian)) + # ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field ctx.update( if msg.ephemeral: @[1.byte] diff --git a/waku/waku_archive/driver/postgres_driver/migrations.nim b/waku/waku_archive/driver/postgres_driver/migrations.nim index a7f4814792..a3e376e0ae 100644 --- a/waku/waku_archive/driver/postgres_driver/migrations.nim +++ b/waku/waku_archive/driver/postgres_driver/migrations.nim @@ -9,7 +9,7 @@ import logScope: topics = "waku archive migration" -const SchemaVersion* = 3 # increase this when there is an update in the database schema +const SchemaVersion* = 4 # increase this when there is an update in the database schema proc breakIntoStatements*(script: string): seq[string] = ## Given a full migration script, that can potentially contain a list diff --git a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim index f7487c5518..ee492fa7bf 100644 --- a/waku/waku_archive/driver/postgres_driver/postgres_driver.nim +++ b/waku/waku_archive/driver/postgres_driver/postgres_driver.nim @@ -31,11 +31,11 @@ type PostgresDriver* = ref object of ArchiveDriver const InsertRowStmtName = "InsertRow" const InsertRowStmtDefinition = # TODO: get the sql queries from a file """INSERT INTO messages (id, messageHash, storedAt, contentTopic, payload, pubsubTopic, - version, timestamp) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT DO NOTHING;""" + version, timestamp, meta) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, CASE WHEN $9 = '' THEN NULL ELSE $9 END) ON CONFLICT DO NOTHING;""" const SelectNoCursorAscStmtName = "SelectWithoutCursorAsc" const SelectNoCursorAscStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE contentTopic IN ($1) AND messageHash IN ($2) AND pubsubTopic = $3 AND @@ -86,7 +86,7 @@ const SelectNoCursorV2AscStmtDef = const SelectNoCursorV2DescStmtName = "SelectWithoutCursorV2Desc" const SelectNoCursorV2DescStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND storedAt >= $3 AND @@ -95,7 +95,7 @@ const SelectNoCursorV2DescStmtDef = const SelectWithCursorV2DescStmtName = "SelectWithCursorV2Desc" const SelectWithCursorV2DescStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND (storedAt, id) < ($3,$4) AND @@ -105,7 +105,7 @@ const SelectWithCursorV2DescStmtDef = const SelectWithCursorV2AscStmtName = "SelectWithCursorV2Asc" const SelectWithCursorV2AscStmtDef = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages WHERE contentTopic IN ($1) AND pubsubTopic = $2 AND (storedAt, id) > ($3,$4) AND @@ -161,7 +161,7 @@ proc rowCallbackImpl( ## outRows - seq of Store-rows. This is populated from the info contained in pqResult let numFields = pqResult.pqnfields() - if numFields != 8: + if numFields != 9: error "Wrong number of fields" return @@ -176,6 +176,7 @@ proc rowCallbackImpl( var payload: string var hashHex: string var msgHash: WakuMessageHash + var meta: string try: storedAt = parseInt($(pqgetvalue(pqResult, iRow, 0))) @@ -186,6 +187,7 @@ proc rowCallbackImpl( timestamp = parseInt($(pqgetvalue(pqResult, iRow, 5))) digest = parseHexStr($(pqgetvalue(pqResult, iRow, 6))) hashHex = parseHexStr($(pqgetvalue(pqResult, iRow, 7))) + meta = parseHexStr($(pqgetvalue(pqResult, iRow, 8))) msgHash = fromBytes(hashHex.toOpenArrayByte(0, 31)) except ValueError: error "could not parse correctly", error = getCurrentExceptionMsg() @@ -194,6 +196,7 @@ proc rowCallbackImpl( wakuMessage.version = uint32(version) wakuMessage.contentTopic = contentTopic wakuMessage.payload = @(payload.toOpenArrayByte(0, payload.high)) + wakuMessage.meta = @(meta.toOpenArrayByte(0, meta.high)) outRows.add( ( @@ -220,6 +223,7 @@ method put*( let payload = toHex(message.payload) let version = $message.version let timestamp = $message.timestamp + let meta = toHex(message.meta) trace "put PostgresDriver", timestamp = timestamp @@ -228,7 +232,7 @@ method put*( InsertRowStmtDefinition, @[ digest, messageHash, rxTime, contentTopic, payload, pubsubTopic, version, - timestamp, + timestamp, meta, ], @[ int32(digest.len), @@ -239,8 +243,19 @@ method put*( int32(pubsubTopic.len), int32(version.len), int32(timestamp.len), + int32(meta.len), + ], + @[ + int32(0), + int32(0), + int32(0), + int32(0), + int32(0), + int32(0), + int32(0), + int32(0), + int32(0), ], - @[int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0), int32(0)], ) method getAllMessages*( @@ -256,7 +271,7 @@ method getAllMessages*( await s.readConnPool.pgQuery( """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, - id, messageHash FROM messages ORDER BY storedAt ASC""", + id, messageHash, meta FROM messages ORDER BY storedAt ASC""", newSeq[string](0), rowCallback, ) @@ -311,7 +326,7 @@ proc getMessagesArbitraryQuery( ## This proc allows to handle atypical queries. We don't use prepared statements for those. var query = - """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash FROM messages""" + """SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta FROM messages""" var statements: seq[string] var args: seq[string] diff --git a/waku/waku_archive/driver/sqlite_driver/queries.nim b/waku/waku_archive/driver/sqlite_driver/queries.nim index 5c837b3bcb..3aeac41cbd 100644 --- a/waku/waku_archive/driver/sqlite_driver/queries.nim +++ b/waku/waku_archive/driver/sqlite_driver/queries.nim @@ -18,7 +18,7 @@ type SqlQueryStr = string proc queryRowWakuMessageCallback( s: ptr sqlite3_stmt, - contentTopicCol, payloadCol, versionCol, senderTimestampCol: cint, + contentTopicCol, payloadCol, versionCol, senderTimestampCol, metaCol: cint, ): WakuMessage = let topic = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, contentTopicCol)) @@ -26,17 +26,21 @@ proc queryRowWakuMessageCallback( contentTopic = string.fromBytes(@(toOpenArray(topic, 0, topicLength - 1))) p = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, payloadCol)) + m = cast[ptr UncheckedArray[byte]](sqlite3_column_blob(s, metaCol)) - length = sqlite3_column_bytes(s, payloadCol) - payload = @(toOpenArray(p, 0, length - 1)) + payloadLength = sqlite3_column_bytes(s, payloadCol) + metaLength = sqlite3_column_bytes(s, metaCol) + payload = @(toOpenArray(p, 0, payloadLength - 1)) version = sqlite3_column_int64(s, versionCol) senderTimestamp = sqlite3_column_int64(s, senderTimestampCol) + meta = @(toOpenArray(m, 0, metaLength - 1)) return WakuMessage( contentTopic: ContentTopic(contentTopic), payload: payload, version: uint32(version), timestamp: Timestamp(senderTimestamp), + meta: meta, ) proc queryRowReceiverTimestampCallback( @@ -83,8 +87,8 @@ proc createTableQuery(table: string): SqlQueryStr = "CREATE TABLE IF NOT EXISTS " & table & " (" & " pubsubTopic BLOB NOT NULL," & " contentTopic BLOB NOT NULL," & " payload BLOB," & " version INTEGER NOT NULL," & " timestamp INTEGER NOT NULL," & " id BLOB," & " messageHash BLOB," & - " storedAt INTEGER NOT NULL," & " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & - ") WITHOUT ROWID;" + " storedAt INTEGER NOT NULL," & " meta BLOB," & + " CONSTRAINT messageIndex PRIMARY KEY (messageHash)" & ") WITHOUT ROWID;" proc createTable*(db: SqliteDatabase): DatabaseResult[void] = let query = createTableQuery(DbTable) @@ -129,14 +133,23 @@ proc createHistoryQueryIndex*(db: SqliteDatabase): DatabaseResult[void] = return ok() ## Insert message -type InsertMessageParams* = - (seq[byte], seq[byte], Timestamp, seq[byte], seq[byte], seq[byte], int64, Timestamp) +type InsertMessageParams* = ( + seq[byte], + seq[byte], + Timestamp, + seq[byte], + seq[byte], + seq[byte], + int64, + Timestamp, + seq[byte], +) proc insertMessageQuery(table: string): SqlQueryStr = return "INSERT INTO " & table & - "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp)" & - " VALUES (?, ?, ?, ?, ?, ?, ?, ?);" + "(id, messageHash, storedAt, contentTopic, payload, pubsubTopic, version, timestamp, meta)" & + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);" proc prepareInsertMessageStmt*( db: SqliteDatabase @@ -244,7 +257,7 @@ proc deleteOldestMessagesNotWithinLimit*( proc selectAllMessagesQuery(table: string): SqlQueryStr = return - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" & + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" & " FROM " & table & " ORDER BY storedAt ASC" proc selectAllMessages*( @@ -258,7 +271,12 @@ proc selectAllMessages*( let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3) wakuMessage = queryRowWakuMessageCallback( - s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5 + s, + contentTopicCol = 1, + payloadCol = 2, + versionCol = 4, + senderTimestampCol = 5, + metaCol = 8, ) digest = queryRowDigestCallback(s, digestCol = 6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0) @@ -342,7 +360,7 @@ proc selectMessagesWithLimitQueryv2( var query: string query = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" query &= " FROM " & table if where.isSome(): @@ -435,7 +453,12 @@ proc selectMessagesByHistoryQueryWithLimit*( let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3) message = queryRowWakuMessageCallback( - s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5 + s, + contentTopicCol = 1, + payloadCol = 2, + versionCol = 4, + senderTimestampCol = 5, + metaCol = 8, ) digest = queryRowDigestCallback(s, digestCol = 6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0) @@ -485,7 +508,7 @@ proc execSelectMessageByHash( proc selectMessageByHashQuery(): SqlQueryStr = var query: string - query = "SELECT contentTopic, payload, version, timestamp, messageHash" + query = "SELECT contentTopic, payload, version, timestamp, meta, messageHash" query &= " FROM " & DbTable query &= " WHERE messageHash = (?)" @@ -621,7 +644,7 @@ proc selectMessagesWithLimitQuery( var query: string query = - "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash" + "SELECT storedAt, contentTopic, payload, pubsubTopic, version, timestamp, id, messageHash, meta" query &= " FROM " & table if where.isSome(): @@ -655,7 +678,12 @@ proc selectMessagesByStoreQueryWithLimit*( proc queryRowCallback(s: ptr sqlite3_stmt) = wakuMessage = queryRowWakuMessageCallback( - s, contentTopicCol = 0, payloadCol = 1, versionCol = 2, senderTimestampCol = 3 + s, + contentTopicCol = 0, + payloadCol = 1, + versionCol = 2, + senderTimestampCol = 3, + metaCol = 4, ) let query = selectMessageByHashQuery() @@ -676,7 +704,12 @@ proc selectMessagesByStoreQueryWithLimit*( let pubsubTopic = queryRowPubsubTopicCallback(s, pubsubTopicCol = 3) message = queryRowWakuMessageCallback( - s, contentTopicCol = 1, payloadCol = 2, versionCol = 4, senderTimestampCol = 5 + s, + contentTopicCol = 1, + payloadCol = 2, + versionCol = 4, + senderTimestampCol = 5, + metaCol = 8, ) digest = queryRowDigestCallback(s, digestCol = 6) storedAt = queryRowReceiverTimestampCallback(s, storedAtCol = 0) diff --git a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim index 05ec9c229c..b817282f51 100644 --- a/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim +++ b/waku/waku_archive/driver/sqlite_driver/sqlite_driver.nim @@ -72,6 +72,7 @@ method put*( toBytes(pubsubTopic), # pubsubTopic int64(message.version), # version message.timestamp, # senderTimestamp + message.meta, # meta ) )