Skip to content

Commit

Permalink
Merge pull request #117 from status-im/sync2
Browse files Browse the repository at this point in the history
Initial full sync impl
  • Loading branch information
yglukhov authored Mar 4, 2019
2 parents 39744ea + 9ea0bf4 commit 21e4deb
Show file tree
Hide file tree
Showing 5 changed files with 186 additions and 10 deletions.
31 changes: 28 additions & 3 deletions beacon_chain/beacon_chain_db.nim
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ type
kHashToBlock
kHeadBlock # Pointer to the most recent block seen
kTailBlock # Pointer to the earliest finalized block
kSlotToBlockRoots

func subkey(kind: DbKeyKind): array[1, byte] =
result[0] = byte ord(kind)
Expand All @@ -22,6 +23,10 @@ func subkey[N: static int](kind: DbKeyKind, key: array[N, byte]):
result[0] = byte ord(kind)
result[1 .. ^1] = key

func subkey(kind: DbKeyKind, key: uint64): array[sizeof(key) + 1, byte] =
result[0] = byte ord(kind)
copyMem(addr result[1], unsafeAddr key, sizeof(key))

func subkey(kind: type BeaconState, key: Eth2Digest): auto =
subkey(kHashToState, key.data)

Expand All @@ -32,6 +37,26 @@ proc init*(T: type BeaconChainDB, backend: TrieDatabaseRef): BeaconChainDB =
new result
result.backend = backend

proc toSeq(v: openarray[byte], ofType: type): seq[ofType] =
if v.len != 0:
assert(v.len mod sizeof(ofType) == 0)
let sz = v.len div sizeof(ofType)
result = newSeq[ofType](sz)
copyMem(addr result[0], unsafeAddr v[0], v.len)

proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
let slotKey = subkey(kSlotToBlockRoots, value.slot)
var blockRootsBytes = db.backend.get(slotKey)
var blockRoots = blockRootsBytes.toSeq(Eth2Digest)
if key notin blockRoots:
db.backend.put(subkey(type value, key), ssz.serialize(value))
blockRootsBytes.setLen(blockRootsBytes.len + sizeof(key))
copyMem(addr blockRootsBytes[^sizeof(key)], unsafeAddr key, sizeof(key))
db.backend.put(slotKey, blockRootsBytes)

proc putHead*(db: BeaconChainDB, key: Eth2Digest) =
db.backend.put(subkey(kHeadBlock), key.data) # TODO head block?

proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
# TODO: prune old states
# TODO: it might be necessary to introduce the concept of a "last finalized
Expand All @@ -50,9 +75,6 @@ proc putState*(db: BeaconChainDB, key: Eth2Digest, value: BeaconState) =
proc putState*(db: BeaconChainDB, value: BeaconState) =
db.putState(hash_tree_root_final(value), value)

proc putBlock*(db: BeaconChainDB, key: Eth2Digest, value: BeaconBlock) =
db.backend.put(subkey(type value, key), ssz.serialize(value))

proc putBlock*(db: BeaconChainDB, value: BeaconBlock) =
db.putBlock(hash_tree_root_final(value), value)

Expand Down Expand Up @@ -81,6 +103,9 @@ proc getHeadBlock*(db: BeaconChainDB): Option[Eth2Digest] =
proc getTailBlock*(db: BeaconChainDB): Option[Eth2Digest] =
db.get(subkey(kTailBlock), Eth2Digest)

proc getBlockRootsForSlot*(db: BeaconChainDB, slot: uint64): seq[Eth2Digest] =
db.backend.get(subkey(kSlotToBlockRoots, slot)).toSeq(Eth2Digest)

proc containsBlock*(
db: BeaconChainDB, key: Eth2Digest): bool =
db.backend.contains(subkey(BeaconBlock, key))
Expand Down
13 changes: 11 additions & 2 deletions beacon_chain/beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import
spec/[datatypes, digest, crypto, beaconstate, helpers, validator], conf, time,
state_transition, fork_choice, ssz, beacon_chain_db, validator_pool, extras,
attestation_pool, block_pool,
mainchain_monitor, sync_protocol, gossipsub_protocol, trusted_state_snapshots,
mainchain_monitor, gossipsub_protocol, trusted_state_snapshots,
eth/trie/db, eth/trie/backends/rocksdb_backend

