Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rln-relay): remove websocket from OnchainGroupManager #2364

Merged
merged 3 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/chat2/config_chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ type
name: "rln-relay-id-commitment-key" }: string

rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
defaultValue: "ws://localhost:8540/"
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "http://localhost:8540/"
name: "rln-relay-eth-client-address" }: string

rlnRelayEthContractAddress* {.
Expand Down
4 changes: 2 additions & 2 deletions apps/wakunode2/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type
name: "rln-relay-cred-path" }: string

rlnRelayEthClientAddress* {.
desc: "WebSocket address of an Ethereum testnet client e.g., ws://localhost:8540/",
defaultValue: "ws://localhost:8540/",
desc: "WebSocket address of an Ethereum testnet client e.g., http://localhost:8540/",
defaultValue: "http://localhost:8540/",
name: "rln-relay-eth-client-address" }: string

rlnRelayEthContractAddress* {.
Expand Down
19 changes: 9 additions & 10 deletions tests/waku_rln_relay/test_rln_group_manager_onchain.nim
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,9 @@ suite "Onchain group manager":

try:
await manager.startGroupSync()
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -330,9 +330,9 @@ suite "Onchain group manager":

try:
await manager.register(dummyCommitment)
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -399,9 +399,9 @@ suite "Onchain group manager":

try:
await manager.withdraw(idSecretHash)
except ValueError:
except CatchableError:
assert true
except Exception, CatchableError:
except Exception:
assert false, "exception raised: " & getCurrentExceptionMsg()

await manager.stop()
Expand Down Expand Up @@ -627,7 +627,7 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return false if ethRpc is none":
var manager = await setup()
let manager = await setup()
await manager.init()

manager.ethRpc = none(Web3)
Expand All @@ -644,7 +644,7 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return false if lastSeenBlockHead > lastProcessed":
var manager = await setup()
let manager = await setup()
await manager.init()

var isReady = true
Expand All @@ -659,14 +659,13 @@ suite "Onchain group manager":
await manager.stop()

asyncTest "isReady should return true if ethRpc is ready":
var manager = await setup()
let manager = await setup()
await manager.init()
# node can only be ready after group sync is done
try:
await manager.startGroupSync()
except Exception, CatchableError:
assert false, "exception raised when calling startGroupSync: " & getCurrentExceptionMsg()

var isReady = false
try:
isReady = await manager.isReady()
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_rln_relay/constants.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const
MembershipFee* = 1000000000000000.u256
# the current implementation of the rln lib supports a circuit for Merkle tree with depth 20
MerkleTreeDepth* = 20
EthClient* = "ws://127.0.0.1:8540"
EthClient* = "http://127.0.0.1:8540"
rymnc marked this conversation as resolved.
Show resolved Hide resolved

const
# the size of poseidon hash output in bits
Expand Down
82 changes: 55 additions & 27 deletions waku/waku_rln_relay/group_manager/on_chain/group_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ type
# in event of a reorg. we store 5 in the buffer. Maybe need to revisit this,
# because the average reorg depth is 1 to 2 blocks.
validRootBuffer*: Deque[MerkleNode]
# interval loop to shut down gracefully
blockFetchingActive*: bool

const DefaultKeyStorePath* = "rlnKeystore.json"
const DefaultKeyStorePassword* = "password"

const DefaultBlockPollRate* = 6.seconds

template initializedGuard(g: OnchainGroupManager): untyped =
if not g.initialized:
raise newException(ValueError, "OnchainGroupManager is not initialized")
raise newException(CatchableError, "OnchainGroupManager is not initialized")


proc setMetadata*(g: OnchainGroupManager): RlnRelayResult[void] =
Expand Down Expand Up @@ -316,12 +320,15 @@ proc handleRemovedEvents(g: OnchainGroupManager, blockTable: BlockTable):

proc getAndHandleEvents(g: OnchainGroupManager,
fromBlock: BlockNumber,
toBlock: BlockNumber): Future[void] {.async: (raises: [Exception]).} =
toBlock: BlockNumber): Future[bool] {.async: (raises: [Exception]).} =
Comment on lines 321 to +323
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that this proc either returns true or raises an exception.

Am I missing something or is it supposed to be like that? If so, what's the point of returning bool if the return value is always true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's for compatibility with the retryWrapper template, it expects a return value to assign the result of an async operation, just set to true here since we don't use it (discarded)

initializedGuard(g)

let blockTable = await g.getBlockTable(fromBlock, toBlock)
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
try:
await g.handleEvents(blockTable)
await g.handleRemovedEvents(blockTable)
except CatchableError:
error "failed to handle events", error=getCurrentExceptionMsg()
raise newException(ValueError, "failed to handle events")

