diff --git a/package.json b/package.json index ea344d337a58..ef4952f88578 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,7 @@ "pouchdb-adapter-memory": "^7.0.0", "pouchdb-core": "^7.0.0", "promisify-es6": "^1.0.3", + "pull-stream": "^3.6.10", "winston": "^3.2.1", "ws": "^6.2.1" }, diff --git a/src/node/index.ts b/src/node/index.ts index 5f5e58ba927a..d53ad6908aca 100644 --- a/src/node/index.ts +++ b/src/node/index.ts @@ -52,7 +52,7 @@ class BeaconNode { ); this.db = new LevelDB(this.conf.db); - this.network = new P2PNetwork(this.conf.network); + this.network = new P2PNetwork(this.conf.network, {chain, db}); this.eth1 = new EthersEth1Notifier( this.conf.eth1, { diff --git a/src/p2p/index.ts b/src/p2p/index.ts index d7d1318c3554..463ac1eaac40 100644 --- a/src/p2p/index.ts +++ b/src/p2p/index.ts @@ -4,9 +4,13 @@ import {LodestarNode} from "./node"; import logger, {AbstractLogger} from "../logger"; import {PeerInfo} from "peer-info"; import LibP2p from "libp2p"; +import {pull} from "pull-stream"; import {PeerBook} from "peer-book"; import {PeerId} from "peer-id"; import {promisify} from "promisify-es6"; +import {Hello, Goodbye} from "../rpc/api/wire/messages"; +import {DB} from "../db"; +import {BeaconChain} from "../chain"; export interface P2pOptions { maxPeers: number; @@ -41,15 +45,21 @@ export class P2PNetwork extends EventEmitter implements Service { private log: AbstractLogger; - public constructor(opts: P2pOptions) { + private chain: BeaconChain; + + private db: DB; + + public constructor(opts: P2pOptions, {chain, db}) { super(); this.options = opts; this.maxPeers = this.options.maxPeers; this.refreshInterval = this.options.refreshInterval; this.peerBook = this.options.peerBook; this.privateKey = this.options.privateKey; - this.bootnodes = this.options.bootnodes; + this.bootnodes = this.options.bootnodes || []; + this.chain = chain; + this.db = db; this.started = false; this.node = null; @@ -71,43 +81,6 @@ export class P2PNetwork extends EventEmitter implements Service { peerInfo: await this.createPeerInfo(), bootnodes: this.options.bootnodes }); - - this.node.on('peer:discovery', (peerInfo) => { - try { - const peerId = peerInfo.id.toB58String(); - // Check if peer has already been discovered - if (this.options.peerBook.has(peerId) || this.discoveredPeers.has(peerId)) { - return; - } - this.peerBook.put(peerInfo); - this.node.dial(peerInfo, () => {}); - this.log.info(`Peer discovered: ${peerInfo}`); - this.emit('connected', peerInfo); - } catch (err) { - this.log.error(err); - } - - }); - - this.node.on('peer:connect', (peerInfo) => { - try { - this.log.info(`Peer connected: ${peerInfo}`); - this.peerBook.put(peerInfo); - this.discoveredPeers.add(peerInfo); - } catch (err) { - this.log.error(err); - } - }); - - this.node.on('peer:disconnect', (peerInfo) => { - try { - this.peerBook.remove(peerInfo); - this.discoveredPeers.delete(peerInfo); - } catch (err) { - this.log.error(err); - } - }); - } await promisify(this.node.start.bind(this.node))(); this.started = true; @@ -125,8 +98,36 @@ export class P2PNetwork extends EventEmitter implements Service { return new Promise((resolve, reject) => { const handler = (err, peerInfo) => { if (err) { - return reject(err); - } + return reject(err); + } + + protobuf.load('./messages/wire.proto').then((root) => { + this.node.on('peer:connection', (conn, peer) => { + this.log.info('peer:connection'); + + // Temporary parameters until the rest is ready. + peer.rpc.Hello({ + networkId: 0, + chainId: 0, + latestFinalizedRoot: 0x00, + latestFinalizedEpoch: 0, + bestRoot: 0x00, + bestSlot: 0, + }, + + (response, peer) => { + // Process response + }); + }); + + // Simplify this. + this.node.handle('Hello', (networkId, chainId, latestFinalizedRoot, latestFinalizedEpoch, bestRoot, BestSlot, peer, response) => { + response({ + // Respond with hello message + }); + }); + }); + this.peerBook.getAll().forEach((peer) => { peer.multiaddrs.forEach((multiaddr) => { peerInfo.multiaddrs.add(multiaddr); @@ -146,4 +147,4 @@ export class P2PNetwork extends EventEmitter implements Service { } }); } -} +}i diff --git a/src/p2p/messages/packet.proto b/src/p2p/messages/packet.proto new file mode 100644 index 000000000000..db847c3d92e0 --- /dev/null +++ b/src/p2p/messages/packet.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +message Packet { + string key = 1; + string method = 2; + string type = 3; + string dataType = 4; +} diff --git a/src/p2p/messages/wire.proto b/src/p2p/messages/wire.proto new file mode 100644 index 000000000000..9da5d407e89f --- /dev/null +++ b/src/p2p/messages/wire.proto @@ -0,0 +1,87 @@ +syntax = "proto3"; + +service Eth2WireProtocol { + rpc Hello (HelloRequest) returns (HelloResponse) {} + rpc Goodbye (GoodbyeRequest) returns (GoodbyeResponse) {} + rpc GetStatus (GetStatusRequest) returns (GetStatusResponse) {} + rpc BeaconBlockRoots (BeaconBlockRootsRequest) returns (BeaconBlockRootsResponse) {} + rpc BeaconBlockHeaders (BeaconBlockHeadersRequest) returns (BeaconBlockResponse) {} + rpc BeaconBlockBodies (BeaconBlockBodiesRequest) returns (BeaconBlockBodiesResponse) {} + rpc BeaconChainState (BeaconChainStateRequest) returns (BeaconChainStateResponse) {} +} + +message HelloRequest { + uint32 networkId = 1; + uint64 chainId = 2; + bytes latestFinalizedRoot = 3; + uint64 latestFinalizedEpoch = 4; + bytes bestRoot = 5; + uint64 bestSlot = 6; +} + +message HelloResponse { + uint32 networkId = 1; + uint64 chainId = 2; + bytes latestFinalizedRoot = 3; + uint64 latestFinalizedEpoch = 4; + bytes bestRoot = 5; + uint64 bestSlot = 6; +} + +message GoodbyeRequest { + uint64 reason = 1; +} + +message GoodbyeResponse { + uint64 reason = 1; +} + +message GetStatusRequest { + bytes sha = 1; + bytes userAgent = 2; + uint64 timestamp = 3; +} + +message GetStatusResponse { + bytes sha = 1; + bytes userAgent = 2; + uint64 timestamp = 3; +} + +message BeaconBlockRootsRequest { + uint64 startSlot = 1; + uint64 count = 2; +} + +message BeaconBlockRootsResponse { + bytes blockRoot = 1; + uint64 slot = 2; + repeated bytes roots = 3; +} + +message BeaconBlockHeadersRequest { + bytes startRoot = 1; + uint64 startSlot = 2; + uint64 maxHeaders = 3; + uint64 skipSlots = 4; +} + +message BeaconBlockHeadersResponse { + repeated bytes headers = 1; +} + +message BeaconBlockBodiesRequest { + repeated bytes blockRoots = 1; +} + +message BeaconBlockBodiesResponse { + repeated bytes blockBodies = 1; +} + +message BeaconChainStateRequest { + repeated bytes hashes = 1; +} + +message BeaconChainStateResponse { + +} diff --git a/src/p2p/node.ts b/src/p2p/node.ts index 1f12c42c5944..465db741ce7d 100644 --- a/src/p2p/node.ts +++ b/src/p2p/node.ts @@ -7,6 +7,9 @@ import {PeerInfo} from "peer-info"; import {PeerId} from "peer-id"; import {defaultsDeep} from "@nodeutils/defaults-deep"; import * as FloodSub from "libp2p-floodsub"; +import {protobuf} from "protobufjs"; +import logger, {AbstractLogger} from "../logger"; +import {PeerBook} from "peer-book"; export interface LodestarNodeOpts { bootstrap?: string[]; @@ -18,6 +21,10 @@ export class LodestarNode extends LibP2p { private pubsub: FloodSub; + private log: AbstractLogger; + + private peerBook: PeerBook; + private constructor(_options: LodestarNodeOpts) { const defaults = { modules: { @@ -36,6 +43,10 @@ export class LodestarNode extends LibP2p { } }; + this.handlers = {}; + this.requests = {}; + this.peerBook = new PeerBook(); + this.log = logger; super(defaultsDeep(_options, defaults)); } @@ -53,8 +64,59 @@ export class LodestarNode extends LibP2p { } public async start(): Promise { - await promisify(super.start.bind(this))(); + const startFn = promisify(super.start.bind(this)); + await startFn((err) => { + if (err) { + this.log.info(err); + return; + } + + this.on('peer:discovery', (peerInfo) => { + this.log.info(`Discovered Peer: ${peerInfo}`); + const peerId = peerInfo.id.toB58String(); + if (peerBook.has(peerId)) { + return; + } + + this.peerBook.put(peerInfo); + + this.dialProtocol(peerInfo, 'eth/serenity/beacon/rpc/1', (err, conn) => { + if (err) { + this.log.info(`Error during dialing: ${err} `); + return; + } + + return this._connection(conn, peerInfo); + }) + }); + super.handle('eth/serenity/beacon/rpc/1', (protocol, conn) => { + return this._connection(conn, null); + }); + + this.on('peer:connect', (peerInfo) => { + this.log.info(`Peer connected: ${peerInfo}`); + this.peerBook.put(peerInfo); + }); + + this.on('peer:disconnect', (peerInfo) => { + this.log.info(`Peer disconnected: ${peerInfo}`); + this.peerBook.remote(peerInfo); + }); + + }); await promisify(this.pubsub.start.bind(this.pubsub))(); } + + public async handle() { + + } + + private async _connection(conn: Connection, peer:): Promise { + + } + + private async _rpc(send: ): Promise { + + } } diff --git a/src/rpc/api/wire/index.ts b/src/rpc/api/wire/index.ts new file mode 100644 index 000000000000..fcadf8d9121c --- /dev/null +++ b/src/rpc/api/wire/index.ts @@ -0,0 +1,7 @@ +import {WireProtocolApi} from "./wire"; +import {IWireProtocolApi} from "./interface"; + +export { + WireProtocolApi, + IWireProtocolApi +}; diff --git a/src/rpc/api/wire/interface.ts b/src/rpc/api/wire/interface.ts new file mode 100644 index 000000000000..a938a289bfb7 --- /dev/null +++ b/src/rpc/api/wire/interface.ts @@ -0,0 +1,46 @@ +import {IApi} from "../interface"; + +import { + Request, + Response, + Hello, + Goodbye, + GetStatus, + BeaconBlockRootsRequest, + BeaconBlockRootsResponse, + BeaconBlockHeaderRequest, + BeaconBlockHeaderResponse, + BeaconBlockBodiesRequest, + BeaconBlockBodiesResponse, + BeaconChainStateRequest, + BeaconChainStateResponse +} from "./messages"; + +export interface IWireProtocolApi extends IApi { + + /** + * Returns metadata about the remote node. + */ + GetStatus(): Promise; + + /** + * Returns list of block roots and slots from the peer + */ + RequestBeaconBlockRoots(request: BeaconBlockRootsRequest): Promise; + + /** + * Returns beacon block headers from peer + */ + RequestBeaconBlockHeaders(request: BeaconBlockHeadersRequest): Promise; + + /** + * Returns block bodies associated with block roots from a peer + */ + RequestBeaconBlockBodies(request: BeaconBlockBodiesRequest): Promise; + + /** + * Returns the hashes of merkle tree nodes from merkelizing the block's state root. + */ + RequestBeaconChainState(request: BeaconChainStateRequest): Promise; + +} diff --git a/src/rpc/api/wire/messages.ts b/src/rpc/api/wire/messages.ts new file mode 100644 index 000000000000..f43a5246c5fc --- /dev/null +++ b/src/rpc/api/wire/messages.ts @@ -0,0 +1,91 @@ +import {Slot, bytes32, bytes, unint8, uint16, uint64} from "../../types/primitive"; +import {BlockRootSlot, HashTreeRoot} from "./types"; + +interface RequestBody { + +} + +export interface Request { + id: uint64; + methodId: uint16; + body: RequestBody; +} + +export interface Response { + id: uint64; + responseCode: uint16; + result: Buffer; +} + +// Method ID: 0 + +export interface Hello extends RequestBody { + networkId: uint64; + chainId: uint16; + latestFinalizedRoot: bytes32; + latestFinalizedEpoch: uint64; + bestRoot: bytes32; + bestSlot: Slot; +} + +// Method ID: 1 + +export interface Goodbye extends RequestBody { + reason: uint64; +} + +// Method ID: 2 + +export interface GetStatus extends RequestBody { + sha: bytes32; + userAgent: bytes; + timestamp: uint64; +} + +// Method ID: 10 + +export interface BeaconBlockRootsRequest extends RequestBody { + startSlot: Slot; + count: uint64; +} + +export interface BeaconBlockRootsResponse { + blockRoot: bytes32; + slot: Slot; + // Doesn't currently exist as a standalone type + roots: []BlockRootSlot; +} + +// Method ID: 11 +export interface BeaconBlockHeadersRequest extends RequestBody { + // Doesn't currently exist as a standalone type + startRoot: HashTreeRoot; + startSlot: Slot; + maxHeaders: uint64; + skipSlots: uint64; +} + +export interface BeaconBlockHeadersResponse { + // Doesn't currently exist as a standalone type + headers: []BeaconBlockHeader; +} + +// Method ID: 12 +export interface BeaconBlockBodiesRequest extends RequestBody { + blockRoots: []HashTreeRoot; +} + +export interface BeaconBlockBodiesResponse { + blockBodies: []BeaconBlockBody; +} + +// Method ID: 13 +export interface BeaconChainStateRequest extends RequestBody { + hashes: []HashTreeRoot; +} + +// Method ID: 14 +// Not yet defined in the ETH2.0 Wire spec. +export interface BeaconChainStateResponse { + +} diff --git a/src/rpc/api/wire/types.ts b/src/rpc/api/wire/types.ts new file mode 100644 index 000000000000..e6a0a0131032 --- /dev/null +++ b/src/rpc/api/wire/types.ts @@ -0,0 +1,10 @@ +import {Slot} from "../../types/primitive" + +export interface BlockRootSlot { + block_root: Buffer; + slot: Slot; +} + +export interface HashTreeRoot { + hash: Buffer; +} diff --git a/src/rpc/api/wire/wire.ts b/src/rpc/api/wire/wire.ts new file mode 100644 index 000000000000..1a91a7296ecd --- /dev/null +++ b/src/rpc/api/wire/wire.ts @@ -0,0 +1,58 @@ +import {IWireProtocolApi} from "./interface"; +import {BeaconChain} from "../../../chain"; +import {DB} from "../../../db"; + +import { + Request, + Response, + Hello, + Goodbye, + BeaconBlockRootsRequest, + BeaconBlockRootsResponse, + BeaconBlockHeaderRequest, + BeaconBlockHeaderResponse, + BeaconBlockBodiesRequest, + BeaconBlockBodiesResponse, + BeaconChainStateRequest, + BeaconChainStateResponse +} from "./messages"; + +import { + BlockRootSlot, + HashTreeRoot +} from "./types"; + +export class WireProtocolApi implements IWireProtocolApi { + + public namespace: string; + + private chain: BeaconChain; + private db: DB; + + public constructor(opts, {chain, db}) { + this.namespace = 'wire'; + this.db; + this.chain; + } + + public async GetStatus(): Promise { + + } + + public async RequestBeaconBlockRoots(request: BeaconBlockRootsRequest): Promise { + + } + + public async RequestBeaconBlockHeaders(request: BeaconBlockHeadersRequest): Promise { + + } + + public async RequestBeaconBlockBodies(request: BeaconBlockBodiesRequest): Promise { + + } + + public async RequestBeaconChainState(request: BeaconChainStateRequest): Promise { + + } + +} diff --git a/src/rpc/protocol/packet.proto b/src/rpc/protocol/packet.proto new file mode 100644 index 000000000000..db847c3d92e0 --- /dev/null +++ b/src/rpc/protocol/packet.proto @@ -0,0 +1,8 @@ +syntax = "proto3"; + +message Packet { + string key = 1; + string method = 2; + string type = 3; + string dataType = 4; +} diff --git a/src/rpc/protocol/wire.proto b/src/rpc/protocol/wire.proto new file mode 100644 index 000000000000..9da5d407e89f --- /dev/null +++ b/src/rpc/protocol/wire.proto @@ -0,0 +1,87 @@ +syntax = "proto3"; + +service Eth2WireProtocol { + rpc Hello (HelloRequest) returns (HelloResponse) {} + rpc Goodbye (GoodbyeRequest) returns (GoodbyeResponse) {} + rpc GetStatus (GetStatusRequest) returns (GetStatusResponse) {} + rpc BeaconBlockRoots (BeaconBlockRootsRequest) returns (BeaconBlockRootsResponse) {} + rpc BeaconBlockHeaders (BeaconBlockHeadersRequest) returns (BeaconBlockResponse) {} + rpc BeaconBlockBodies (BeaconBlockBodiesRequest) returns (BeaconBlockBodiesResponse) {} + rpc BeaconChainState (BeaconChainStateRequest) returns (BeaconChainStateResponse) {} +} + +message HelloRequest { + uint32 networkId = 1; + uint64 chainId = 2; + bytes latestFinalizedRoot = 3; + uint64 latestFinalizedEpoch = 4; + bytes bestRoot = 5; + uint64 bestSlot = 6; +} + +message HelloResponse { + uint32 networkId = 1; + uint64 chainId = 2; + bytes latestFinalizedRoot = 3; + uint64 latestFinalizedEpoch = 4; + bytes bestRoot = 5; + uint64 bestSlot = 6; +} + +message GoodbyeRequest { + uint64 reason = 1; +} + +message GoodbyeResponse { + uint64 reason = 1; +} + +message GetStatusRequest { + bytes sha = 1; + bytes userAgent = 2; + uint64 timestamp = 3; +} + +message GetStatusResponse { + bytes sha = 1; + bytes userAgent = 2; + uint64 timestamp = 3; +} + +message BeaconBlockRootsRequest { + uint64 startSlot = 1; + uint64 count = 2; +} + +message BeaconBlockRootsResponse { + bytes blockRoot = 1; + uint64 slot = 2; + repeated bytes roots = 3; +} + +message BeaconBlockHeadersRequest { + bytes startRoot = 1; + uint64 startSlot = 2; + uint64 maxHeaders = 3; + uint64 skipSlots = 4; +} + +message BeaconBlockHeadersResponse { + repeated bytes headers = 1; +} + +message BeaconBlockBodiesRequest { + repeated bytes blockRoots = 1; +} + +message BeaconBlockBodiesResponse { + repeated bytes blockBodies = 1; +} + +message BeaconChainStateRequest { + repeated bytes hashes = 1; +} + +message BeaconChainStateResponse { + +}