Skip to content

Commit

Permalink
fix(rln-relay): graceful shutdown with non-zero exit code
Browse files Browse the repository at this point in the history
  • Loading branch information
rymnc committed Feb 14, 2024
1 parent d037705 commit 2c176d7
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 66 deletions.
17 changes: 9 additions & 8 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,13 @@ proc setupProtocols(node: WakuNode,
except CatchableError:
return err("failed to mount libp2p ping protocol: " & getCurrentExceptionMsg())

if conf.rlnRelay:
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

if conf.rlnRelay:
when defined(rln_v2):
let rlnConf = WakuRlnConfig(
rlnRelayDynamic: conf.rlnRelayDynamic,
Expand All @@ -472,6 +477,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
rlnRelayUserMessageLimit: conf.rlnRelayUserMessageLimit,
onFatalErrorAction: onFatalErrorAction,
)
else:
let rlnConf = WakuRlnConfig(
Expand All @@ -482,6 +488,7 @@ proc setupProtocols(node: WakuNode,
rlnRelayCredPath: conf.rlnRelayCredPath,
rlnRelayCredPassword: conf.rlnRelayCredPassword,
rlnRelayTreePath: conf.rlnRelayTreePath,
onFatalErrorAction: onFatalErrorAction,
)

try:
Expand All @@ -490,18 +497,12 @@ proc setupProtocols(node: WakuNode,
return err("failed to mount waku RLN relay protocol: " & getCurrentExceptionMsg())

if conf.store:
var onErrAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
error "Unrecoverable error occurred", error = msg
quit(QuitFailure)

# Archive setup
let archiveDriverRes = ArchiveDriver.new(conf.storeMessageDbUrl,
conf.storeMessageDbVacuum,
conf.storeMessageDbMigration,
conf.storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ proc configureStore(node: WakuNode,
Future[Result[void, string]] {.async.} =
## This snippet is extracted/duplicated from the app.nim file

var onErrAction = proc(msg: string) {.gcsafe, closure.} =
var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} =
## Action to be taken when an internal error occurs during the node run.
## e.g. the connection with the database is lost and not recovered.
# error "Unrecoverable error occurred", error = msg
Expand All @@ -74,7 +74,7 @@ proc configureStore(node: WakuNode,
storeVacuum,
storeDbMigration,
storeMaxNumDbConnections,
onErrAction)
onFatalErrorAction)
if archiveDriverRes.isErr():
return err("failed to setup archive driver: " & archiveDriverRes.error)

Expand Down
2 changes: 2 additions & 0 deletions waku/common/error_handling.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Shared callback type used to report an unrecoverable ("fatal") error.
# The handler receives a human-readable description of the failure in `errMsg`.
# `raises: []` means the handler itself may not raise any tracked exception,
# and `gcsafe` means it may not access GC-managed global state.
# NOTE(review): what happens after the callback runs (e.g. logging and
# terminating the process) is up to whoever supplies the handler; confirm
# against the call sites.
type
OnFatalErrorHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}
2 changes: 1 addition & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ proc mountRlnRelay*(node: WakuNode,
raise newException(CatchableError, "WakuRelay protocol is not mounted, cannot mount WakuRlnRelay")

let rlnRelayRes = waitFor WakuRlnRelay.new(rlnConf,
registrationHandler)
registrationHandler)
if rlnRelayRes.isErr():
raise newException(CatchableError, "failed to mount WakuRlnRelay: " & rlnRelayRes.error)
let rlnRelay = rlnRelayRes.get()
Expand Down
2 changes: 1 addition & 1 deletion waku/waku_archive/driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import
chronos
import
../waku_core,
../common/error_handling,
./common

const DefaultPageSize*: uint = 25

type
ArchiveDriverResult*[T] = Result[T, string]
ArchiveDriver* = ref object of RootObj
OnErrHandler* = proc(errMsg: string) {.gcsafe, closure, raises: [].}

type ArchiveRow* = (PubsubTopic, WakuMessage, seq[byte], Timestamp)

Expand Down
7 changes: 4 additions & 3 deletions waku/waku_archive/driver/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import
../driver,
../../common/databases/dburl,
../../common/databases/db_sqlite,
../../common/error_handling,
./sqlite_driver,
./sqlite_driver/migrations as archive_driver_sqlite_migrations,
./queue_driver
Expand All @@ -29,13 +30,13 @@ proc new*(T: type ArchiveDriver,
vacuum: bool,
migrate: bool,
maxNumConn: int,
onErrAction: OnErrHandler):
onFatalErrorAction: OnFatalErrorHandler):
Result[T, string] =
## url - string that defines the database
## vacuum - if true, a cleanup operation will be applied to the database
## migrate - if true, the database schema will be updated
## maxNumConn - defines the maximum number of connections to handle simultaneously (Postgres)
## onErrAction - called if, e.g., the connection with db got lost
## onFatalErrorAction - called if, e.g., the connection with db got lost

let dbUrlValidationRes = dburl.validateDbUrl(url)
if dbUrlValidationRes.isErr():
Expand Down Expand Up @@ -85,7 +86,7 @@ proc new*(T: type ArchiveDriver,
when defined(postgres):
let res = PostgresDriver.new(dbUrl = url,
maxConnections = maxNumConn,
onErrAction = onErrAction)
onFatalErrorAction = onFatalErrorAction)
if res.isErr():
return err("failed to init postgres archive driver: " & res.error)

Expand Down
11 changes: 6 additions & 5 deletions waku/waku_archive/driver/postgres_driver/postgres_driver.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import
chronos,
chronicles
import
../../../common/error_handling,
../../../waku_core,
../../common,
../../driver,
Expand Down Expand Up @@ -89,7 +90,7 @@ const DefaultMaxNumConns = 50
proc new*(T: type PostgresDriver,
dbUrl: string,
maxConnections = DefaultMaxNumConns,
onErrAction: OnErrHandler = nil):
onFatalErrorAction: OnFatalErrorHandler = nil):
ArchiveDriverResult[T] =

