Skip to content

Commit

Permalink
bottle in sea network service
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-babichenko committed Apr 2, 2020
1 parent 1a04850 commit 6ac0462
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 2 deletions.
10 changes: 10 additions & 0 deletions chain-network/proto/node.proto
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ message Gossip {
repeated bytes nodes = 2;
}

// Gossip message with encrypred information on nodes in the network (bottle in
// the sea). These messages are forwarded until they find their target peers.
message BottleInSea {
bytes node = 1;
}

// Element of the subscription stream returned by BlockSubscription.
message BlockEvent {
oneof item {
Expand Down Expand Up @@ -174,4 +180,8 @@ service Node {
// Establishes a bidirectional stream to exchange information on new
// network peers.
rpc GossipSubscription(stream Gossip) returns (stream Gossip);

// Establishes a bidirectional stream to forward "bottle in the sea" gossip
// messages.
rpc BottleInSeaSubscription(stream BottleInSea) returns (stream BottleInSea);
}
24 changes: 24 additions & 0 deletions chain-network/src/core/server/bottle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use super::PushStream;
use crate::data::BottleInSea;
use crate::data::Peer;
use crate::error::Error;
use async_trait::async_trait;
use futures::stream::Stream;

/// Interface for the blockchain node to exchange "bottle in the sea" messages.
#[async_trait]
pub trait BottleInSeaService {
/// The type of outbound asynchronous streams returned by the
/// `subscription` method.
type SubscriptionStream: Stream<Item = Result<BottleInSea, Error>> + Send + Sync;

/// Called by the protocol implementation to establish a
/// bidirectional subscription stream.
/// The inbound stream is passed to the asynchronous method,
/// which resolves to the outbound stream.
async fn bottle_in_sea_subscription(
&self,
subscriber: Peer,
stream: PushStream<BottleInSea>,
) -> Result<Self::SubscriptionStream, Error>;
}
2 changes: 2 additions & 0 deletions chain-network/src/core/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
mod block;
mod bottle;
mod fragment;
mod gossip;
mod node;
mod push;

pub use block::BlockService;
pub use bottle::BottleInSeaService;
pub use fragment::FragmentService;
pub use gossip::GossipService;

Expand Down
9 changes: 8 additions & 1 deletion chain-network/src/core/server/node.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::{BlockService, FragmentService, GossipService};
use super::{BlockService, BottleInSeaService, FragmentService, GossipService};

/// Interface to application logic of the blockchain node server.
///
Expand All @@ -15,6 +15,9 @@ pub trait Node {
/// The implementation of the gossip service.
type GossipService: GossipService + Send + Sync;

/// The implementation of the "bottle in the sea" gossip service.
type BottleInSeaService: BottleInSeaService + Send + Sync;

/// Instantiates the block service,
/// if supported by this node.
fn block_service(&self) -> Option<&Self::BlockService>;
Expand All @@ -26,4 +29,8 @@ pub trait Node {
/// Instantiates the gossip service,
/// if supported by this node.
fn gossip_service(&self) -> Option<&Self::GossipService>;

/// Instantiates the "bottle in the sea" service,
/// if supported by this node.
fn bottle_in_sea_service(&self) -> Option<&Self::BottleInSeaService>;
}
34 changes: 34 additions & 0 deletions chain-network/src/data/bottle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/// A "bottle in the sea" raw message.
#[derive(Clone)]
pub struct BottleInSea(Box<[u8]>);

impl BottleInSea {
#[inline]
pub fn from_bytes<B: Into<Box<[u8]>>>(bytes: B) -> Self {
Self(bytes.into())
}

#[inline]
pub fn as_bytes(&self) -> &[u8] {
&self.0
}

#[inline]
pub fn into_bytes(self) -> Vec<u8> {
self.0.into()
}
}

impl AsRef<[u8]> for BottleInSea {
#[inline]
fn as_ref(&self) -> &[u8] {
self.as_bytes()
}
}

impl From<BottleInSea> for Vec<u8> {
#[inline]
fn from(block: BottleInSea) -> Self {
block.into_bytes()
}
}
2 changes: 2 additions & 0 deletions chain-network/src/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
pub mod block;
pub mod bottle;
pub mod fragment;
pub mod gossip;
pub mod p2p;

pub use block::{Block, BlockEvent, BlockId, BlockIds, Header};
pub use bottle::BottleInSea;
pub use fragment::{Fragment, FragmentId, FragmentIds};
pub use gossip::Gossip;
pub use p2p::{Peer, Peers};
17 changes: 17 additions & 0 deletions chain-network/src/grpc/convert.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::proto;
use crate::data::{
block::{self, Block, BlockEvent, BlockId, ChainPullRequest, Header},
bottle::BottleInSea,
fragment::Fragment,
gossip::{Gossip, Node},
p2p::Peer,
Expand Down Expand Up @@ -201,6 +202,22 @@ impl IntoProtobuf for Gossip {
}
}

impl FromProtobuf<proto::BottleInSea> for BottleInSea {
fn from_message(message: proto::BottleInSea) -> Result<Self, Error> {
Ok(BottleInSea::from_bytes(message.node))
}
}

impl IntoProtobuf for BottleInSea {
type Message = proto::BottleInSea;

fn into_message(self) -> proto::BottleInSea {
proto::BottleInSea {
node: self.into_bytes(),
}
}
}

impl FromProtobuf<proto::Peer> for Peer {
fn from_message(message: proto::Peer) -> Result<Self, Error> {
use proto::peer;
Expand Down
25 changes: 24 additions & 1 deletion chain-network/src/grpc/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::convert;
use super::proto;
use super::streaming::{InboundStream, OutboundTryStream};
use crate::core::server::{BlockService, FragmentService, GossipService, Node};
use crate::core::server::{BlockService, BottleInSeaService, FragmentService, GossipService, Node};
use crate::data::{block, fragment, BlockId};
use crate::PROTOCOL_VERSION;
use tonic::{Code, Status};
Expand Down Expand Up @@ -35,6 +35,12 @@ where
.gossip_service()
.ok_or_else(|| Status::new(Code::Unimplemented, "not implemented"))
}

fn bottle_in_sea_service(&self) -> Result<&T::BottleInSeaService, Status> {
self.inner
.bottle_in_sea_service()
.ok_or_else(|| Status::new(Code::Unimplemented, "not implemented"))
}
}

#[tonic::async_trait]
Expand Down Expand Up @@ -213,4 +219,21 @@ where
let res = OutboundTryStream::new(outbound);
Ok(tonic::Response::new(res))
}

type BottleInSeaSubscriptionStream =
OutboundTryStream<<T::BottleInSeaService as BottleInSeaService>::SubscriptionStream>;

async fn bottle_in_sea_subscription(
&self,
req: tonic::Request<tonic::Streaming<proto::BottleInSea>>,
) -> Result<tonic::Response<Self::BottleInSeaSubscriptionStream>, tonic::Status> {
let service = self.bottle_in_sea_service()?;
let peer = convert::decode_peer(req.metadata())?;
let inbound = InboundStream::new(req.into_inner());
let outbound = service
.bottle_in_sea_subscription(peer, Box::pin(inbound))
.await?;
let res = OutboundTryStream::new(outbound);
Ok(tonic::Response::new(res))
}
}

0 comments on commit 6ac0462

Please sign in to comment.