Skip to content

Commit

Permalink
fix: store v3 bug fixes (#2718)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored May 23, 2024
1 parent ebe69be commit 4a6ec46
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 51 deletions.
43 changes: 43 additions & 0 deletions tests/waku_archive/test_driver_postgres_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,49 @@ suite "Postgres driver - queries":
check:
filteredMessages == expected[2 .. 3].reversed()

asyncTest "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"

var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]

shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)

## Then
assert res.isOk(), res.error

check:
res.value.len == 0

asyncTest "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
Expand Down
46 changes: 46 additions & 0 deletions tests/waku_archive/test_driver_queue_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,52 @@ suite "Queue driver - query by cursor":
## Cleanup
(waitFor driver.close()).expect("driver to close")

test "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"

let driver = newTestSqliteDriver()

var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]

shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

for msg in messages:
let retFut = waitFor driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
require retFut.isOk()

let cursor = computeTestCursor(DefaultPubsubTopic, fakeWakuMessage())

## When
let res = waitFor driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)

## Then
check:
res.isErr()
res.error == "invalid_cursor"

## Cleanup
(waitFor driver.close()).expect("driver to close")

test "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
Expand Down
47 changes: 47 additions & 0 deletions tests/waku_archive/test_driver_sqlite_query.nim
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,53 @@ suite "SQLite driver - query by cursor":
## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "only cursor - invalid":
## Given
const contentTopic = "test-content-topic"

let driver = newSqliteArchiveDriver()

var messages =
@[
fakeWakuMessage(@[byte 0], ts = ts(00)),
fakeWakuMessage(@[byte 1], ts = ts(10)),
fakeWakuMessage(@[byte 2], contentTopic = contentTopic, ts = ts(20)),
fakeWakuMessage(@[byte 3], contentTopic = contentTopic, ts = ts(30)),
fakeWakuMessage(@[byte 4], contentTopic = contentTopic, ts = ts(40)),
fakeWakuMessage(@[byte 5], contentTopic = contentTopic, ts = ts(50)),
fakeWakuMessage(@[byte 6], contentTopic = contentTopic, ts = ts(60)),
fakeWakuMessage(@[byte 7], contentTopic = contentTopic, ts = ts(70)),
]

shuffle(messages)
debug "randomized message insertion sequence", sequence = messages.mapIt(it.payload)

for msg in messages:
require (
await driver.put(
DefaultPubsubTopic,
msg,
computeDigest(msg),
computeMessageHash(DefaultPubsubTopic, msg),
msg.timestamp,
)
).isOk()

let cursor = computeArchiveCursor(DefaultPubsubTopic, fakeWakuMessage())

## When
let res = await driver.getMessages(
cursor = some(cursor), maxPageSize = 2, ascendingOrder = false
)

## Then
check:
res.isOk()
res.value.len == 0

## Cleanup
(await driver.close()).expect("driver to close")

asyncTest "content topic and cursor":
## Given
const contentTopic = "test-content-topic"
Expand Down
74 changes: 30 additions & 44 deletions waku/waku_api/rest/store/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ else:
{.push raises: [].}

import
std/[sets, strformat, uri, options],
stew/[byteutils, arrayops],
std/[sets, strformat, uri, options, sequtils],
stew/byteutils,
chronicles,
json_serialization,
json_serialization/std/options,
Expand All @@ -30,7 +30,7 @@ proc parseHash*(input: Option[string]): Result[Option[WakuMessageHash], string]
if base64UrlEncoded == "":
return ok(none(WakuMessageHash))

let base64Encoded = decodeUrl(base64UrlEncoded)
let base64Encoded = decodeUrl(base64UrlEncoded, false)

let decodedBytes = base64.decode(Base64String(base64Encoded)).valueOr:
return err("waku message hash parsing error: " & error)
Expand All @@ -45,7 +45,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] =
if not input.isSome() or input.get() == "":
return ok(hashes)

let decodedUrl = decodeUrl(input.get())
let decodedUrl = decodeUrl(input.get(), false)

if decodedUrl != "":
for subString in decodedUrl.split(','):
Expand All @@ -62,7 +62,7 @@ proc parseHashes*(input: Option[string]): Result[seq[WakuMessageHash], string] =
# and this result is URL-encoded.
proc toRestStringWakuMessageHash*(self: WakuMessageHash): string =
let base64Encoded = base64.encode(self)
encodeUrl($base64Encoded)
encodeUrl($base64Encoded, false)

## WakuMessage serde

Expand Down Expand Up @@ -146,50 +146,21 @@ proc readValue*(
proof: proof,
)