## Very simplistic split of max connections
Expand All @@ -101,11 +102,11 @@ proc new*(T: type PostgresDriver,
let writeConnPool = PgAsyncPool.new(dbUrl, maxNumConnOnEachPool).valueOr:
return err("error creating write conn pool PgAsyncPool")

if not isNil(onErrAction):
asyncSpawn checkConnectivity(readConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(readConnPool, onFatalErrorAction)

if not isNil(onErrAction):
asyncSpawn checkConnectivity(writeConnPool, onErrAction)
if not isNil(onFatalErrorAction):
asyncSpawn checkConnectivity(writeConnPool, onFatalErrorAction)

return ok(PostgresDriver(writeConnPool: writeConnPool,
readConnPool: readConnPool))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import
stew/results
import
../../driver,
../../../common/databases/db_postgres
../../../common/databases/db_postgres,
../../../common/error_handling

## Simple query to validate that postgres is up and serving requests
const HealthCheckQuery = "SELECT version();"
Expand All @@ -17,7 +18,7 @@ const MaxNumTrials = 20
const TrialInterval = 1.seconds

proc checkConnectivity*(connPool: PgAsyncPool,
onErrAction: OnErrHandler) {.async.} =
onFatalErrorAction: OnFatalErrorHandler) {.async.} =

while true:

Expand All @@ -29,7 +30,7 @@ proc checkConnectivity*(connPool: PgAsyncPool,
block errorBlock:
## Force close all the opened connections. No need to close gracefully.
(await connPool.resetConnPool()).isOkOr:
onErrAction("checkConnectivity resetConnPool error: " & error)
onFatalErrorAction("checkConnectivity resetConnPool error: " & error)

var numTrial = 0
while numTrial < MaxNumTrials:
Expand All @@ -42,6 +43,6 @@ proc checkConnectivity*(connPool: PgAsyncPool,
numTrial.inc()

## The connection couldn't be resumed. Let's inform the upper layers.
onErrAction("postgres health check error: " & error)
onFatalErrorAction("postgres health check error: " & error)

await sleepAsync(CheckConnectivityInterval)
2 changes: 2 additions & 0 deletions waku/waku_rln_relay/group_manager/group_manager_base.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import
../../common/error_handling,
../protocol_types,
../protocol_metrics,
../constants,
Expand Down Expand Up @@ -44,6 +45,7 @@ type
initialized*: bool
latestIndex*: MembershipIndex
validRoots*: Deque[MerkleNode]
onFatalErrorAction*: OnFatalErrorHandler
when defined(rln_v2):
userMessageLimit*: Option[UserMessageLimit]

Expand Down
Loading

0 comments on commit 2c176d7

Please sign in to comment.