-
Notifications
You must be signed in to change notification settings - Fork 257
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial full sync impl #117
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,14 +13,16 @@ type | |
kHashToBlock | ||
kHeadBlock # Pointer to the most recent block seen | ||
kTailBlock # Pointer to the earliest finalized block | ||
kSlotToBlockRoots | ||
|
||
func subkey(kind: DbKeyKind): array[1, byte] = | ||
result[0] = byte ord(kind) | ||
|
||
func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]): | ||
array[N + 1, byte] = | ||
result[0] = byte ord(kind) | ||
result[1 .. ^1] = key | ||
func subkey[T](kind: DbKeyKind, key: T): auto = | ||
var res: array[sizeof(T) + 1, byte] | ||
res[0] = byte ord(kind) | ||
copyMem(addr res[1], unsafeAddr key, sizeof(key)) | ||
return res | ||
|
||
func subkey(kind: type BeaconState, key: Eth2Digest): auto = | ||
subkey(kHashToState, key.data) | ||
|
@@ -32,6 +34,26 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB = | |
new result | ||
result.backend = backend | ||
|
||
proc toSeq(v: openarray[byte], ofType: type): seq[ofType] = | ||
if v.len != 0: | ||
assert(v.len mod sizeof(ofType) == 0) | ||
let sz = v.len div sizeof(ofType) | ||
result = newSeq[ofType](sz) | ||
copyMem(addr result[0], unsafeAddr v[0], v.len) | ||
|
||
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) = | ||
let slotKey = subkey(kSlotToBlockRoots, value.slot) | ||
var blockRootsBytes = db.backend.get(slotKey) | ||
var blockRoots = blockRootsBytes.toSeq(Eth2Digest) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically could just use ssz here no? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. Just... why? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for simplicity so we just have one serialization format in db (and reuse the code for flattening a seq) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I don't have a strong opinion, but a few reasons I like my approach more:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was more curious than anything.. I wouldn't have bothered with coming up with a special serialization for it (and the additional unsafe code that comes with). I get the feeling the real optimization here will be to drop |
||
if key notin blockRoots: | ||
db.backend.put(subkey(type value, key), ssz.serialize(value)) | ||
blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key)) | ||
copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key)) | ||
db.backend.put(slotKey, blockRootsBytes) | ||
|
||
proc putHead*(db: BeaconChainDB, key: Eth2Digest) = | ||
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block? | ||
|
||
proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) = | ||
# TODO: prune old states | ||
# TODO: it might be necessary to introduce the concept of a "last finalized | ||
|
@@ -50,9 +72,6 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) = | |
proc putState*(db: BeaconChainDB, value: BeaconState) = | ||
db.putState(hash_tree_root_final(value), value) | ||
|
||
proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) = | ||
db.backend.put(subkey(type value, key), ssz.serialize(value)) | ||
|
||
proc putBlock*(db: BeaconChainDB, value: BeaconBlock) = | ||
db.putBlock(hash_tree_root_final(value), value) | ||
|
||
|
@@ -81,6 +100,9 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] = | |
proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] = | ||
db.get(subkey(kTailBlock), Eth2Digest) | ||
|
||
proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] = | ||
db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest) | ||
|
||
proc containsBlock*( | ||
db: BeaconChainDB, key: Eth2Digest): bool = | ||
db.backend.contains(subkey(BeaconBlock, key)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -128,15 +128,20 @@ proc readValue*(reader: var JsonReader, value: var ValidatorPrivKey) {.inline.} | |
|
||
proc newPrivKey*(): ValidatorPrivKey = SigKey.random() | ||
|
||
# RLP serialization (TODO: remove if no longer necessary) | ||
proc append*(writer: var RlpWriter, value: ValidatorPubKey) = | ||
writer.append value.getBytes() | ||
|
||
proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} = | ||
ValidatorPubKey.init rlp.toBytes.toOpenArray | ||
let r = rlp.read(seq[byte]) | ||
if not init(result, r): | ||
raise newException(Exception, "Could not init ValidatorPubKey from bytes") | ||
|
||
proc append*(writer: var RlpWriter, value: ValidatorSig) = | ||
writer.append value.getBytes() | ||
|
||
proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} = | ||
ValidatorSig.init rlp.toBytes.toOpenArray | ||
let r = rlp.read(seq[byte]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The version using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, it wasn't advancing the read cursor. Fixed it differently. |
||
if not init(result, r): | ||
raise newException(Exception, "Could not init ValidatorSig from bytes") | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,8 @@ | ||
import | ||
options, | ||
options, tables, | ||
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx, | ||
spec/[datatypes, crypto, digest] | ||
spec/[datatypes, crypto, digest], | ||
beacon_node, beacon_chain_db, time, ssz | ||
|
||
type | ||
ValidatorChangeLogEntry* = object | ||
|
@@ -13,8 +14,132 @@ type | |
|
||
ValidatorSet = seq[Validator] | ||
|
||
BeaconSyncState* = ref object | ||
node*: BeaconNode | ||
db*: BeaconChainDB | ||
|
||
func toHeader(b: BeaconBlock): BeaconBlockHeader = | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. any reason for not using construction syntax? ie There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No reason really. Somehow I keep forgetting about this option :) |
||
result.slot = b.slot | ||
result.parent_root = b.parent_root | ||
result.state_root = b.state_root | ||
result.randao_reveal = b.randao_reveal | ||
result.eth1_data = b.eth1_data | ||
result.signature = b.signature | ||
result.body = hash_tree_root_final(b.body) | ||
|
||
proc fromHeader(b: var BeaconBlock, h: BeaconBlockHeader) = | ||
b.slot = h.slot | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. humm, this looks a bit dangerous in that it lets There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. or ideally take both a body and a header, so it's impossible to forget to set data.. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. Will do. |
||
b.parent_root = h.parent_root | ||
b.state_root = h.state_root | ||
b.randao_reveal = h.randao_reveal | ||
b.eth1_data = h.eth1_data | ||
b.signature = h.signature | ||
|
||
proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]) = | ||
var bodyMap = initTable[Eth2Digest, int]() | ||
|
||
for i, b in bodies: | ||
bodyMap[hash_tree_root_final(b)] = i | ||
|
||
var goodBlocks, badBlocks = 0 | ||
for h in headers: | ||
let iBody = bodyMap.getOrDefault(h.body, -1) | ||
if iBody >= 0: | ||
var blk: BeaconBlock | ||
blk.fromHeader(h) | ||
blk.body = bodies[iBody] | ||
node.onBeaconBlock(blk) | ||
inc goodBlocks | ||
else: | ||
inc badBlocks | ||
|
||
info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len | ||
|
||
|
||
p2pProtocol BeaconSync(version = 1, | ||
shortName = "bcs"): | ||
shortName = "bcs", | ||
networkState = BeaconSyncState): | ||
|
||
onPeerConnected do(peer: Peer): | ||
const | ||
protocolVersion = 1 # TODO: Spec doesn't specify this yet | ||
networkId = 1 | ||
let node = peer.networkState.node | ||
|
||
var | ||
latestFinalizedRoot: Eth2Digest # TODO | ||
latestFinalizedEpoch: uint64 = node.state.data.finalized_epoch | ||
bestRoot: Eth2Digest # TODO | ||
bestSlot: uint64 = node.state.data.slot | ||
|
||
await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should use the new |
||
bestRoot, bestSlot) | ||
|
||
let m = await peer.nextMsg(BeaconSync.status) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what happens here if peer sends something different, or doesn't send anything at all? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Different dispatchers will be called (e.g. |
||
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot)) | ||
if bestDiff == 0: | ||
# Nothing to do? | ||
trace "Nothing to sync", peer = peer.node | ||
else: | ||
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the | ||
# connection if it's too big. | ||
let db = peer.networkState.db | ||
|
||
if bestDiff > 0: | ||
# Send roots | ||
# TODO: Currently we send all block roots in one "packet". Maybe | ||
# they should be split to multiple packets. | ||
type Root = (Eth2Digest, uint64) | ||
var roots = newSeqOfCap[Root](128) | ||
for i in m.bestSlot .. bestSlot: | ||
for r in db.getBlockRootsForSlot(i): | ||
roots.add((r, i)) | ||
|
||
await peer.beaconBlockRoots(roots) | ||
else: | ||
# Receive roots | ||
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots) | ||
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0) | ||
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len) | ||
for r in roots.roots: | ||
bodiesRequest.add(r[0]) | ||
let bodies = await peer.getBeaconBlockBodies(bodiesRequest) | ||
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies) | ||
|
||
proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest, | ||
latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64) | ||
|
||
proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)]) | ||
|
||
requestResponse: | ||
proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) = | ||
# TODO: validate maxHeaders | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO and implement skipSlots |
||
var s = slot | ||
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders) | ||
let db = peer.networkState.db | ||
while headers.len < maxHeaders: | ||
let blkRoots = db.getBlockRootsForSlot(s) | ||
for r in blkRoots: | ||
headers.add(db.getBlock(r).get().toHeader) | ||
if headers.len == maxHeaders: break | ||
inc s | ||
await peer.beaconBlockHeaders(reqId, headers) | ||
|
||
proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader]) | ||
|
||
requestResponse: | ||
proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) = | ||
# TODO: Validate blockRoots.len | ||
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len) | ||
let db = peer.networkState.db | ||
for r in blockRoots: | ||
if (let blk = db.getBlock(r); blk.isSome): | ||
bodies.add(blk.get().body) | ||
await peer.beaconBlockBodies(reqId, bodies) | ||
|
||
proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody]) | ||
|
||
|
||
requestResponse: | ||
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) = | ||
var bb: BeaconBlock | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't this function a bit dangerous? I can pass a string by accident and it will do the wrong thing. Taking
openarray[byte]
seems a bit safer.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Added an overload instead.