Skip to content

Commit

Permalink
Address review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
cheatfate committed Jun 20, 2023
1 parent f9f5f80 commit a2789bd
Showing 1 changed file with 98 additions and 103 deletions.
201 changes: 98 additions & 103 deletions beacon_chain/validator_client/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

import std/strutils
import chronicles, stew/base10
import ../spec/eth2_apis/eth2_rest_serialization,
../spec/datatypes/[phase0, altair]
import common, fallback_service, scoring
import ".."/spec/eth2_apis/eth2_rest_serialization,
".."/spec/datatypes/[phase0, altair]
import "."/[common, fallback_service, scoring]

export eth2_rest_serialization, common

Expand Down Expand Up @@ -40,6 +40,11 @@ type
index*: int
score*: Opt[float64]

BestNodeResponse*[T] = object
node*: BeaconNodeServerRef
data*: ApiResponse[T]
score*: float64

const
ViableNodeStatus = {RestBeaconNodeStatus.Compatible,
RestBeaconNodeStatus.NotSynced,
Expand All @@ -61,14 +66,16 @@ proc `$`*(ss: openArray[ApiScore]): string =
chronicles.formatIt(seq[ApiScore]):
$it

func shortLog(oas: Opt[float64]): string =
if oas.isSome(): formatFloat(oas.get(), ffDecimal, 4) else: "<n/a>"
func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef,
score: float64): ApiScore =
ApiScore(index: node.index, score: Opt.some(score))

func init*(t: typedesc[ApiScore], index: int, score: float64): ApiScore =
ApiScore(index: index, score: Opt.some(score))
func init*(t: typedesc[ApiScore], node: BeaconNodeServerRef): ApiScore =
ApiScore(index: node.index, score: Opt.none(float64))

func init*(t: typedesc[ApiScore], index: int): ApiScore =
ApiScore(index: index, score: Opt.none(float64))
func init*[T](t: typedesc[BestNodeResponse], node: BeaconNodeServerRef,
data: ApiResponse[T], score: float64): BestNodeResponse[T] =
BestNodeResponse[T](node: node, data: data, score: score)

