From a2789bd599cee891c973fabaa1d39f9a03c2d944 Mon Sep 17 00:00:00 2001 From: cheatfate Date: Wed, 21 Jun 2023 00:25:08 +0300 Subject: [PATCH] Address review comments. --- beacon_chain/validator_client/api.nim | 201 +++++++++++++------------- 1 file changed, 98 insertions(+), 103 deletions(-) diff --git a/beacon_chain/validator_client/api.nim b/beacon_chain/validator_client/api.nim index 7749df3c63..2216481eec 100644 --- a/beacon_chain/validator_client/api.nim +++ b/beacon_chain/validator_client/api.nim @@ -7,9 +7,9 @@ import std/strutils import chronicles, stew/base10 -import ../spec/eth2_apis/eth2_rest_serialization, - ../spec/datatypes/[phase0, altair] -import common, fallback_service, scoring +import ".."/spec/eth2_apis/eth2_rest_serialization, + ".."/spec/datatypes/[phase0, altair] +import "."/[common, fallback_service, scoring] export eth2_rest_serialization, common @@ -40,6 +40,11 @@ type index*: int score*: Opt[float64] + BestNodeResponse*[T] = object + node*: BeaconNodeServerRef + data*: ApiResponse[T] + score*: float64 + const ViableNodeStatus = {RestBeaconNodeStatus.Compatible, RestBeaconNodeStatus.NotSynced, @@ -61,14 +66,16 @@ proc `$`*(ss: openArray[ApiScore]): string = chronicles.formatIt(seq[ApiScore]): $it -func shortLog(oas: Opt[float64]): string = - if oas.isSome(): formatFloat(oas.get(), ffDecimal, 4) else: "" +func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef, + score: float64): ApiScore = + ApiScore(index: node.index, score: Opt.some(score)) -func init*(t: typedesc[ApiScore], index: int, score: float64): ApiScore = - ApiScore(index: index, score: Opt.some(score)) +func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef): ApiScore = + ApiScore(index: node.index, score: Opt.none(float64)) -func init*(t: typedesc[ApiScore], index: int): ApiScore = - ApiScore(index: index, score: Opt.none(float64)) +func init*[T](t: typedesc[BestNodeResponse], node: BeaconNodeServerRef, + data: ApiResponse[T], score: float64): BestNodeResponse[T] = + BestNodeResponse[T](node: node, data: data, score: score) proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase, requestName: string, strategy: ApiStrategyKind) {.async.} = @@ -285,62 +292,52 @@ template bestSuccess*( var retRes: ApiResponse[handlerType] scores: seq[ApiScore] - bestScore: Opt[float64] - bestNode: Opt[BeaconNodeServerRef] + bestResponse: Opt[BestNodeResponse[handlerType]] - while true: - var resultReady = false - let onlineNodes = - try: - if iterations == 0: - # We are not going to wait for BNs if there some available. - await vc.waitNodes(timerFut, statuses, roles, false) - else: - # We get here only, if all the requests are failed. To avoid requests - # spam we going to wait for changes in BNs statuses. - await vc.waitNodes(timerFut, statuses, roles, true) - vc.filterNodes(statuses, roles) - except CancelledError as exc: - if not(isNil(timerFut)) and not(timerFut.finished()): - await timerFut.cancelAndWait() - raise exc - except CatchableError as exc: - # This case could not be happened. - error "Unexpected exception while waiting for beacon nodes", - err_name = $exc.name, err_msg = $exc.msg - var default: seq[BeaconNodeServerRef] - default - - if len(onlineNodes) == 0: - retRes = ApiResponse[handlerType].err("No online beacon node(s)") - resultReady = true - else: - var - (pendingRequests, pendingNodes) = - block: - var requests: seq[FutureBase] - var nodes: seq[BeaconNodeServerRef] - for node {.inject.} in onlineNodes: - it = node.client - let fut = FutureBase(bodyRequest) - requests.add(fut) - nodes.add(node) - (requests, nodes) - bestResponse: ApiResponse[handlerType] - allFut: Future[void] - - try: - if len(pendingRequests) == 0: + block mainLoop: + while true: + let onlineNodes = + try: + if iterations == 0: + # We are not going to wait for BNs if there some available. + await vc.waitNodes(timerFut, statuses, roles, false) + else: + # We get here only, if all the requests are failed. To avoid requests + # spam we going to wait for changes in BNs statuses. + await vc.waitNodes(timerFut, statuses, roles, true) + vc.filterNodes(statuses, roles) + except CancelledError as exc: if not(isNil(timerFut)) and not(timerFut.finished()): await timerFut.cancelAndWait() - retRes = ApiResponse[handlerType].err( - "Beacon node(s) unable to satisfy request") - resultReady = true - break - else: + raise exc + except CatchableError as exc: + # This case could not be happened. + error "Unexpected exception while waiting for beacon nodes", + err_name = $exc.name, err_msg = $exc.msg + var default: seq[BeaconNodeServerRef] + default + + if len(onlineNodes) == 0: + retRes = ApiResponse[handlerType].err("No online beacon node(s)") + break mainLoop + else: + let + (pendingRequests, pendingNodes) = + block: + var requests: seq[FutureBase] + var nodes: seq[BeaconNodeServerRef] + for node {.inject.} in onlineNodes: + it = node.client + let fut = FutureBase(bodyRequest) + requests.add(fut) + nodes.add(node) + (requests, nodes) + + var allFut: Future[void] + try: allFut = allFutures(pendingRequests) - if isNil(timerFut): + if not(isNil(timerFut)): await allFut or timerFut else: await allFut @@ -350,6 +347,7 @@ template bestSuccess*( node {.inject.} = pendingNodes[index] apiResponse {.inject.} = if future.finished(): + doAssert(not(future.cancelled())) if future.failed(): ApiResponse[responseType].err($future.error.msg) else: @@ -378,65 +376,62 @@ template bestSuccess*( except CatchableError: raiseAssert("Score handler must not raise exceptions") - scores.add(ApiScore.init(index, score)) + scores.add(ApiScore.init(node, score)) - if bestScore.isNone(): - bestScore = Opt.some(score) - bestNode = Opt.some(node) - bestResponse = handlerResponse - else: - if score > bestScore.get(): - bestScore = Opt.some(score) - bestNode = Opt.some(node) - bestResponse = handlerResponse + if bestResponse.isNone() or (score > bestResponse.get().score): + bestResponse = Opt.some( + BestNodeResponse.init(node, handlerResponse, score)) else: - scores.add(ApiScore.init(index)) + scores.add(ApiScore.init(node)) if timerFut.finished(): - # If timeout is exceeded we need to cancel all the tasks which are - # still running. + # If timeout is exceeded we need to cancel all the tasks which + # are still running. var pendingCancel: seq[Future[void]] for future in pendingRequests.items(): if not(future.finished()): pendingCancel.add(future.cancelAndWait()) await allFutures(pendingCancel) - retRes = ApiResponse[handlerType].err( - "Timeout exceeded while awaiting for responses") - resultReady = true - # When all requests failed, bestScore will not be set. - if bestScore.isSome(): - retRes = bestResponse - resultReady = true - - except CancelledError as exc: - var pendingCancel: seq[Future[void]] - if not(isNil(allFut)) and not(allFut.finished()): - pendingCancel.add(allFut.cancelAndWait()) - if not(isNil(timerFut)) and not(timerFut.finished()): - pendingCancel.add(timerFut.cancelAndWait()) - for future in pendingRequests.items(): - if not(future.finished()): - pendingCancel.add(future.cancelAndWait()) - await allFutures(pendingCancel) - raise exc - except CatchableError as exc: - # This should not be happened, because allFutures() and race() did not - # raise any exceptions. - error "Unexpected exception while processing request", - err_name = $exc.name, err_msg = $exc.msg - retRes = ApiResponse[handlerType].err("Unexpected error") - resultReady = true + # When all requests failed, bestResponse will not be set. + if bestResponse.isSome(): + retRes = bestResponse.get().data + break mainLoop + else: + if timerFut.finished(): + retRes = ApiResponse[handlerType].err( + "Timeout exceeded while awaiting for responses") + break mainLoop - if resultReady: - break + except CancelledError as exc: + var pendingCancel: seq[Future[void]] + # `or` operation does not cancelling Futures passed as arguments. + if not(isNil(allFut)) and not(allFut.finished()): + pendingCancel.add(allFut.cancelAndWait()) + if not(isNil(timerFut)) and not(timerFut.finished()): + pendingCancel.add(timerFut.cancelAndWait()) + # We should cancel all the requests which are still pending. + for future in pendingRequests.items(): + if not(future.finished()): + pendingCancel.add(future.cancelAndWait()) + # Awaiting cancellations. + await allFutures(pendingCancel) + raise exc + except CatchableError as exc: + # This should not be happened, because allFutures() and race() did not + # raise any exceptions. + error "Unexpected exception while processing request", + err_name = $exc.name, err_msg = $exc.msg + retRes = ApiResponse[handlerType].err("Unexpected error") + break mainLoop - inc(iterations) + inc(iterations) if retRes.isOk(): debug "Best score result selected", request = RequestName, available_scores = scores, - best_score = shortLog(bestScore), best_node = bestNode.get() + best_score = formatFloat(bestResponse.get().score, ffDecimal, 4), + best_node = bestResponse.get().node retRes