Commit
avoid unnecessary recompression in block protocol
Blocks can be sent straight from compressed data sources
arnetheduck committed Apr 15, 2022
1 parent d0dbc4a commit 29c05a1
Showing 2 changed files with 63 additions and 17 deletions.
35 changes: 31 additions & 4 deletions beacon_chain/networking/eth2_network.nim
@@ -533,6 +533,25 @@ proc isLightClientRequestProto(fn: NimNode): NimNode =

return newLit(false)

proc writeChunkSZ*(
conn: Connection, responseCode: Option[ResponseCode],
uncompressedLen: uint64, payloadSZ: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
var output = memoryOutput(payloadSZ.len + contextBytes.len + 11)
try:
if responseCode.isSome:
output.write byte(responseCode.get)

if contextBytes.len > 0:
output.write contextBytes

output.write toBytes(uncompressedLen, Leb128).toOpenArray()
output.write payloadSZ
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise

conn.write(output.getOutput)
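
The chunk written above follows the req/resp framing: an optional response-code byte, the context bytes (the fork digest for v2 responses), the uncompressed SSZ length as an unsigned LEB128 varint, and finally the snappy-framed payload copied through untouched. A minimal standalone sketch of that layout, using a hand-rolled LEB128 encoder and a made-up encodeChunkSZ helper rather than the stew/faststreams APIs used above:

proc toLeb128Sketch(x: uint64): seq[byte] =
  ## Unsigned LEB128, the same encoding stew's toBytes(x, Leb128) produces
  var v = x
  while true:
    var b = byte(v and 0x7f)
    v = v shr 7
    if v != 0:
      b = b or 0x80
    result.add b
    if v == 0:
      break

proc encodeChunkSZ(responseCode: byte, uncompressedLen: uint64,
                   payloadSZ: openArray[byte],
                   contextBytes: openArray[byte] = []): seq[byte] =
  ## Hypothetical helper mirroring the wire layout of writeChunkSZ
  result.add responseCode                     # e.g. Success
  result.add contextBytes                     # fork digest for v2 responses, else empty
  result.add toLeb128Sketch(uncompressedLen)  # size of the SSZ data before compression
  result.add payloadSZ                        # snappy-framed bytes, copied as-is

The pre-existing writeChunk path compresses the payload itself, while writeChunkSZ trusts the caller to hand it bytes that are already snappy-framed, which is what lets block responses skip the decompress/recompress round trip.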

proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: openArray[byte],
@@ -591,6 +610,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.
finally:
await stream.close()

proc sendResponseChunkBytesSZ(
response: UntypedResponse, uncompressedLen: uint64,
payloadSZ: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
inc response.writtenChunks
response.stream.writeChunkSZ(
some Success, uncompressedLen, payloadSZ, contextBytes)

proc sendResponseChunkBytes(
response: UntypedResponse, payload: openArray[byte],
contextBytes: openArray[byte] = []): Future[void] =
@@ -651,10 +678,10 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
mixin sendResponseChunk
sendResponseChunk(UntypedResponse(r), val)

template writeRawBytes*[M](
r: MultipleChunksResponse[M], bytes: openArray[byte],
contextBytes: openArray[byte]): untyped =
sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes)
template writeBytesSZ*[M](
r: MultipleChunksResponse[M], uncompressedLen: uint64,
bytes: openArray[byte], contextBytes: openArray[byte]): untyped =
sendResponseChunkBytesSZ(UntypedResponse(r), uncompressedLen, bytes, contextBytes)

template send*[M](r: SingleChunkResponse[M], val: M): untyped =
mixin sendResponseChunk
45 changes: 32 additions & 13 deletions beacon_chain/sync/sync_protocol.nim
@@ -12,8 +12,9 @@
# https://github.com/ethereum/consensus-specs/pull/2802

import
options, tables, sets, macros,
chronicles, chronos, stew/ranges/bitranges, libp2p/switch,
std/[options, tables, sets, macros],
chronicles, chronos, snappy/codec,
stew/ranges/bitranges, libp2p/switch,
../spec/datatypes/[phase0, altair, bellatrix],
../spec/[helpers, forks, network],
".."/[beacon_clock],
@@ -271,15 +272,16 @@ p2pProtocol BeaconSync(version = 1,
# Also, our response would be indistinguishable from a node
# that has been synced exactly to the altair transition slot.
break

if dag.getBlockSSZ(blocks[i], bytes):
trace "writing response block",
slot = blocks[i].slot, roor = shortLog(blocks[i].root)
if dag.getBlockSZ(blocks[i], bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()

await response.writeRawBytes(bytes, []) # phase0 bytes
await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0 bytes

inc found
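
The length prefix still has to advertise the uncompressed SSZ size, so it is read out of the snappy frame headers instead of decompressing the block. A rough standalone sketch of the idea behind that lookup, assuming the standard snappy framing layout (1-byte chunk type, 3-byte little-endian data length, a 4-byte CRC before the data, and a varint uncompressed-length preamble inside each compressed block); framedUncompressedLenSketch is a made-up name, not the nim-snappy uncompressedLenFramed API:

import std/options

proc snappyPreambleLen(data: openArray[byte]): uint64 =
  ## A raw snappy block starts with its uncompressed length as a varint
  var shift = 0
  for b in data:
    result = result or (uint64(b and 0x7f) shl shift)
    if (b and 0x80) == 0:
      break
    shift += 7

proc framedUncompressedLenSketch(data: openArray[byte]): Option[uint64] =
  ## Sum the uncompressed sizes of the data chunks in a framed snappy
  ## stream, without decompressing anything
  var pos = 0
  var total = 0'u64
  while pos + 4 <= data.len:
    let
      id = data[pos]
      chunkLen = int(data[pos + 1]) or (int(data[pos + 2]) shl 8) or
        (int(data[pos + 3]) shl 16)
    pos += 4
    if pos + chunkLen > data.len:
      return none(uint64)     # truncated input
    case id
    of 0x00:                  # compressed data chunk: 4-byte CRC, then snappy block
      if chunkLen < 5:
        return none(uint64)
      total += snappyPreambleLen(data.toOpenArray(pos + 4, pos + chunkLen - 1))
    of 0x01:                  # uncompressed data chunk: 4-byte CRC, then raw bytes
      if chunkLen < 4:
        return none(uint64)
      total += uint64(chunkLen - 4)
    else:                     # stream identifier, padding etc. carry no block data
      discard
    pos += chunkLen
  return some(total)

When the headers cannot be parsed, the handlers above log the block and continue with the next one rather than failing the whole response.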

@@ -334,11 +336,16 @@ p2pProtocol BeaconSync(version = 1,
# that has been synced exactly to the altair transition slot.
continue

if dag.getBlockSSZ(blockRef.bid, bytes):
if dag.getBlockSZ(blockRef.bid, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()

await response.writeRawBytes(bytes, []) # phase0 bytes
await response.writeBytesSZ(uncompressedLen, bytes, []) # phase0
inc found

debug "Block root request done",
@@ -388,11 +395,17 @@

for i in startIndex..endIndex:
if dag.getBlockSSZ(blocks[i], bytes):
if dag.getBlockSZ(blocks[i], bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blocks[i])
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()

await response.writeRawBytes(
bytes, dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)
await response.writeBytesSZ(
uncompressedLen, bytes,
dag.forkDigestAtEpoch(blocks[i].slot.epoch).data)

inc found

@@ -438,11 +451,17 @@
continue

if dag.getBlockSSZ(blockRef.bid, bytes):
if dag.getBlockSZ(blockRef.bid, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockRef)
continue

peer.updateRequestQuota(blockResponseCost)
peer.awaitNonNegativeRequestQuota()

await response.writeRawBytes(
bytes, dag.forkDigestAtEpoch(blockRef.slot.epoch).data)
await response.writeBytesSZ(
uncompressedLen, bytes,
dag.forkDigestAtEpoch(blockRef.slot.epoch).data)

inc found

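On the receiving side a peer reads the same fields back before handing the still-compressed payload to its snappy decoder and SSZ reader. A hedged sketch of that parsing, not taken from the nimbus codebase; the caller is assumed to know how many context bytes to expect (0 for the v1 protocols above, 4 for the fork digest in the v2 variants):

type ChunkHeader = object
  responseCode: byte        # 0 = success in the req/resp protocol
  contextBytes: seq[byte]   # fork digest for v2 responses, empty otherwise
  uncompressedLen: uint64   # advertised size of the SSZ data after decompression
  payloadSZ: seq[byte]      # still snappy-framed; decompress and SSZ-decode next

proc parseChunkSketch(data: openArray[byte], contextLen: int): ChunkHeader =
  ## Hypothetical decoder for the layout produced by writeChunkSZ
  var pos = 0
  result.responseCode = data[pos]
  inc pos
  result.contextBytes = data[pos ..< pos + contextLen]
  pos += contextLen
  var shift = 0             # unsigned LEB128 length prefix
  while true:
    let b = data[pos]
    inc pos
    result.uncompressedLen = result.uncompressedLen or (uint64(b and 0x7f) shl shift)
    if (b and 0x80) == 0:
      break
    shift += 7
  result.payloadSZ = data[pos .. ^1]

The advertised uncompressedLen lets the receiver check the size against protocol limits before spending any effort on decompression.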
