Skip to content

Commit

Permalink
feat: supporting meta field in store (#2609)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored and Ivansete-status committed May 14, 2024
1 parent 5a4d8ed commit f821791
Show file tree
Hide file tree
Showing 12 changed files with 286 additions and 36 deletions.
71 changes: 71 additions & 0 deletions migrations/message_store_postgres/content_script_version_4.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
const ContentScriptVersion_4* =
"""
ALTER TABLE IF EXISTS messages_backup RENAME TO messages;
ALTER TABLE messages RENAME TO messages_backup;
ALTER TABLE messages_backup DROP CONSTRAINT messageIndex;
CREATE TABLE IF NOT EXISTS messages (
pubsubTopic VARCHAR NOT NULL,
contentTopic VARCHAR NOT NULL,
payload VARCHAR,
version INTEGER NOT NULL,
timestamp BIGINT NOT NULL,
id VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
storedAt BIGINT NOT NULL,
meta VARCHAR,
CONSTRAINT messageIndex PRIMARY KEY (messageHash, storedAt)
) PARTITION BY RANGE (storedAt);
DO $$
DECLARE
min_storedAt numeric;
max_storedAt numeric;
min_storedAtSeconds integer = 0;
max_storedAtSeconds integer = 0;
partition_name TEXT;
create_partition_stmt TEXT;
BEGIN
SELECT MIN(storedAt) into min_storedAt
FROM messages_backup;
SELECT MAX(storedAt) into max_storedAt
FROM messages_backup;
min_storedAtSeconds := min_storedAt / 1000000000;
max_storedAtSeconds := max_storedAt / 1000000000;
partition_name := 'messages_' || min_storedAtSeconds || '_' || max_storedAtSeconds;
create_partition_stmt := 'CREATE TABLE ' || partition_name ||
' PARTITION OF messages FOR VALUES FROM (' ||
min_storedAt || ') TO (' || (max_storedAt + 1) || ')';
IF min_storedAtSeconds > 0 AND max_storedAtSeconds > 0 THEN
EXECUTE create_partition_stmt USING partition_name, min_storedAt, max_storedAt;
END IF;
END $$;
INSERT INTO messages (
pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
)
SELECT pubsubTopic,
contentTopic,
payload,
version,
timestamp,
id,
messageHash,
storedAt
FROM messages_backup;
DROP TABLE messages_backup;
UPDATE version SET version = 4 WHERE version = 3;
"""
5 changes: 4 additions & 1 deletion migrations/message_store_postgres/pg_migration_manager.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import content_script_version_1, content_script_version_2, content_script_version_3
import
content_script_version_1, content_script_version_2, content_script_version_3,
content_script_version_4

type MigrationScript* = object
version*: int
Expand All @@ -12,6 +14,7 @@ const PgMigrationScripts* =
MigrationScript(version: 1, scriptContent: ContentScriptVersion_1),
MigrationScript(version: 2, scriptContent: ContentScriptVersion_2),
MigrationScript(version: 3, scriptContent: ContentScriptVersion_3),
MigrationScript(version: 4, scriptContent: ContentScriptVersion_4),
]

proc getMigrationScripts*(currentVersion: int64, targetVersion: int64): seq[string] =
Expand Down
11 changes: 9 additions & 2 deletions tests/testlib/wakucore.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,27 @@ export waku_core.DefaultPubsubTopic, waku_core.DefaultContentTopic
proc fakeWakuMessage*(
payload: string | seq[byte] = "TEST-PAYLOAD",
contentTopic = DefaultContentTopic,
meta = newSeq[byte](),
meta: string | seq[byte] = newSeq[byte](),
ts = now(),
ephemeral = false,
): WakuMessage =
var payloadBytes: seq[byte]
var metaBytes: seq[byte]

when payload is string:
payloadBytes = toBytes(payload)
else:
payloadBytes = payload

when meta is string:
metaBytes = toBytes(meta)
else:
metaBytes = meta

WakuMessage(
payload: payloadBytes,
contentTopic: contentTopic,
meta: meta,
meta: metaBytes,
version: 2,
timestamp: ts,
ephemeral: ephemeral,
Expand Down
4 changes: 3 additions & 1 deletion tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ suite "Postgres driver":

asyncTest "Insert a message":
const contentTopic = "test-content-topic"
const meta = "test meta"

let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)

