Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Put back the relay #585

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bd69c01
Add ProtocolsHandler trait
tomaka Oct 18, 2018
d283c01
Reexport symbols
tomaka Oct 18, 2018
20dae45
Add a note about shutting down
tomaka Oct 18, 2018
27fdfd0
Add some comments to the relay transport
tomaka Oct 24, 2018
462b0ce
Add the source peer id when we are a destination
tomaka Oct 24, 2018
6280294
Add small TODO
tomaka Oct 24, 2018
0310f0f
Remove the auto relay addresses system
tomaka Oct 24, 2018
48ba073
Rename Output to RelayOutput
tomaka Oct 24, 2018
ac46279
Add RelayHandler
tomaka Oct 24, 2018
4c5b51b
Merge remote-tracking branch 'upstream/master' into protocols-handler
tomaka Oct 24, 2018
4b016e5
Merge branch 'protocols-handler' into relay-handler
tomaka Oct 24, 2018
ccdf701
Properly shut down
tomaka Oct 24, 2018
318369c
Add Send to the boxed futures
tomaka Oct 24, 2018
d77f092
Add map_protocol
tomaka Oct 24, 2018
fd3630f
Merge branch 'protocols-handler' into relay-handler
tomaka Oct 24, 2018
abd11be
Remove Transport from RelayConfig
tomaka Oct 26, 2018
11cbc39
Remove RelayTransport
tomaka Oct 28, 2018
ef9200c
Add a destination request struct
tomaka Oct 28, 2018
2f3b4a7
Add WaitRequest
tomaka Oct 28, 2018
959912d
Rename WaitRequest to ProxyRequest
tomaka Oct 28, 2018
9fbbefb
Update the handler as well
tomaka Oct 30, 2018
52a37b0
Merge remote-tracking branch 'upstream/master' into relay-handler
tomaka Nov 5, 2018
a80ef06
Merge remote-tracking branch 'upstream/master' into relay-handler
tomaka Nov 28, 2018
d4f1a1c
Make protocol compile
tomaka Nov 28, 2018
ebcd209
Minor work
tomaka Nov 28, 2018
28dddc1
More work
tomaka Nov 29, 2018
7326ca7
More work
tomaka Dec 4, 2018
c317d49
Merge remote-tracking branch 'upstream/master' into relay-handler
tomaka Jan 17, 2019
e78bf15
Merge remote-tracking branch 'upstream/master' into relay-handler
tomaka Jan 19, 2019
abe8ac7
More work towards compilation
tomaka Jan 19, 2019
7f3018a
Merge remote-tracking branch 'upstream/master' into relay-handler
tomaka Mar 10, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ members = [
"protocols/secio",
"transports/dns",
"transports/ratelimit",
"transports/relay",
"transports/tcp",
"transports/uds",
"transports/websocket"
Expand Down
73 changes: 73 additions & 0 deletions protocols/identify/src/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::message::CircuitRelay;
use bytes::BytesMut;
use protobuf::{self, Message};
use std::{fmt, error::Error};
use tokio_codec::{Encoder, Decoder};
use unsigned_varint::codec::UviBytes;

/// Encoder and decoder for protocol messages.
//#[derive(Debug)] // TODO:
pub struct Codec {
inner: UviBytes<Vec<u8>>,
}

impl Codec {
/// Creates a `Codec`.
#[inline]
pub fn new() -> Self {
Codec {
inner: UviBytes::default(),
}
}
}

impl fmt::Debug for Codec {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
// TODO:
write!(f, "Codec")
}
}

impl Encoder for Codec {
type Item = CircuitRelay;
type Error = Box<Error>;

fn encode(&mut self, msg: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
let pkg = msg.write_to_bytes()?;
self.inner.encode(pkg, dst)?;
Ok(())
}
}

