Skip to content

Commit

Permalink
feat: Store v3
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Mar 12, 2024
1 parent 3c82375 commit 75177d9
Show file tree
Hide file tree
Showing 34 changed files with 2,680 additions and 1,492 deletions.
2 changes: 1 addition & 1 deletion apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import
../../waku/waku_lightpush/rpc,
../../waku/waku_filter,
../../waku/waku_enr,
../../waku/waku_store,
../../waku/waku_store_legacy,
../../waku/waku_dnsdisc,
../../waku/waku_node,
../../waku/node/waku_metrics,
Expand Down
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
../../waku/waku_api/rest/store/handlers as rest_store_api,
../../waku/waku_api/rest/legacy_store/handlers as rest_store_api,
../../waku/waku_api/rest/health/handlers as rest_health_api,
../../waku/waku_api/rest/admin/handlers as rest_admin_api,
../../waku/waku_archive,
Expand Down
4 changes: 2 additions & 2 deletions tests/testlib/futures.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ proc newPushHandlerFuture*(): Future[(string, WakuMessage)] =
proc newBoolFuture*(): Future[bool] =
newFuture[bool]()

proc newHistoryFuture*(): Future[HistoryQuery] =
newFuture[HistoryQuery]()
proc newHistoryFuture*(): Future[StoreQueryRequest] =
newFuture[StoreQueryRequest]()

proc toResult*[T](future: Future[T]): Result[T, string] =
if future.cancelled():
Expand Down
13 changes: 2 additions & 11 deletions tests/waku_store/store_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import
wakucore
]

proc newTestWakuStore*(switch: Switch, handler: HistoryQueryHandler): Future[WakuStore] {.async.} =
proc newTestWakuStore*(switch: Switch, handler: StoreQueryRequestHandler): Future[WakuStore] {.async.} =
let
peerManager = PeerManager.new(switch)
proto = WakuStore.new(peerManager, rng, handler)
Expand All @@ -30,13 +30,4 @@ proc newTestWakuStore*(switch: Switch, handler: HistoryQueryHandler): Future[Wak

proc newTestWakuStoreClient*(switch: Switch): WakuStoreClient =
let peerManager = PeerManager.new(switch)
WakuStoreClient.new(peerManager, rng)


proc computeHistoryCursor*(pubsubTopic: PubsubTopic, message: WakuMessage): HistoryCursor =
HistoryCursor(
pubsubTopic: pubsubTopic,
senderTime: message.timestamp,
storeTime: message.timestamp,
digest: waku_store.computeDigest(message)
)
WakuStoreClient.new(peerManager, rng)
69 changes: 40 additions & 29 deletions tests/waku_store/test_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ suite "Store Client":
var message1 {.threadvar.}: WakuMessage
var message2 {.threadvar.}: WakuMessage
var message3 {.threadvar.}: WakuMessage
var messageSeq {.threadvar.}: seq[WakuMessage]
var handlerFuture {.threadvar.}: Future[HistoryQuery]
var handler {.threadvar.}: HistoryQueryHandler
var historyQuery {.threadvar.}: HistoryQuery
var hash1 {.threadvar.}: WakuMessageHash
var hash2 {.threadvar.}: WakuMessageHash
var hash3 {.threadvar.}: WakuMessageHash
var messageSeq {.threadvar.}: seq[WakuMessageKeyValue]
var handlerFuture {.threadvar.}: Future[StoreQueryRequest]
var handler {.threadvar.}: StoreQueryRequestHandler
var storeQuery {.threadvar.}: StoreQueryRequest

var serverSwitch {.threadvar.}: Switch
var clientSwitch {.threadvar.}: Switch
Expand All @@ -46,17 +49,25 @@ suite "Store Client":
message1 = fakeWakuMessage(contentTopic=DefaultContentTopic)
message2 = fakeWakuMessage(contentTopic=DefaultContentTopic)
message3 = fakeWakuMessage(contentTopic=DefaultContentTopic)
messageSeq = @[message1, message2, message3]
hash1 = computeMessageHash(DefaultPubsubTopic, message1)
hash2 = computeMessageHash(DefaultPubsubTopic, message2)
hash3 = computeMessageHash(DefaultPubsubTopic, message3)
messageSeq = @[
WakuMessageKeyValue(messageHash: hash1, message: message1),
WakuMessageKeyValue(messageHash: hash2, message: message2),
WakuMessageKeyValue(messageHash: hash3, message: message3),]
handlerFuture = newHistoryFuture()
handler = proc(
req: HistoryQuery
): Future[HistoryResult] {.async, gcsafe.} =
handlerFuture.complete(req)
return ok(HistoryResponse(messages: messageSeq))
historyQuery = HistoryQuery(
req: StoreQueryRequest
): Future[StoreQueryResult] {.async, gcsafe.} =
var request = req
request.requestId = ""
handlerFuture.complete(request)
return ok(StoreQueryResponse(messages: messageSeq))
storeQuery = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD
paginationForward: PagingDirection.FORWARD
)