let computedDigest = computeDigest(msg)
let computedHash = computeMessageHash(DefaultPubsubTopic, msg)
Expand All @@ -75,6 +76,7 @@ suite "Postgres driver":
assert toHex(computedDigest.data) == toHex(digest)
assert toHex(actualMsg.payload) == toHex(msg.payload)
assert toHex(computedHash) == toHex(hash)
assert toHex(actualMsg.meta) == toHex(msg.meta)

asyncTest "Insert and query message":
const contentTopic1 = "test-content-topic-1"
Expand Down
55 changes: 55 additions & 0 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,61 @@ suite "Postgres driver - queries":
check:
filteredMessages == expected[2 .. 3]

asyncTest "single content topic with meta field":
## Given
const contentTopic = "test-content-topic"

let expected =
@[
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
fakeWakuMessage(
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
),
fakeWakuMessage(
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
),
fakeWakuMessage(
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
),
fakeWakuMessage(
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
),
fakeWakuMessage(
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
),
fakeWakuMessage(
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
),
]
var messages = expected

shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()

## When
let res = await driver.getMessages(
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
)

## Then
assert res.isOk(), res.error

let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expected[2 .. 3]

asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
Expand Down
9 changes: 5 additions & 4 deletions tests/waku_archive/test_driver_sqlite.nim
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ suite "SQLite driver":
test "insert a message":
## Given
const contentTopic = "test-content-topic"
const meta = "test meta"

let driver = newSqliteArchiveDriver()

let msg = fakeWakuMessage(contentTopic = contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic, meta = meta)
let msgHash = computeMessageHash(DefaultPubsubTopic, msg)

## When
Expand All @@ -51,9 +52,9 @@ suite "SQLite driver":
check:
storedMsg.len == 1
storedMsg.all do(item: auto) -> bool:
let (pubsubTopic, msg, _, _, hash) = item
msg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash
let (pubsubTopic, actualMsg, _, _, hash) = item
actualMsg.contentTopic == contentTopic and pubsubTopic == DefaultPubsubTopic and
hash == msgHash and msg.meta == actualMsg.meta

## Cleanup
(waitFor driver.close()).expect("driver to close")
61 changes: 61 additions & 0 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,67 @@ suite "SQLite driver - query by content topic":
## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "single content topic with meta field":
## Given
const contentTopic = "test-content-topic"

let driver = newSqliteArchiveDriver()

let expected =
@[
fakeWakuMessage(@[byte 0], ts = ts(00), meta = "meta-0"),
fakeWakuMessage(@[byte 1], ts = ts(10), meta = "meta-1"),
fakeWakuMessage(
@[byte 2], contentTopic = contentTopic, ts = ts(20), meta = "meta-2"
),
fakeWakuMessage(
@[byte 3], contentTopic = contentTopic, ts = ts(30), meta = "meta-3"
),
fakeWakuMessage(
@[byte 4], contentTopic = contentTopic, ts = ts(40), meta = "meta-4"
),
fakeWakuMessage(
@[byte 5], contentTopic = contentTopic, ts = ts(50), meta = "meta-5"
),
fakeWakuMessage(
@[byte 6], contentTopic = contentTopic, ts = ts(60), meta = "meta-6"
),
fakeWakuMessage(
@[byte 7], contentTopic = contentTopic, ts = ts(70), meta = "meta-7"
),
]
var messages = expected

shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()

## When
let res = await driver.getMessages(
contentTopic = @[contentTopic], maxPageSize = 2, ascendingOrder = true
)

## Then
check:
res.isOk()

let filteredMessages = res.tryGet().mapIt(it[1])
check:
filteredMessages == expected[2 .. 3]

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "single content topic - descending order":
## Given
const contentTopic = "test-content-topic"
Expand Down
1 change: 1 addition & 0 deletions waku/factory/validator_signed.nim
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ proc msgHash*(pubSubTopic: string, msg: WakuMessage): array[32, byte] =
ctx.update(msg.payload)
ctx.update(msg.contentTopic.toBytes())
ctx.update(msg.timestamp.uint64.toBytes(Endianness.littleEndian))
# ctx.update(msg.meta) meta is not included in the message hash, as the signature goes in the meta field
ctx.update(
if msg.ephemeral:
@[1.byte]
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver/postgres_driver/migrations.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import
logScope:
topics = "waku archive migration"

const SchemaVersion* = 3 # increase this when there is an update in the database schema
const SchemaVersion* = 4 # increase this when there is an update in the database schema

proc breakIntoStatements*(script: string): seq[string] =
## Given a full migration script, that can potentially contain a list
Expand Down
Loading

0 comments on commit f821791

Please sign in to comment.