Update flare header download mechanism (#2607)
* Reverse order in staged blob lists

why:
  Having the largest block number at the lowest header list index `0`
  makes it easier to grow the list with parent headers, i.e. with
  decreasing block numbers (see the sketch below).
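
A minimal sketch of the layout, using a hypothetical `Header` stand-in rather
than the actual worker types: with the largest block number at index `0`,
extending the chain towards older blocks is a plain `add` at the tail of the
sequence.

```nim
# Sketch only: `Header` and `growWithParent` are illustrative stand-ins,
# not the types used by the flare worker.
type Header = object
  number: uint64

# Largest block number sits at index 0.
var revHdrs = @[Header(number: 102), Header(number: 101), Header(number: 100)]

proc growWithParent(lst: var seq[Header], parent: Header) =
  ## Extend the reverse-ordered list by one parent header.
  assert parent.number + 1 == lst[^1].number  # parent of the oldest entry
  lst.add parent                              # append, no shifting needed

growWithParent(revHdrs, Header(number: 99))
assert revHdrs[0].number == 102 and revHdrs[^1].number == 99
```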

* Set a header response time threshold beyond which a peer is dropped

* Refactor extension of staged header chains record

why:
  The previous version was cobbled together as a proof of concept while
  trying out several approaches of how to run the download.

* Update TODO list

* Make debugging code independent of `release` flag

* Update import from jacek
mjfh authored Sep 10, 2024
1 parent 38c58c4 commit 1ced684
Showing 7 changed files with 175 additions and 140 deletions.
5 changes: 4 additions & 1 deletion nimbus/sync/flare/TODO.md
@@ -1,3 +1,6 @@
* Update/resolve code fragments which are tagged FIXME

* Revisit timeouts when fetching header data from the network
* Check noisy and verification sections whether they are really wanted
when going into production
+ **extraTraceMessages**
+ **verifyDataStructureOk**
3 changes: 2 additions & 1 deletion nimbus/sync/flare/worker.nim
@@ -8,6 +8,7 @@
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.


{.push raises:[].}

import
@@ -161,7 +162,7 @@ proc runMulti*(buddy: FlareBuddyRef) {.async.} =

else:
when extraTraceMessages:
debug info & ": failed, done", peer
debug info & ": nothing fetched, done", peer

# ------------------------------------------------------------------------------
# End
24 changes: 17 additions & 7 deletions nimbus/sync/flare/worker/db.nim
@@ -221,21 +221,31 @@ proc dbInitEra1*(ctx: FlareCtxRef): bool =
proc dbStashHeaders*(
ctx: FlareCtxRef;
first: BlockNumber;
rlpBlobs: openArray[Blob];
revBlobs: openArray[Blob];
) =
## Temporarily store header chain to persistent db (oblivious of the chain
## layout.) Note that headres should not be stashed if they are available
## on the `Era1` repo, i.e. if the corresponding block number is at most
## layout.) The headers should not be stashed if they are available on the
## `Era1` repo, i.e. if the corresponding block number is at most
## `ctx.pool.e1AvailMax`.
##
## The `revBlobs[]` arguments are passed in reverse order so that block
## numbers apply as
## ::
## #first -- revBlobs[^1]
## #(first+1) -- revBlobs[^2]
## ..
##
const info = "dbStashHeaders"
let kvt = ctx.db.ctx.getKvt()
for n,data in rlpBlobs:
let key = flareHeaderKey(first + n.uint)
let
kvt = ctx.db.ctx.getKvt()
last = first + revBlobs.len.uint - 1
for n,data in revBlobs:
let key = flareHeaderKey(last - n.uint)
kvt.put(key.toOpenArray, data).isOkOr:
raiseAssert info & ": put() failed: " & $$error
when extraTraceMessages:
trace info & ": headers stashed", first=first.bnStr, nHeaders=rlpBlobs.len
trace info & ": headers stashed",
iv=BnRange.new(first, last), nHeaders=revBlobs.len

proc dbPeekHeader*(ctx: FlareCtxRef; num: BlockNumber): Opt[BlockHeader] =
## Retrieve some stashed header.
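
To make the reverse index mapping in `dbStashHeaders` above concrete, here is a
standalone sketch with made-up values; the `flareHeaderKey`/`kvt` database
calls are omitted and the string blobs are placeholders for RLP-encoded
headers.

```nim
# Sketch of the index arithmetic only: with first = 100 and three blobs in
# reverse order, the loop assigns block numbers 102, 101, 100 for n = 0, 1, 2.
let
  first = 100'u64
  revBlobs = @["hdr-102", "hdr-101", "hdr-100"]  # placeholders for RLP blobs
  last = first + revBlobs.len.uint64 - 1         # 102
for n, data in revBlobs:
  let blockNumber = last - n.uint64              # 102, 101, 100
  echo "#", blockNumber, " <- revBlobs[", n, "] = ", data
```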
43 changes: 22 additions & 21 deletions nimbus/sync/flare/worker/staged.nim
@@ -27,14 +27,14 @@
extraTraceMessages = false # or true
## Enabled additional logging noise

verifyStagedQueueOk = not defined(release) or true
verifyDataStructureOk = false or true
## Debugging mode

# ------------------------------------------------------------------------------
# Private debugging helpers
# ------------------------------------------------------------------------------

when verifyStagedQueueOk:
when verifyDataStructureOk:
proc verifyStagedQueue(
ctx: FlareCtxRef;
info: static[string];
@@ -54,7 +54,7 @@ when verifyStagedQueueOk:
while rc.isOk:
let
key = rc.value.key
nHeaders = rc.value.data.headers.len.uint
nHeaders = rc.value.data.revHdrs.len.uint
minPt = key - nHeaders + 1
unproc = ctx.unprocCovered(minPt, key)
if 0 < unproc:
@@ -202,9 +202,9 @@ proc stagedCollect*(
# Request interval
ivReq = BnRange.new(ivReqMin, ivTop)

# Current length of the headers queue. This is one way to calculate
# the response length from the network.
nLhcHeaders = lhc.headers.len
# Current length of the headers queue. This is used to calculate the
# response length from the network.
nLhcHeaders = lhc.revHdrs.len

# Fetch and extend chain record
if not await buddy.fetchAndCheck(ivReq, lhc, info):
@@ -223,17 +223,17 @@
break

# Update remaining interval
let ivRespLen = lhc.headers.len - nLhcHeaders
if iv.minPt + ivRespLen.uint < ivTop:
let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
when extraTraceMessages:
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
isOpportunistic
ivTop = newIvTop
else:
let ivRespLen = lhc.revHdrs.len - nLhcHeaders
if ivTop <= iv.minPt + ivRespLen.uint or buddy.ctrl.stopped:
break

let newIvTop = ivTop - ivRespLen.uint # will mostly be `ivReq.minPt-1`
when extraTraceMessages:
trace info & ": collected range", peer, iv=BnRange.new(iv.minPt, ivTop),
ivReq, ivResp=BnRange.new(newIvTop+1, ivReq.maxPt), ivRespLen,
isOpportunistic
ivTop = newIvTop

# Store `lhcOpt` chain on the `staged` queue
let qItem = ctx.lhc.staged.insert(iv.maxPt).valueOr:
raiseAssert info & ": duplicate key on staged queue iv=" & $iv
@@ -242,10 +242,11 @@
when extraTraceMessages:
trace info & ": stashed on staged queue", peer,
iv=BnRange.new(iv.maxPt - lhc.headers.len.uint + 1, iv.maxPt),
nHeaders=lhc.headers.len, isOpportunistic
nHeaders=lhc.headers.len, isOpportunistic, ctrl=buddy.ctrl.state
else:
trace info & ": stashed on staged queue", peer,
topBlock=iv.maxPt.bnStr, nHeaders=lhc.headers.len, isOpportunistic
topBlock=iv.maxPt.bnStr, nHeaders=lhc.revHdrs.len,
isOpportunistic, ctrl=buddy.ctrl.state

return true

@@ -261,7 +262,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =

let
least = ctx.layout.least # `L` from `README.md` (1) or `worker_desc`
iv = BnRange.new(qItem.key - qItem.data.headers.len.uint + 1, qItem.key)
iv = BnRange.new(qItem.key - qItem.data.revHdrs.len.uint + 1, qItem.key)
if iv.maxPt+1 < least:
when extraTraceMessages:
trace info & ": there is a gap", iv, L=least.bnStr, nSaved=result
@@ -287,7 +288,7 @@ proc stagedProcess*(ctx: FlareCtxRef; info: static[string]): int =
break

# Store headers on database
ctx.dbStashHeaders(iv.minPt, qItem.data.headers)
ctx.dbStashHeaders(iv.minPt, qItem.data.revHdrs)
ctx.layout.least = iv.minPt
ctx.layout.leastParent = qItem.data.parentHash
let ok = ctx.dbStoreLinkedHChainsLayout()
@@ -344,13 +345,13 @@ proc stagedReorg*(ctx: FlareCtxRef; info: static[string]) =
defer: walk.destroy()
var rc = walk.first
while rc.isOk:
let (key, nHeaders) = (rc.value.key, rc.value.data.headers.len.uint)
let (key, nHeaders) = (rc.value.key, rc.value.data.revHdrs.len.uint)
ctx.unprocMerge(key - nHeaders + 1, key)
rc = walk.next
# Reset `staged` queue
ctx.lhc.staged.clear()

when verifyStagedQueueOk:
when verifyDataStructureOk:
ctx.verifyStagedQueue(info, multiMode = false)

when extraTraceMessages:
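
For the reworked `stagedCollect` loop above, a toy walk-through of the interval
bookkeeping may help. The numbers, the 100-header request cap, and the
assumption that every request is answered in full are made up; only the
`ivTop`/`ivRespLen` arithmetic mirrors the code.

```nim
# Toy model of the interval bookkeeping (no networking, made-up numbers):
# the unprocessed range is #1000..#1199 and each request is served in full.
var
  ivMinPt = 1000'u64               # iv.minPt
  ivTop = 1199'u64                 # iv.maxPt on entry
  nCollected = 0                   # plays the role of lhc.revHdrs.len
while true:
  let
    ivReqMin = max(ivMinPt, ivTop - 99)      # assumed cap of 100 headers
    ivRespLen = int(ivTop - ivReqMin + 1)    # full response assumed
  nCollected += ivRespLen
  if ivTop <= ivMinPt + ivRespLen.uint64:    # lower bound reached => done
    break
  ivTop -= ivRespLen.uint64                  # mostly `ivReqMin - 1`
assert nCollected == 200                     # two rounds: 1100..1199, 1000..1099
```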
74 changes: 62 additions & 12 deletions nimbus/sync/flare/worker/staged/headers.nim
@@ -24,6 +24,40 @@
const extraTraceMessages = false # or true
## Enabled additional logging noise

# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------

# Copied from `nimbus_import`
func shortLog(a: chronos.Duration, parts = int.high): string =
  ## Returns a short string representation of Duration ``a``, using at most
  ## ``parts`` time units.
var
res = ""
v = a.nanoseconds()
parts = parts

template f(n: string, T: Duration) =
if v >= T.nanoseconds():
res.add($(uint64(v div T.nanoseconds())))
res.add(n)
v = v mod T.nanoseconds()
dec parts
if v == 0 or parts <= 0:
return res

f("s", Second)
f("ms", Millisecond)
f("us", Microsecond)
f("ns", Nanosecond)

res

# For some reason neither `formatIt` nor `$` works as expected with logging
# the `elapsed` variable, below. This might be due to the fact that the
# `headersFetchReversed()` function is a generic one, i.e. a template.
func toStr(a: chronos.Duration): string =
a.shortLog(2)

# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
Expand All @@ -36,6 +70,8 @@ proc headersFetchReversed*(
): Future[Result[seq[BlockHeader],void]]
{.async.} =
## Get a list of headers in reverse order.
const
threshold = fetchHeaderReqZombieThreshold # shortcut
let
peer = buddy.peer
useHash = (topHash != EMPTY_ROOT_HASH)
@@ -56,6 +92,7 @@
startBlock: HashOrNum(
isHash: false,
number: ivReq.maxPt))
start = Moment.now()

when extraTraceMessages:
trace trEthSendSendingGetBlockHeaders & " reverse", peer, ivReq,
@@ -64,32 +101,45 @@
# Fetch headers from peer
var resp: Option[blockHeadersObj]
try:
    # There is no obvious way to set an individual timeout for this call. The
    # eth/xx driver sets a global response timeout of `10s`. Due to the way it
    # is implemented, the `Future` returned by `peer.getBlockHeaders(req)`
    # cannot reliably be used in a `withTimeout()` directive. It would rather
    # crash in `rlpx` with a violated `req.timeoutAt <= Moment.now()` assertion.
resp = await peer.getBlockHeaders(req)
except TransportError as e:
`info` info & ", stop", peer, ivReq, nReq=req.maxResults, useHash,
error=($e.name), msg=e.msg
`info` info & " error", peer, ivReq, nReq=req.maxResults, useHash,
elapsed=(Moment.now() - start).toStr, error=($e.name), msg=e.msg
return err()

# Beware of peer terminating the session while fetching data
if buddy.ctrl.stopped:
return err()
# Kludge: Ban an overly slow peer for a while
let elapsed = Moment.now() - start
if threshold < elapsed:
buddy.ctrl.zombie = true # abandon slow peer

if resp.isNone:
# Evaluate result
if resp.isNone or buddy.ctrl.stopped:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer,
ivReq, nReq=req.maxResults, respose="n/a", useHash
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=0, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
return err()

let h: seq[BlockHeader] = resp.get.headers
if h.len == 0 or ivReq.len < h.len.uint:
when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
useHash, nResp=h.len
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
nResp=h.len, elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
return err()

when extraTraceMessages:
trace trEthRecvReceivedBlockHeaders, peer, ivReq, nReq=req.maxResults,
useHash, ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state
else:
if buddy.ctrl.stopped:
trace trEthRecvReceivedBlockHeaders, peer, nReq=req.maxResults, useHash,
ivResp=BnRange.new(h[^1].number,h[0].number), nResp=h.len,
elapsed=elapsed.toStr, threshold, ctrl=buddy.ctrl.state

return ok(h)

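
As a quick sanity check of the `shortLog`/`toStr` helpers added in
`headers.nim` above (the duration value below is made up):

```nim
# Assuming the `shortLog`/`toStr` definitions from the diff above are in scope.
import chronos

let elapsed = 1.seconds + 234.milliseconds + 567.microseconds
doAssert elapsed.shortLog(parts = 2) == "1s234ms"   # only the two largest units
doAssert elapsed.toStr == "1s234ms"                 # toStr == shortLog(2)
```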