impl Decoder for Codec {
type Item = CircuitRelay;
type Error = Box<Error>;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if let Some(p) = self.inner.decode(src)? {
Ok(Some(protobuf::parse_from_bytes(&p)?))
} else {
Ok(None)
}
}
}
276 changes: 276 additions & 0 deletions protocols/identify/src/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::prelude::*;
use libp2p_core::protocols_handler::{ProtocolsHandler, ProtocolsHandlerEvent};
use libp2p_core::{Multiaddr, PeerId, either, upgrade};
use protocol;
use std::{error, io, marker::PhantomData};
use tokio_io::{AsyncRead, AsyncWrite};

/// Protocol handler that identifies the remote at a regular period.
pub struct RelayHandler<TSubstream, TDestSubstream, TSrcSubstream> {
/// True if wanting to shut down.
shutdown: bool,

/// List of futures that must be processed. Contains notably the futures that relay data to
/// a destination, or the futures that send a negative response back.
active_futures: Vec<Box<Future<Item = (), Error = io::Error> + Send>>,

/// List of peers we should ask the remote to relay to.
relay_requests: Vec<(PeerId, Vec<Multiaddr>)>,

/// Queue of events to return when polled.
queued_events: Vec<RelayHandlerEvent<TSubstream>>,

/// Phantom data.
marker: PhantomData<(TDestSubstream, TSrcSubstream)>
}

/// Event produced by the relay handler.
//#[derive(Debug)] // TODO: restore
pub enum RelayHandlerEvent<TSubstream> {
/// The remote wants us to relay communications to a third party.
HopRequest(RelayHandlerHopRequest<TSubstream>),

/// The remote is a relay and is relaying a connection to us. In other words, we are used as
/// destination.
DestinationRequest(RelayHandlerDestRequest<TSubstream>),
}

/// Event that can be sent to the relay handler.
//#[derive(Debug)] // TODO: restore
pub enum RelayHandlerIn<TSubstream, TDestSubstream, TSrcSubstream> {
/// Accept a hop request from the remote.
AcceptHopRequest {
/// The request that was produced by the handler earlier.
request: RelayHandlerHopRequest<TSubstream>,
/// The substream to the destination.
dest_substream: TDestSubstream,
},

/// Denies a hop request from the remote.
DenyHopRequest(RelayHandlerHopRequest<TSubstream>),

/// Denies a destination request from the remote.
DenyDestinationRequest(RelayHandlerDestRequest<TSubstream>),

/// Opens a new substream to the remote and asks it to relay communications to a third party.
RelayRequest {
/// Id of the peer to connect to.
target: PeerId,
/// Addresses known for this peer.
addresses: Vec<Multiaddr>,
},

/// Asks the node to be used as a destination for a relayed connection.
DestinationRequest {
/// Peer id of the node whose communications are being relayed.
source: PeerId,
/// Addresses of the node whose communications are being relayed.
source_addresses: Vec<Multiaddr>,
/// Substream to the source.
substream: TSrcSubstream,
},
}

/// The remote wants us to be treated as a destination.
pub struct RelayHandlerHopRequest<TSubstream> {
inner: protocol::RelayHopRequest<TSubstream>,
}

impl<TSubstream> RelayHandlerHopRequest<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + 'static
{
/// Peer id of the target that we should relay to.
#[inline]
pub fn target_id(&self) -> &PeerId {
self.inner.target_id()
}

// TODO: addresses
}

/// The remote wants us to be treated as a destination.
pub struct RelayHandlerDestRequest<TSubstream> {
inner: protocol::RelayDestinationRequest<TSubstream>,
}

impl<TSubstream> RelayHandlerDestRequest<TSubstream>
where TSubstream: AsyncRead + AsyncWrite + 'static
{
/// Peer id of the source that is being relayed.
#[inline]
pub fn source_id(&self) -> &PeerId {
self.inner.source_id()
}

/// Addresses of the source that is being relayed.
#[inline]
pub fn source_addresses(&self) -> &PeerId {
self.inner.source_addresses()
}

/// Accepts the request. Produces a `Future` that sends back a success message then provides
/// the stream to the source.
#[inline]
pub fn accept(self) -> impl Future<Item = TSubstream, Error = Box<dyn error::Error + 'static>> {
self.inner.accept()
}
}