proc lazyWaiter(node: BeaconNodeServerRef, request: FutureBase,
requestName: string, strategy: ApiStrategyKind) {.async.} =
Expand Down Expand Up @@ -285,62 +292,52 @@ template bestSuccess*(
var
retRes: ApiResponse[handlerType]
scores: seq[ApiScore]
bestScore: Opt[float64]
bestNode: Opt[BeaconNodeServerRef]
bestResponse: Opt[BestNodeResponse[handlerType]]

while true:
var resultReady = false
let onlineNodes =
try:
if iterations == 0:
# We are not going to wait for BNs if there some available.
await vc.waitNodes(timerFut, statuses, roles, false)
else:
# We get here only, if all the requests are failed. To avoid requests
# spam we going to wait for changes in BNs statuses.
await vc.waitNodes(timerFut, statuses, roles, true)
vc.filterNodes(statuses, roles)
except CancelledError as exc:
if not(isNil(timerFut)) and not(timerFut.finished()):
await timerFut.cancelAndWait()
raise exc
except CatchableError as exc:
# This case could not be happened.
error "Unexpected exception while waiting for beacon nodes",
err_name = $exc.name, err_msg = $exc.msg
var default: seq[BeaconNodeServerRef]
default

if len(onlineNodes) == 0:
retRes = ApiResponse[handlerType].err("No online beacon node(s)")
resultReady = true
else:
var
(pendingRequests, pendingNodes) =
block:
var requests: seq[FutureBase]
var nodes: seq[BeaconNodeServerRef]
for node {.inject.} in onlineNodes:
it = node.client
let fut = FutureBase(bodyRequest)
requests.add(fut)
nodes.add(node)
(requests, nodes)
bestResponse: ApiResponse[handlerType]
allFut: Future[void]

try:
if len(pendingRequests) == 0:
block mainLoop:
while true:
let onlineNodes =
try:
if iterations == 0:
# We are not going to wait for BNs if there some available.
await vc.waitNodes(timerFut, statuses, roles, false)
else:
# We get here only, if all the requests are failed. To avoid requests
# spam we going to wait for changes in BNs statuses.
await vc.waitNodes(timerFut, statuses, roles, true)
vc.filterNodes(statuses, roles)
except CancelledError as exc:
if not(isNil(timerFut)) and not(timerFut.finished()):
await timerFut.cancelAndWait()
retRes = ApiResponse[handlerType].err(
"Beacon node(s) unable to satisfy request")
resultReady = true
break
else:
raise exc
except CatchableError as exc:
# This case could not be happened.
error "Unexpected exception while waiting for beacon nodes",
err_name = $exc.name, err_msg = $exc.msg
var default: seq[BeaconNodeServerRef]
default

if len(onlineNodes) == 0:
retRes = ApiResponse[handlerType].err("No online beacon node(s)")
break mainLoop
else:
let
(pendingRequests, pendingNodes) =
block:
var requests: seq[FutureBase]
var nodes: seq[BeaconNodeServerRef]
for node {.inject.} in onlineNodes:
it = node.client
let fut = FutureBase(bodyRequest)
requests.add(fut)
nodes.add(node)
(requests, nodes)

var allFut: Future[void]
try:
allFut = allFutures(pendingRequests)

if isNil(timerFut):
if not(isNil(timerFut)):
await allFut or timerFut
else:
await allFut
Expand All @@ -350,6 +347,7 @@ template bestSuccess*(
node {.inject.} = pendingNodes[index]
apiResponse {.inject.} =
if future.finished():
doAssert(not(future.cancelled()))
if future.failed():
ApiResponse[responseType].err($future.error.msg)
else:
Expand Down Expand Up @@ -378,65 +376,62 @@ template bestSuccess*(
except CatchableError:
raiseAssert("Score handler must not raise exceptions")

scores.add(ApiScore.init(index, score))
scores.add(ApiScore.init(node, score))

if bestScore.isNone():
bestScore = Opt.some(score)
bestNode = Opt.some(node)
bestResponse = handlerResponse
else:
if score > bestScore.get():
bestScore = Opt.some(score)
bestNode = Opt.some(node)
bestResponse = handlerResponse
if bestResponse.isNone() or (score > bestResponse.get().score):
bestResponse = Opt.some(
BestNodeResponse.init(node, handlerResponse, score))
else:
scores.add(ApiScore.init(index))
scores.add(ApiScore.init(node))

if timerFut.finished():
# If timeout is exceeded we need to cancel all the tasks which are
# still running.
# If timeout is exceeded we need to cancel all the tasks which
# are still running.
var pendingCancel: seq[Future[void]]
for future in pendingRequests.items():
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
retRes = ApiResponse[handlerType].err(
"Timeout exceeded while awaiting for responses")
resultReady = true

# When all requests failed, bestScore will not be set.
if bestScore.isSome():
retRes = bestResponse
resultReady = true

except CancelledError as exc:
var pendingCancel: seq[Future[void]]
if not(isNil(allFut)) and not(allFut.finished()):
pendingCancel.add(allFut.cancelAndWait())
if not(isNil(timerFut)) and not(timerFut.finished()):
pendingCancel.add(timerFut.cancelAndWait())
for future in pendingRequests.items():
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
await allFutures(pendingCancel)
raise exc
except CatchableError as exc:
# This should not be happened, because allFutures() and race() did not
# raise any exceptions.
error "Unexpected exception while processing request",
err_name = $exc.name, err_msg = $exc.msg
retRes = ApiResponse[handlerType].err("Unexpected error")
resultReady = true
# When all requests failed, bestResponse will not be set.
if bestResponse.isSome():
retRes = bestResponse.get().data
break mainLoop
else:
if timerFut.finished():
retRes = ApiResponse[handlerType].err(
"Timeout exceeded while awaiting for responses")
break mainLoop

if resultReady:
break
except CancelledError as exc:
var pendingCancel: seq[Future[void]]
# `or` operation does not cancelling Futures passed as arguments.
if not(isNil(allFut)) and not(allFut.finished()):
pendingCancel.add(allFut.cancelAndWait())
if not(isNil(timerFut)) and not(timerFut.finished()):
pendingCancel.add(timerFut.cancelAndWait())
# We should cancel all the requests which are still pending.
for future in pendingRequests.items():
if not(future.finished()):
pendingCancel.add(future.cancelAndWait())
# Awaiting cancellations.
await allFutures(pendingCancel)
raise exc
except CatchableError as exc:
# This should not be happened, because allFutures() and race() did not
# raise any exceptions.
error "Unexpected exception while processing request",
err_name = $exc.name, err_msg = $exc.msg
retRes = ApiResponse[handlerType].err("Unexpected error")
break mainLoop

inc(iterations)
inc(iterations)

if retRes.isOk():
debug "Best score result selected",
request = RequestName, available_scores = scores,
best_score = shortLog(bestScore), best_node = bestNode.get()
best_score = formatFloat(bestResponse.get().score, ffDecimal, 4),
best_node = bestResponse.get().node

retRes

Expand Down

0 comments on commit a2789bd

Please sign in to comment.