## WakuMessageHash serde

proc writeValue*(
writer: var JsonWriter, value: WakuMessageHash
) {.gcsafe, raises: [IOError].} =
writer.beginRecord()
writer.writeField("data", base64.encode(value))
writer.endRecord()

proc readValue*(
reader: var JsonReader, value: var WakuMessageHash
) {.gcsafe, raises: [SerializationError, IOError].} =
var data = none(seq[byte])

for fieldName in readObjectFields(reader):
case fieldName
of "data":
if data.isSome():
reader.raiseUnexpectedField("Multiple `data` fields found", "WakuMessageHash")
let decoded = base64.decode(reader.readValue(Base64String))
if not decoded.isOk():
reader.raiseUnexpectedField("Failed decoding data", "WakuMessageHash")
data = some(decoded.get())
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))

if data.isNone():
reader.raiseUnexpectedValue("Field `data` is missing")

for i in 0 ..< 32:
value[i] = data.get()[i]

## WakuMessageKeyValue serde

proc writeValue*(
writer: var JsonWriter, value: WakuMessageKeyValue
) {.gcsafe, raises: [IOError].} =
writer.beginRecord()

writer.writeField("messageHash", value.messageHash)
writer.writeField("messageHash", base64.encode(value.messageHash))

if value.message.isSome():
writer.writeField("message", value.message.get())

if value.pubsubTopic.isSome():
writer.writeField("pubsubTopic", value.pubsubTopic.get())

writer.endRecord()

proc readValue*(
Expand All @@ -198,6 +169,7 @@ proc readValue*(
var
messageHash = none(WakuMessageHash)
message = none(WakuMessage)
pubsubTopic = none(PubsubTopic)

for fieldName in readObjectFields(reader):
case fieldName
Expand All @@ -206,20 +178,31 @@ proc readValue*(
reader.raiseUnexpectedField(
"Multiple `messageHash` fields found", "WakuMessageKeyValue"
)
messageHash = some(reader.readValue(WakuMessageHash))
let base64String = reader.readValue(Base64String)
let bytes = base64.decode(base64String).valueOr:
reader.raiseUnexpectedField("Failed decoding data", "messageHash")
messageHash = some(fromBytes(bytes))
of "message":
if message.isSome():
reader.raiseUnexpectedField(
"Multiple `message` fields found", "WakuMessageKeyValue"
)
message = some(reader.readValue(WakuMessage))
of "pubsubTopic":
if pubsubTopic.isSome():
reader.raiseUnexpectedField(
"Multiple `pubsubTopic` fields found", "WakuMessageKeyValue"
)
pubsubTopic = some(reader.readValue(string))
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))

if messageHash.isNone():
reader.raiseUnexpectedValue("Field `messageHash` is missing")

value = WakuMessageKeyValue(messageHash: messageHash.get(), message: message)
value = WakuMessageKeyValue(
messageHash: messageHash.get(), message: message, pubsubTopic: pubsubTopic
)

## StoreQueryResponse serde

Expand All @@ -234,7 +217,7 @@ proc writeValue*(
writer.writeField("messages", value.messages)

if value.paginationCursor.isSome():
writer.writeField("paginationCursor", value.paginationCursor.get())
writer.writeField("paginationCursor", base64.encode(value.paginationCursor.get()))

writer.endRecord()

Expand Down Expand Up @@ -279,7 +262,10 @@ proc readValue*(
reader.raiseUnexpectedField(
"Multiple `paginationCursor` fields found", "StoreQueryResponse"
)
cursor = some(reader.readValue(WakuMessageHash))
let base64String = reader.readValue(Base64String)
let bytes = base64.decode(base64String).valueOr:
reader.raiseUnexpectedField("Failed decoding data", "paginationCursor")
cursor = some(fromBytes(bytes))
else:
reader.raiseUnexpectedField("Unrecognided field", cstring(fieldName))

Expand Down Expand Up @@ -324,10 +310,10 @@ proc writeValue*(
if req.endTime.isSome():
writer.writeField("endTime", req.endTime.get())

writer.writeField("messageHashes", req.messageHashes)
writer.writeField("messageHashes", req.messageHashes.mapIt(base64.encode(it)))

if req.paginationCursor.isSome():
writer.writeField("paginationCursor", req.paginationCursor.get())
writer.writeField("paginationCursor", base64.encode(req.paginationCursor.get()))

writer.writeField("paginationForward", req.paginationForward)

Expand Down
Loading

0 comments on commit 4a6ec46

Please sign in to comment.