type
Expand All @@ -15,7 +15,7 @@ type
keys*: KeyPair
attachedValidators: ValidatorPool
blockPool: BlockPool
state: StateData
state*: StateData
attestationPool: AttestationPool
mainchainMonitor: MainchainMonitor
potentialHeads: seq[Eth2Digest]
Expand All @@ -28,6 +28,12 @@ const
topicAttestations = "ethereum/2.1/beacon_chain/attestations"
topicfetchBlocks = "ethereum/2.1/beacon_chain/fetch"


proc onBeaconBlock*(node: BeaconNode, blck: BeaconBlock) {.gcsafe.}

import sync_protocol


func shortValidatorKey(node: BeaconNode, validatorIdx: int): string =
($node.state.data.validator_registry[validatorIdx].pubkey)[0..7]

Expand Down Expand Up @@ -86,6 +92,9 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): T =

result.network =
newEthereumNode(result.keys, address, 0, nil, clientId, minPeers = 1)
let state = result.network.protocolState(BeaconSync)
state.node = result
state.db = result.db

let
head = result.blockPool.get(result.db.getHeadBlock().get())
Expand Down
7 changes: 5 additions & 2 deletions beacon_chain/spec/crypto.nim
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,18 @@ proc readValue*(reader: var JsonReader, value: var ValidatorPrivKey) {.inline.}

proc newPrivKey*(): ValidatorPrivKey = SigKey.random()

# RLP serialization (TODO: remove if no longer necessary)
proc append*(writer: var RlpWriter, value: ValidatorPubKey) =
writer.append value.getBytes()

proc read*(rlp: var Rlp, T: type ValidatorPubKey): T {.inline.} =
ValidatorPubKey.init rlp.toBytes.toOpenArray
result = ValidatorPubKey.init(rlp.toBytes.toOpenArray)
rlp.skipElem()

proc append*(writer: var RlpWriter, value: ValidatorSig) =
writer.append value.getBytes()

proc read*(rlp: var Rlp, T: type ValidatorSig): T {.inline.} =
ValidatorSig.init rlp.toBytes.toOpenArray
result = ValidatorSig.init(rlp.toBytes.toOpenArray)
rlp.skipElem()

12 changes: 12 additions & 0 deletions beacon_chain/spec/datatypes.nim
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,18 @@ type

body*: BeaconBlockBody

BeaconBlockHeader* = object
## Same as BeaconBlock, except `body` is the `hash_tree_root` of the
## associated BeaconBlockBody.
# TODO: Dry it up with BeaconBlock
slot*: uint64
parent_root*: Eth2Digest
state_root*: Eth2Digest
randao_reveal*: ValidatorSig
eth1_data*: Eth1Data
signature*: ValidatorSig
body*: Eth2Digest

# https://github.com/ethereum/eth2.0-specs/blob/v0.3.0/specs/core/0_beacon-chain.md#beaconblockbody
BeaconBlockBody* = object
proposer_slashings*: seq[ProposerSlashing]
Expand Down
133 changes: 130 additions & 3 deletions beacon_chain/sync_protocol.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import
options,
options, tables,
chronicles, eth/[rlp, p2p], chronos, ranges/bitranges, eth/p2p/rlpx,
spec/[datatypes, crypto, digest]
spec/[datatypes, crypto, digest],
beacon_node, beacon_chain_db, time, ssz

type
ValidatorChangeLogEntry* = object
Expand All @@ -13,8 +14,134 @@ type

ValidatorSet = seq[Validator]

BeaconSyncState* = ref object
node*: BeaconNode
db*: BeaconChainDB

func toHeader(b: BeaconBlock): BeaconBlockHeader =
BeaconBlockHeader(
slot: b.slot,
parent_root: b.parent_root,
state_root: b.state_root,
randao_reveal: b.randao_reveal,
eth1_data : b.eth1_data,
signature: b.signature,
body: hash_tree_root_final(b.body)
)

proc fromHeader(b: var BeaconBlock, h: BeaconBlockHeader) =
b.slot = h.slot
b.parent_root = h.parent_root
b.state_root = h.state_root
b.randao_reveal = h.randao_reveal
b.eth1_data = h.eth1_data
b.signature = h.signature

proc importBlocks(node: BeaconNode, roots: openarray[(Eth2Digest, uint64)], headers: openarray[BeaconBlockHeader], bodies: openarray[BeaconBlockBody]) =
var bodyMap = initTable[Eth2Digest, int]()