impl<TSubstream, TDestSubstream, TSrcSubstream> RelayHandler<TSubstream, TDestSubstream, TSrcSubstream> {
/// Builds a new `RelayHandler`.
#[inline]
pub fn new() -> Self {
RelayHandler {
shutdown: false,
active_futures: Vec::new(),
relay_requests: Vec::new(),
queued_events: Vec::new(),
marker: PhantomData,
}
}
}

impl<TSubstream, TDestSubstream, TSrcSubstream> ProtocolsHandler for RelayHandler<TSubstream, TDestSubstream, TSrcSubstream>
where
TSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, // TODO: remove useless bounds
TDestSubstream: AsyncRead + AsyncWrite + Send + Sync + 'static, // TODO: remove useless bounds
{
type InEvent = RelayHandlerIn<TSubstream, TDestSubstream, TSrcSubstream>;
type OutEvent = RelayHandlerEvent<TSubstream>;
type Substream = TSubstream;
type InboundProtocol = protocol::RelayListen;
type OutboundProtocol = upgrade::EitherUpgrade<protocol::RelayProxyRequest, protocol::RelayTargetOpen>;
type OutboundOpenInfo = ();

#[inline]
fn listen_protocol(&self) -> Self::InboundProtocol {
protocol::RelayListen::new()
}

fn inject_fully_negotiated_inbound(
&mut self,
protocol: <Self::InboundProtocol as upgrade::InboundUpgrade<TSubstream>>::Output,
) {
match protocol {
// We have been asked to become a destination.
protocol::RelayRemoteRequest::DestinationRequest(dest_request) => {

},
// We have been asked to act as a proxy.
protocol::RelayRemoteRequest::HopRequest(hop_request) => {

},
}
}

fn inject_fully_negotiated_outbound(
&mut self,
protocol: <Self::OutboundProtocol as upgrade::OutboundUpgrade<TSubstream>>::Output,
_: Self::OutboundOpenInfo,
) {
match protocol {
either::EitherOutput::First(proxy_request) => {

},
either::EitherOutput::Second(target_request) => {

},
}
}

#[inline]
fn inject_event(&mut self, event: Self::InEvent) {
match event {
RelayHandlerIn::AcceptHopRequest { request, dest_substream } => {
let fut = request.inner.fulfill(dest_substream);
self.active_futures.push(Box::new(fut));
},
RelayHandlerIn::DenyHopRequest(rq) => {
let fut = rq.inner.deny();
self.active_futures.push(Box::new(fut));
},
RelayHandlerIn::RelayRequest { target, addresses } => {
self.relay_requests.push((target.clone(), addresses.clone()));
},
}
}

#[inline]
fn inject_inbound_closed(&mut self) {}

#[inline]
fn inject_dial_upgrade_error(&mut self, _: Self::OutboundOpenInfo, _: io::Error) {
// TODO: report?
}

#[inline]
fn shutdown(&mut self) {
self.shutdown = true;
}

fn poll(
&mut self,
) -> Poll<
Option<
ProtocolsHandlerEvent<
Self::OutboundProtocol,
Self::OutboundOpenInfo,
RelayHandlerEvent<TSubstream>,
>,
>,
io::Error,
> {
// Open substreams if necessary.
if !self.relay_requests.is_empty() {
let rq = self.relay_requests.remove(0);
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::OutboundSubstreamRequest {
info: rq,
upgrade: self.config.clone(),
})));
}

// Report the queued events.
if !self.queued_events.is_empty() {
let event = self.queued_events.remove(0);
return Ok(Async::Ready(Some(ProtocolsHandlerEvent::Custom(event))));
}

// We remove each element from `active_futures` one by one and add them back.
for n in (0..self.active_futures.len()).rev() {
let mut future = self.active_futures.swap_remove(n);
match future.poll() {
// Don't add back the future if it's finished or errors.
Ok(Async::Ready(())) | Err(_) => {},
Ok(Async::NotReady) => self.active_futures.push(future),
}
}

// Shut down process.
if self.shutdown {
self.active_futures.clear(); // TODO: not a proper shutdown
return Ok(Async::Ready(None));
}

Ok(Async::NotReady)
}
}
Loading