serverSwitch = newTestSwitch()
Expand All @@ -73,15 +84,15 @@ suite "Store Client":
asyncTeardown:
await allFutures(serverSwitch.stop(), clientSwitch.stop())

suite "HistoryQuery Creation and Execution":
suite "StoreQueryRequest Creation and Execution":
asyncTest "Valid Queries":
# When a valid query is sent to the server
let queryResponse = await client.query(historyQuery, peer=serverPeerInfo)
let queryResponse = await client.query(storeQuery, peer=serverPeerInfo)

# Then the query is processed successfully
assert await handlerFuture.withTimeout(FUTURE_TIMEOUT)
check:
handlerFuture.read() == historyQuery
handlerFuture.read() == storeQuery
queryResponse.get().messages == messageSeq

asyncTest "Invalid Queries":
Expand All @@ -91,33 +102,33 @@ suite "Store Client":

# Given some invalid queries
let
invalidQuery1 = HistoryQuery(
invalidQuery1 = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[],
direction: PagingDirection.FORWARD
paginationForward: PagingDirection.FORWARD
)
invalidQuery2 = HistoryQuery(
invalidQuery2 = StoreQueryRequest(
pubsubTopic: PubsubTopic.none(),
contentTopics: @[DefaultContentTopic],
direction: PagingDirection.FORWARD
paginationForward: PagingDirection.FORWARD
)
invalidQuery3 = HistoryQuery(
invalidQuery3 = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
pageSize: 0
paginationLimit: some(uint64(0))
)
invalidQuery4 = HistoryQuery(
invalidQuery4 = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
pageSize: 0
paginationLimit: some(uint64(0))
)
invalidQuery5 = HistoryQuery(
invalidQuery5 = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(0.Timestamp),
endTime: some(0.Timestamp)
)
invalidQuery6 = HistoryQuery(
invalidQuery6 = StoreQueryRequest(
pubsubTopic: some(DefaultPubsubTopic),
contentTopics: @[DefaultContentTopic],
startTime: some(0.Timestamp),
Expand Down Expand Up @@ -183,15 +194,15 @@ suite "Store Client":
handlerFuture.read() == invalidQuery6
queryResponse6.get().messages == messageSeq

suite "Verification of HistoryResponse Payload":
suite "Verification of StoreQueryResponse Payload":
asyncTest "Positive Responses":
# When a valid query is sent to the server
let queryResponse = await client.query(historyQuery, peer=serverPeerInfo)
let queryResponse = await client.query(storeQuery, peer=serverPeerInfo)

# Then the query is processed successfully, and is of the expected type
check:
await handlerFuture.withTimeout(FUTURE_TIMEOUT)
type(queryResponse.get()) is HistoryResponse
type(queryResponse.get()) is StoreQueryResponse

asyncTest "Negative Responses - PeerDialFailure":
# Given a stopped peer
Expand All @@ -200,10 +211,10 @@ suite "Store Client":
otherServerPeerInfo = otherServerSwitch.peerInfo.toRemotePeerInfo()

# When a query is sent to the stopped peer
let queryResponse = await client.query(historyQuery, peer=otherServerPeerInfo)
let queryResponse = await client.query(storeQuery, peer=otherServerPeerInfo)

# Then the query is not processed
check:
not await handlerFuture.withTimeout(FUTURE_TIMEOUT)
queryResponse.isErr()
queryResponse.error.kind == HistoryErrorKind.PEER_DIAL_FAILURE
queryResponse.error.kind == ErrorCode.PEER_DIAL_FAILURE
Loading

0 comments on commit 75177d9

Please sign in to comment.