for i, b in bodies:
bodyMap[hash_tree_root_final(b)] = i

var goodBlocks, badBlocks = 0
for h in headers:
let iBody = bodyMap.getOrDefault(h.body, -1)
if iBody >= 0:
var blk: BeaconBlock
blk.fromHeader(h)
blk.body = bodies[iBody]
node.onBeaconBlock(blk)
inc goodBlocks
else:
inc badBlocks

info "Forward sync imported blocks", goodBlocks, badBlocks, headers = headers.len, bodies = bodies.len, roots = roots.len


p2pProtocol BeaconSync(version = 1,
shortName = "bcs"):
shortName = "bcs",
networkState = BeaconSyncState):

onPeerConnected do(peer: Peer):
const
protocolVersion = 1 # TODO: Spec doesn't specify this yet
networkId = 1
let node = peer.networkState.node

var
latestFinalizedRoot: Eth2Digest # TODO
latestFinalizedEpoch: uint64 = node.state.data.finalized_epoch
bestRoot: Eth2Digest # TODO
bestSlot: uint64 = node.state.data.slot

await peer.status(protocolVersion, networkId, latestFinalizedRoot, latestFinalizedEpoch,
bestRoot, bestSlot)

let m = await peer.nextMsg(BeaconSync.status)
let bestDiff = cmp((latestFinalizedEpoch, bestSlot), (m.latestFinalizedEpoch, m.bestSlot))
if bestDiff == 0:
# Nothing to do?
trace "Nothing to sync", peer = peer.node
else:
# TODO: Check for WEAK_SUBJECTIVITY_PERIOD difference and terminate the
# connection if it's too big.
let db = peer.networkState.db

if bestDiff > 0:
# Send roots
# TODO: Currently we send all block roots in one "packet". Maybe
# they should be split to multiple packets.
type Root = (Eth2Digest, uint64)
var roots = newSeqOfCap[Root](128)
for i in m.bestSlot .. bestSlot:
for r in db.getBlockRootsForSlot(i):
roots.add((r, i))

await peer.beaconBlockRoots(roots)
else:
# Receive roots
let roots = await peer.nextMsg(BeaconSync.beaconBlockRoots)
let headers = await peer.getBeaconBlockHeaders(bestRoot, bestSlot, roots.roots.len, 0)
var bodiesRequest = newSeqOfCap[Eth2Digest](roots.roots.len)
for r in roots.roots:
bodiesRequest.add(r[0])
let bodies = await peer.getBeaconBlockBodies(bodiesRequest)
node.importBlocks(roots.roots, headers.get.blockHeaders, bodies.get.blockBodies)

proc status(peer: Peer, protocolVersion, networkId: int, latestFinalizedRoot: Eth2Digest,
latestFinalizedEpoch: uint64, bestRoot: Eth2Digest, bestSlot: uint64)

proc beaconBlockRoots(peer: Peer, roots: openarray[(Eth2Digest, uint64)])

requestResponse:
proc getBeaconBlockHeaders(peer: Peer, blockRoot: Eth2Digest, slot: uint64, maxHeaders: int, skipSlots: int) =
# TODO: validate maxHeaders
var s = slot
var headers = newSeqOfCap[BeaconBlockHeader](maxHeaders)
let db = peer.networkState.db
while headers.len < maxHeaders:
let blkRoots = db.getBlockRootsForSlot(s)
for r in blkRoots:
headers.add(db.getBlock(r).get().toHeader)
if headers.len == maxHeaders: break
inc s
await peer.beaconBlockHeaders(reqId, headers)

proc beaconBlockHeaders(peer: Peer, blockHeaders: openarray[BeaconBlockHeader])

requestResponse:
proc getBeaconBlockBodies(peer: Peer, blockRoots: openarray[Eth2Digest]) =
# TODO: Validate blockRoots.len
var bodies = newSeqOfCap[BeaconBlockBody](blockRoots.len)
let db = peer.networkState.db
for r in blockRoots:
if (let blk = db.getBlock(r); blk.isSome):
bodies.add(blk.get().body)
await peer.beaconBlockBodies(reqId, bodies)

proc beaconBlockBodies(peer: Peer, blockBodies: openarray[BeaconBlockBody])


requestResponse:
proc getValidatorChangeLog(peer: Peer, changeLogHead: Eth2Digest) =
var bb: BeaconBlock
Expand Down

0 comments on commit 21e4deb

Please sign in to comment.