g.latestProcessedBlock = toBlock
let metadataSetRes = g.setMetadata()
Expand All @@ -330,32 +337,49 @@ proc getAndHandleEvents(g: OnchainGroupManager,
warn "failed to persist rln metadata", error=metadataSetRes.error()
else:
trace "rln metadata persisted", blockNumber = g.latestProcessedBlock

return true

proc runInInterval(g: OnchainGroupManager, cb: proc, interval: Duration): void =
g.blockFetchingActive = false

proc getNewHeadCallback(g: OnchainGroupManager): BlockHeaderHandler =
proc newHeadCallback(blockheader: BlockHeader) {.gcsafe.} =
let latestBlock = BlockNumber(blockheader.number)
trace "block received", blockNumber = latestBlock
# get logs from the last block
try:
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
asyncSpawn g.getAndHandleEvents(fromBlock, latestBlock)
except CatchableError:
warn "failed to handle log: ", error=getCurrentExceptionMsg()
return newHeadCallback

proc newHeadErrCallback(error: CatchableError) =
warn "failed to get new head", error=error.msg
proc runIntervalLoop() {.async, gcsafe.} =
g.blockFetchingActive = true

while g.blockFetchingActive:
var retCb: bool
retryWrapper(retCb, RetryStrategy.new(), "Failed to run the interval loop"):
await cb()
await sleepAsync(interval)

asyncSpawn runIntervalLoop()


proc getNewBlockCallback(g: OnchainGroupManager): proc =
let ethRpc = g.ethRpc.get()
proc wrappedCb(): Future[bool] {.async, gcsafe.} =
var latestBlock: BlockNumber
retryWrapper(latestBlock, RetryStrategy.new(), "Failed to get the latest block number"):
cast[BlockNumber](await ethRpc.provider.eth_blockNumber())

if latestBlock <= g.latestProcessedBlock:
return
# get logs from the last block
# inc by 1 to prevent double processing
let fromBlock = g.latestProcessedBlock + 1
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle new block"):
await g.getAndHandleEvents(fromBlock, latestBlock)
return true
return wrappedCb

proc startListeningToEvents(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} =
initializedGuard(g)

let ethRpc = g.ethRpc.get()
let newHeadCallback = g.getNewHeadCallback()
var blockHeaderSub: Subscription
retryWrapper(blockHeaderSub, RetryStrategy.new(), "Failed to subscribe to block headers"):
await ethRpc.subscribeForBlockHeaders(newHeadCallback, newHeadErrCallback)
let newBlockCallback = g.getNewBlockCallback()
g.runInInterval(newBlockCallback, DefaultBlockPollRate)

proc startOnchainSync(g: OnchainGroupManager):
Future[void] {.async: (raises: [Exception]).} =
Expand Down Expand Up @@ -385,7 +409,9 @@ proc startOnchainSync(g: OnchainGroupManager):

let toBlock = min(fromBlock + BlockNumber(blockChunkSize), currentLatestBlock)
debug "fetching events", fromBlock = fromBlock, toBlock = toBlock
await g.getAndHandleEvents(fromBlock, toBlock)
var handleBlockRes: bool
retryWrapper(handleBlockRes, RetryStrategy.new(), "Failed to handle old blocks"):
await g.getAndHandleEvents(fromBlock, toBlock)
fromBlock = toBlock + 1

except CatchableError:
Expand Down Expand Up @@ -523,13 +549,15 @@ method init*(g: OnchainGroupManager): Future[void] {.async.} =
error "failed to restart group sync", error = getCurrentExceptionMsg()

ethRpc.ondisconnect = proc() =
asyncCheck onDisconnect()
asyncSpawn onDisconnect()


waku_rln_number_registered_memberships.set(int64(g.rlnInstance.leavesSet()))
g.initialized = true

method stop*(g: OnchainGroupManager): Future[void] {.async.} =
method stop*(g: OnchainGroupManager): Future[void] {.async,gcsafe.} =
g.blockFetchingActive = false

if g.ethRpc.isSome():
g.ethRpc.get().ondisconnect = nil
await g.ethRpc.get().close()
Expand Down
5 changes: 4 additions & 1 deletion waku/waku_rln_relay/group_manager/on_chain/retry_wrapper.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ template retryWrapper*(res: auto,
body: untyped): auto =
var retryCount = retryStrategy.retryCount
var shouldRetry = retryStrategy.shouldRetry
var exceptionMessage = ""

while shouldRetry and retryCount > 0:
try:
res = body
shouldRetry = false
except:
retryCount -= 1
exceptionMessage = getCurrentExceptionMsg()
await sleepAsync(retryStrategy.retryDelay)
if shouldRetry:
raise newException(CatchableError, errStr & ": " & $getCurrentExceptionMsg())
raise newException(CatchableError, errStr & ": " & exceptionMessage)
Loading