From 4d263a8421591737c391b651b8e082c52fb6f713 Mon Sep 17 00:00:00 2001 From: driftluo Date: Wed, 27 Nov 2024 20:21:31 +0800 Subject: [PATCH] feat: enable tcp base protocol listen on same port --- tentacle/Cargo.toml | 3 +- tentacle/examples/simple.rs | 2 +- tentacle/src/lock/mod.rs | 3 +- tentacle/src/service.rs | 12 +- tentacle/src/transports/browser.rs | 11 +- tentacle/src/transports/memory.rs | 9 +- tentacle/src/transports/mod.rs | 158 ++++--- tentacle/src/transports/tcp.rs | 88 ++-- tentacle/src/transports/tcp_base_listen.rs | 488 +++++++++++++++++++++ tentacle/src/transports/tls.rs | 187 +------- tentacle/src/transports/ws.rs | 147 +------ tentacle/src/utils/dns.rs | 11 +- 12 files changed, 677 insertions(+), 442 deletions(-) create mode 100644 tentacle/src/transports/tcp_base_listen.rs diff --git a/tentacle/Cargo.toml b/tentacle/Cargo.toml index 2933c63e..83db3dfd 100644 --- a/tentacle/Cargo.toml +++ b/tentacle/Cargo.toml @@ -31,6 +31,7 @@ nohash-hasher = "0.2" parking_lot = { version = "0.12", optional = true } tokio-tungstenite = { version = "0.24", optional = true } +httparse = { version = "1.9", optional = true } futures-timer = { version = "3.0.2", optional = true } async-std = { version = "1", features = ["unstable"], optional = true } async-io = { version = "1", optional = true } @@ -76,7 +77,7 @@ nix = { version = "0.29", default-features = false, features = ["signal"] } [features] default = ["tokio-runtime", "tokio-timer"] -ws = ["tokio-tungstenite"] +ws = ["tokio-tungstenite", "httparse"] tls = ["tokio-rustls"] upnp = ["igd"] secio-async-trait = ["secio/async-trait"] diff --git a/tentacle/examples/simple.rs b/tentacle/examples/simple.rs index c526d727..81a50d8a 100644 --- a/tentacle/examples/simple.rs +++ b/tentacle/examples/simple.rs @@ -217,7 +217,7 @@ fn server() { .unwrap(); #[cfg(feature = "ws")] service - .listen("/ip4/127.0.0.1/tcp/1338/ws".parse().unwrap()) + .listen("/ip4/127.0.0.1/tcp/1337/ws".parse().unwrap()) .await .unwrap(); service.run().await diff --git a/tentacle/src/lock/mod.rs b/tentacle/src/lock/mod.rs index 5ca0a318..bbe17b30 100644 --- a/tentacle/src/lock/mod.rs +++ b/tentacle/src/lock/mod.rs @@ -1,10 +1,9 @@ -#![allow(dead_code)] +#![allow(dead_code, unused_imports)] #[cfg(feature = "parking_lot")] pub use parking_lot::{const_fair_mutex, const_mutex, const_rwlock, FairMutex, Mutex, RwLock}; #[cfg(not(feature = "parking_lot"))] pub mod native; -#[allow(unused_imports)] #[cfg(not(feature = "parking_lot"))] pub use native::{Mutex, RwLock}; diff --git a/tentacle/src/service.rs b/tentacle/src/service.rs index 20f5b3d6..6eef01d1 100644 --- a/tentacle/src/service.rs +++ b/tentacle/src/service.rs @@ -32,7 +32,7 @@ use crate::{ }, session::{Session, SessionEvent, SessionMeta}, traits::ServiceHandle, - transports::{MultiIncoming, MultiTransport, Transport}, + transports::{MultiIncoming, MultiTransport, TransportDial, TransportListen}, utils::extract_peer_id, ProtocolId, SessionId, }; @@ -228,7 +228,9 @@ where } inner.listens.insert(listen_address.clone()); - inner.spawn_listener(incoming, listen_address); + if !matches!(incoming, MultiIncoming::TcpUpgrade) { + inner.spawn_listener(incoming, listen_address); + } Ok(addr) } @@ -1017,7 +1019,7 @@ where if let Some(ref mut client) = self.igd_client { client.remove(&address); } - + self.try_update_listens().await; let _ignore = self .handle_sender .send(ServiceEvent::ListenClose { address }.into()) @@ -1075,7 +1077,9 @@ where if let Some(client) = self.igd_client.as_mut() { client.register(&listen_address) } - self.spawn_listener(incoming, listen_address); + if !matches!(incoming, MultiIncoming::TcpUpgrade) { + self.spawn_listener(incoming, listen_address); + } } SessionEvent::ProtocolHandleError { error, proto_id } => { let _ignore = self diff --git a/tentacle/src/transports/browser.rs b/tentacle/src/transports/browser.rs index d2db9a47..46f0ba2c 100644 --- a/tentacle/src/transports/browser.rs +++ b/tentacle/src/transports/browser.rs @@ -36,7 +36,9 @@ use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::{ error::TransportErrorKind, multiaddr::{Multiaddr, Protocol}, - transports::{find_type, Result, Transport, TransportFuture, TransportType}, + transports::{ + find_type, Result, TransportDial, TransportFuture, TransportListen, TransportType, + }, utils::multiaddr_to_socketaddr, }; use futures::FutureExt; @@ -113,13 +115,16 @@ impl BrowserTransport { pub type BrowserDialFuture = TransportFuture> + Send>>>; -impl Transport for BrowserTransport { +impl TransportListen for BrowserTransport { type ListenFuture = (); - type DialFuture = BrowserDialFuture; fn listen(self, address: Multiaddr) -> Result { Err(TransportErrorKind::NotSupported(address)) } +} + +impl TransportDial for BrowserTransport { + type DialFuture = BrowserDialFuture; fn dial(self, address: Multiaddr) -> Result { if !matches!(find_type(&address), TransportType::Ws) { diff --git a/tentacle/src/transports/memory.rs b/tentacle/src/transports/memory.rs index b339d778..9f0fec23 100644 --- a/tentacle/src/transports/memory.rs +++ b/tentacle/src/transports/memory.rs @@ -2,7 +2,7 @@ use crate::{ error::TransportErrorKind, lock::Mutex, multiaddr::{Multiaddr, Protocol}, - transports::{Result, Transport, TransportFuture}, + transports::{Result, TransportDial, TransportFuture, TransportListen}, }; use bytes::Bytes; @@ -120,15 +120,16 @@ pub type MemoryListenFuture = pub type MemoryDialFuture = TransportFuture> + Send>>>; -impl Transport for MemoryTransport { +impl TransportListen for MemoryTransport { type ListenFuture = MemoryListenFuture; - type DialFuture = MemoryDialFuture; fn listen(self, address: Multiaddr) -> Result { let task = bind(address); Ok(TransportFuture::new(Box::pin(task))) } - +} +impl TransportDial for MemoryTransport { + type DialFuture = MemoryDialFuture; fn dial(self, address: Multiaddr) -> Result { let task = connect(address); Ok(TransportFuture::new(Box::pin(task))) diff --git a/tentacle/src/transports/mod.rs b/tentacle/src/transports/mod.rs index c70f4e78..2f47d5ad 100644 --- a/tentacle/src/transports/mod.rs +++ b/tentacle/src/transports/mod.rs @@ -16,6 +16,8 @@ mod browser; mod memory; #[cfg(not(target_family = "wasm"))] mod tcp; +#[cfg(not(target_family = "wasm"))] +mod tcp_base_listen; #[cfg(all(feature = "tls", not(target_family = "wasm")))] mod tls; #[cfg(all(feature = "ws", not(target_family = "wasm")))] @@ -28,13 +30,18 @@ pub use os::*; type Result = std::result::Result; -/// Definition of transport protocol behavior -pub trait Transport { +/// Definition of transport listen protocol behavior +pub trait TransportListen { type ListenFuture; - type DialFuture; /// Transport listen fn listen(self, address: Multiaddr) -> Result; +} + +/// Definition of transport dial protocol behavior +pub trait TransportDial { + type DialFuture; + /// Transport dial fn dial(self, address: Multiaddr) -> Result; } @@ -93,6 +100,18 @@ pub fn find_type(addr: &Multiaddr) -> TransportType { .unwrap_or(TransportType::Tcp) } +pub(crate) fn parse_tls_domain_name(addr: &Multiaddr) -> Option { + let mut iter = addr.iter(); + + iter.find_map(|proto| { + if let Protocol::Tls(s) = proto { + Some(s.to_string()) + } else { + None + } + }) +} + #[cfg(not(target_family = "wasm"))] mod os { use super::*; @@ -100,17 +119,17 @@ mod os { use crate::{ runtime::{TcpListener, TcpStream}, service::config::TcpConfig, - utils::socketaddr_to_multiaddr, }; use futures::{prelude::Stream, FutureExt, StreamExt}; - use log::debug; use std::{ + collections::HashMap, fmt, future::Future, io, net::SocketAddr, pin::Pin, + sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -120,19 +139,31 @@ mod os { MemoryDialFuture, MemoryListenFuture, MemoryListener, MemorySocket, MemoryTransport, }; use self::tcp::{TcpDialFuture, TcpListenFuture, TcpTransport}; - #[cfg(feature = "tls")] - use self::tls::{TlsDialFuture, TlsListenFuture, TlsListener, TlsStream, TlsTransport}; + use self::tcp_base_listen::{TcpBaseListener, TcpBaseListenerEnum, UpgradeMode}; #[cfg(feature = "ws")] - use self::ws::{WebsocketListener, WsDialFuture, WsListenFuture, WsStream, WsTransport}; + use self::ws::{WsDialFuture, WsStream, WsTransport}; #[cfg(feature = "tls")] - use crate::service::config::TlsConfig; + use { + self::tls::{TlsDialFuture, TlsStream, TlsTransport}, + crate::service::config::TlsConfig, + }; + + #[derive(Debug, Clone, Copy)] + pub(crate) enum TcpListenMode { + Tcp, + #[cfg(feature = "tls")] + TLS, + #[cfg(feature = "ws")] + Ws, + } #[derive(Clone)] pub(crate) struct MultiTransport { - timeout: Duration, - tcp_config: TcpConfig, + pub(crate) timeout: Duration, + pub(crate) tcp_config: TcpConfig, + pub(crate) listens_upgrade_modes: Arc>>, #[cfg(feature = "tls")] - tls_config: Option, + pub(crate) tls_config: Option, } impl MultiTransport { @@ -140,6 +171,7 @@ mod os { MultiTransport { timeout, tcp_config, + listens_upgrade_modes: Arc::new(crate::lock::Mutex::new(Default::default())), #[cfg(feature = "tls")] tls_config: None, } @@ -152,22 +184,25 @@ mod os { } } - impl Transport for MultiTransport { + impl TransportListen for MultiTransport { type ListenFuture = MultiListenFuture; - type DialFuture = MultiDialFuture; fn listen(self, address: Multiaddr) -> Result { match find_type(&address) { TransportType::Tcp => { - match TcpTransport::new(self.timeout, self.tcp_config.tcp).listen(address) { + match TcpTransport::from_multi_transport(self, TcpListenMode::Tcp) + .listen(address) + { Ok(future) => Ok(MultiListenFuture::Tcp(future)), Err(e) => Err(e), } } #[cfg(feature = "ws")] TransportType::Ws => { - match WsTransport::new(self.timeout, self.tcp_config.ws).listen(address) { - Ok(future) => Ok(MultiListenFuture::Ws(future)), + match TcpTransport::from_multi_transport(self, TcpListenMode::Ws) + .listen(address) + { + Ok(future) => Ok(MultiListenFuture::Tcp(future)), Err(e) => Err(e), } } @@ -180,18 +215,26 @@ mod os { TransportType::Wss => Err(TransportErrorKind::NotSupported(address)), #[cfg(feature = "tls")] TransportType::Tls => { - let tls_config = self.tls_config.ok_or_else(|| { - TransportErrorKind::TlsError("tls config is not set".to_string()) - })?; - TlsTransport::new(self.timeout, tls_config, self.tcp_config.tls) + if self.tls_config.is_none() { + return Err(TransportErrorKind::TlsError( + "tls config is not set".to_string(), + )); + } + match TcpTransport::from_multi_transport(self, TcpListenMode::TLS) .listen(address) - .map(MultiListenFuture::Tls) + { + Ok(future) => Ok(MultiListenFuture::Tcp(future)), + Err(e) => Err(e), + } } #[cfg(not(feature = "tls"))] TransportType::Tls => Err(TransportErrorKind::NotSupported(address)), } } + } + impl TransportDial for MultiTransport { + type DialFuture = MultiDialFuture; fn dial(self, address: Multiaddr) -> Result { match find_type(&address) { TransportType::Tcp => { @@ -232,10 +275,6 @@ mod os { pub enum MultiListenFuture { Tcp(TcpListenFuture), Memory(MemoryListenFuture), - #[cfg(feature = "ws")] - Ws(WsListenFuture), - #[cfg(feature = "tls")] - Tls(TlsListenFuture), } impl Future for MultiListenFuture { @@ -243,24 +282,17 @@ mod os { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match self.get_mut() { - MultiListenFuture::Tcp(inner) => Pin::new( - &mut inner.map(|res| res.map(|res| (res.0, MultiIncoming::Tcp(res.1)))), - ) + MultiListenFuture::Tcp(inner) => Pin::new(&mut inner.map(|res| { + res.map(|res| match res.1 { + TcpBaseListenerEnum::New(i) => (res.0, MultiIncoming::Tcp(i)), + TcpBaseListenerEnum::Upgrade => (res.0, MultiIncoming::TcpUpgrade), + }) + })) .poll(cx), MultiListenFuture::Memory(inner) => Pin::new( &mut inner.map(|res| res.map(|res| (res.0, MultiIncoming::Memory(res.1)))), ) .poll(cx), - #[cfg(feature = "ws")] - MultiListenFuture::Ws(inner) => { - Pin::new(&mut inner.map(|res| res.map(|res| (res.0, MultiIncoming::Ws(res.1))))) - .poll(cx) - } - #[cfg(feature = "tls")] - MultiListenFuture::Tls(inner) => Pin::new( - &mut inner.map(|res| res.map(|res| (res.0, MultiIncoming::Tls(res.1)))), - ) - .poll(cx), } } } @@ -381,12 +413,9 @@ mod os { } pub enum MultiIncoming { - Tcp(TcpListener), + TcpUpgrade, + Tcp(TcpBaseListener), Memory(MemoryListener), - #[cfg(feature = "ws")] - Ws(WebsocketListener), - #[cfg(feature = "tls")] - Tls(TlsListener), } impl Stream for MultiIncoming { @@ -394,46 +423,15 @@ mod os { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { match self.get_mut() { - MultiIncoming::Tcp(inner) => { - loop { - match inner.poll_accept(cx)? { - // Why can't get the peer address of the connected stream ? - // Error will be "Transport endpoint is not connected", - // so why incoming will appear unconnected stream ? - Poll::Ready((stream, _)) => match stream.peer_addr() { - Ok(remote_address) => { - break Poll::Ready(Some(Ok(( - socketaddr_to_multiaddr(remote_address), - MultiStream::Tcp(stream), - )))) - } - Err(err) => { - debug!("stream get peer address error: {:?}", err); - } - }, - Poll::Pending => break Poll::Pending, - } - } - } - MultiIncoming::Memory(inner) => match inner.poll_next_unpin(cx)? { - Poll::Ready(Some((addr, stream))) => { - Poll::Ready(Some(Ok((addr, MultiStream::Memory(stream))))) - } + MultiIncoming::Tcp(inner) => match inner.poll_next_unpin(cx)? { + Poll::Ready(Some((addr, stream))) => Poll::Ready(Some(Ok((addr, stream)))), Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, }, - #[cfg(feature = "ws")] - MultiIncoming::Ws(inner) => match inner.poll_next_unpin(cx)? { - Poll::Ready(Some((addr, stream))) => { - Poll::Ready(Some(Ok((addr, MultiStream::Ws(Box::new(stream)))))) - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - }, - #[cfg(feature = "tls")] - MultiIncoming::Tls(inner) => match inner.poll_next_unpin(cx)? { + MultiIncoming::TcpUpgrade => unreachable!(), + MultiIncoming::Memory(inner) => match inner.poll_next_unpin(cx)? { Poll::Ready(Some((addr, stream))) => { - Poll::Ready(Some(Ok((addr, MultiStream::Tls(stream))))) + Poll::Ready(Some(Ok((addr, MultiStream::Memory(stream))))) } Poll::Ready(None) => Poll::Ready(None), Poll::Pending => Poll::Pending, diff --git a/tentacle/src/transports/tcp.rs b/tentacle/src/transports/tcp.rs index 605c12a9..31b46949 100644 --- a/tentacle/src/transports/tcp.rs +++ b/tentacle/src/transports/tcp.rs @@ -1,34 +1,22 @@ -use super::Result; use futures::{future::ok, TryFutureExt}; -use std::{future::Future, pin::Pin, time::Duration}; +use std::{ + collections::HashMap, future::Future, net::SocketAddr, pin::Pin, sync::Arc, time::Duration, +}; +#[cfg(feature = "tls")] +use crate::service::TlsConfig; use crate::{ error::TransportErrorKind, multiaddr::Multiaddr, - runtime::{TcpListener, TcpStream}, + runtime::TcpStream, service::config::TcpSocketConfig, - transports::{tcp_dial, tcp_listen, Transport, TransportFuture}, - utils::{dns::DnsResolver, multiaddr_to_socketaddr, socketaddr_to_multiaddr}, + transports::{ + tcp_base_listen::{bind, TcpBaseListenerEnum, UpgradeMode}, + tcp_dial, Result, TcpListenMode, TransportDial, TransportFuture, TransportListen, + }, + utils::{dns::DnsResolver, multiaddr_to_socketaddr}, }; -/// Tcp listen bind -async fn bind( - address: impl Future>, - tcp_config: TcpSocketConfig, -) -> Result<(Multiaddr, TcpListener)> { - let addr = address.await?; - match multiaddr_to_socketaddr(&addr) { - Some(socket_address) => { - let (local_addr, tcp) = tcp_listen(socket_address, tcp_config).await?; - - let listen_addr = socketaddr_to_multiaddr(local_addr); - - Ok((listen_addr, tcp)) - } - None => Err(TransportErrorKind::NotSupported(addr)), - } -} - /// Tcp connect async fn connect( address: impl Future>, @@ -50,26 +38,53 @@ async fn connect( pub struct TcpTransport { timeout: Duration, tcp_config: TcpSocketConfig, + listen_mode: TcpListenMode, + global: Arc>>, + #[cfg(feature = "tls")] + tls_config: TlsConfig, } impl TcpTransport { pub fn new(timeout: Duration, tcp_config: TcpSocketConfig) -> Self { - TcpTransport { + Self { timeout, tcp_config, + listen_mode: TcpListenMode::Tcp, + global: Arc::new(crate::lock::Mutex::new(Default::default())), + #[cfg(feature = "tls")] + tls_config: Default::default(), + } + } + + pub fn from_multi_transport( + multi_transport: super::MultiTransport, + listen_mode: TcpListenMode, + ) -> Self { + Self { + timeout: multi_transport.timeout, + tcp_config: match listen_mode { + TcpListenMode::Tcp => multi_transport.tcp_config.tcp, + #[cfg(feature = "ws")] + TcpListenMode::Ws => multi_transport.tcp_config.ws, + #[cfg(feature = "tls")] + TcpListenMode::TLS => multi_transport.tcp_config.tls, + }, + listen_mode, + global: multi_transport.listens_upgrade_modes, + #[cfg(feature = "tls")] + tls_config: multi_transport.tls_config.unwrap_or_default(), } } } // If `Existence type` is available, `Pin>` will no longer be needed here, and the signature is `TransportFuture>` pub type TcpListenFuture = - TransportFuture> + Send>>>; + TransportFuture> + Send>>>; pub type TcpDialFuture = TransportFuture> + Send>>>; -impl Transport for TcpTransport { +impl TransportListen for TcpTransport { type ListenFuture = TcpListenFuture; - type DialFuture = TcpDialFuture; fn listen(self, address: Multiaddr) -> Result { match DnsResolver::new(address.clone()) { @@ -79,15 +94,32 @@ impl Transport for TcpTransport { TransportErrorKind::DnsResolverError(multiaddr, io_error) }), self.tcp_config, + self.listen_mode, + #[cfg(feature = "tls")] + self.tls_config, + self.global, + self.timeout, ); Ok(TransportFuture::new(Box::pin(task))) } None => { - let task = bind(ok(address), self.tcp_config); + let task = bind( + ok(address), + self.tcp_config, + self.listen_mode, + #[cfg(feature = "tls")] + self.tls_config, + self.global, + self.timeout, + ); Ok(TransportFuture::new(Box::pin(task))) } } } +} + +impl TransportDial for TcpTransport { + type DialFuture = TcpDialFuture; fn dial(self, address: Multiaddr) -> Result { match DnsResolver::new(address.clone()) { diff --git a/tentacle/src/transports/tcp_base_listen.rs b/tentacle/src/transports/tcp_base_listen.rs new file mode 100644 index 00000000..17f6d9ed --- /dev/null +++ b/tentacle/src/transports/tcp_base_listen.rs @@ -0,0 +1,488 @@ +use std::{ + collections::{hash_map::Entry, HashMap}, + future::Future, + io, + net::SocketAddr, + pin::Pin, + sync::{ + atomic::{AtomicU8, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; + +use futures::{ + channel::mpsc::{self, Receiver, Sender}, + SinkExt, Stream, +}; +use log::debug; + +#[cfg(any(feature = "ws", feature = "tls"))] +use crate::multiaddr::Protocol; +#[cfg(feature = "ws")] +use {crate::transports::ws::WsStream, tokio_tungstenite::accept_async}; +#[cfg(feature = "tls")] +use { + crate::{service::TlsConfig, transports::parse_tls_domain_name}, + std::borrow::Cow, + tokio_rustls::{ + rustls::{server::ResolvesServerCertUsingSni, ServerConfig}, + TlsAcceptor, + }, +}; + +use crate::{ + multiaddr::Multiaddr, + runtime::{TcpListener, TcpStream}, + service::config::TcpSocketConfig, + transports::{tcp_listen, MultiStream, Result, TcpListenMode, TransportErrorKind}, + utils::{multiaddr_to_socketaddr, socketaddr_to_multiaddr}, +}; + +pub enum TcpBaseListenerEnum { + Upgrade, + New(TcpBaseListener), +} + +/// Tcp listen bind +pub async fn bind( + address: impl Future>, + tcp_config: TcpSocketConfig, + listen_mode: TcpListenMode, + #[cfg(feature = "tls")] config: TlsConfig, + global: Arc>>, + timeout: Duration, +) -> Result<(Multiaddr, TcpBaseListenerEnum)> { + let addr = address.await?; + let upgrade_mode: UpgradeMode = listen_mode.into(); + match multiaddr_to_socketaddr(&addr) { + Some(socket_address) => { + // Global register global listener upgrade mode + match global.clone().lock().entry(socket_address) { + Entry::Occupied(v) => { + #[allow(unused_mut)] + let mut tcp_base_addr: Multiaddr = socketaddr_to_multiaddr(socket_address); + let listen_addr = match listen_mode { + TcpListenMode::Tcp => tcp_base_addr, + #[cfg(feature = "ws")] + TcpListenMode::Ws => { + tcp_base_addr.push(Protocol::Ws); + tcp_base_addr + } + #[cfg(feature = "tls")] + TcpListenMode::TLS => { + match parse_tls_domain_name(&addr) { + None => return Err(TransportErrorKind::NotSupported(addr)), + Some(d) => { + tcp_base_addr.push(Protocol::Tls(Cow::Owned(d))); + } + } + tcp_base_addr + } + }; + v.get().combine(listen_mode.into()); + return Ok((listen_addr, TcpBaseListenerEnum::Upgrade)); + } + Entry::Vacant(v) => { + v.insert(upgrade_mode.clone()); + } + } + let (local_addr, tcp) = tcp_listen(socket_address, tcp_config).await?; + + #[allow(unused_mut)] + let mut tcp_base_addr: Multiaddr = socketaddr_to_multiaddr(local_addr); + let listen_addr = match listen_mode { + TcpListenMode::Tcp => tcp_base_addr, + #[cfg(feature = "ws")] + TcpListenMode::Ws => { + tcp_base_addr.push(Protocol::Ws); + tcp_base_addr + } + #[cfg(feature = "tls")] + TcpListenMode::TLS => { + match parse_tls_domain_name(&addr) { + None => return Err(TransportErrorKind::NotSupported(addr)), + Some(d) => { + tcp_base_addr.push(Protocol::Tls(Cow::Owned(d))); + } + } + tcp_base_addr + } + }; + #[cfg(feature = "tls")] + let tls_server_config = config.tls_server_config.unwrap_or( + // if enable tls but not set tls config, it will use a empty server config + Arc::new( + ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(ResolvesServerCertUsingSni::new())), + ), + ); + + Ok(( + listen_addr, + TcpBaseListenerEnum::New({ + let tcp_listen = + TcpBaseListener::new(timeout, tcp, local_addr, upgrade_mode, global); + #[cfg(feature = "tls")] + let tcp_listen = tcp_listen.tls_config(tls_server_config); + tcp_listen + }), + )) + } + None => Err(TransportErrorKind::NotSupported(addr)), + } +} + +#[derive(Clone)] +pub(crate) struct UpgradeMode { + inner: Arc, +} + +impl UpgradeMode { + pub fn combine(&self, other: UpgradeModeEnum) { + let other = other as u8; + self.inner.fetch_or(other, Ordering::AcqRel); + } + + pub fn to_enum(&self) -> UpgradeModeEnum { + self.inner.load(Ordering::Acquire).into() + } +} + +impl From for UpgradeMode { + fn from(value: UpgradeModeEnum) -> Self { + Self { + inner: Arc::new(AtomicU8::from(value as u8)), + } + } +} + +impl From for UpgradeMode { + fn from(value: TcpListenMode) -> Self { + match value { + TcpListenMode::Tcp => UpgradeModeEnum::OnlyTcp.into(), + #[cfg(feature = "tls")] + TcpListenMode::TLS => UpgradeModeEnum::OnlyTLS.into(), + #[cfg(feature = "ws")] + TcpListenMode::Ws => UpgradeModeEnum::OnlyWs.into(), + } + } +} + +impl From for UpgradeModeEnum { + fn from(value: TcpListenMode) -> Self { + match value { + TcpListenMode::Tcp => UpgradeModeEnum::OnlyTcp, + #[cfg(feature = "tls")] + TcpListenMode::TLS => UpgradeModeEnum::OnlyTLS, + #[cfg(feature = "ws")] + TcpListenMode::Ws => UpgradeModeEnum::OnlyWs, + } + } +} + +#[repr(u8)] +pub enum UpgradeModeEnum { + OnlyTcp = 0b1, + #[cfg(feature = "ws")] + OnlyWs = 0b10, + #[cfg(feature = "tls")] + OnlyTLS = 0b100, + #[cfg(feature = "ws")] + TcpAndWs = 0b11, + #[cfg(feature = "tls")] + TcpAndTLS = 0b101, + #[cfg(all(feature = "ws", feature = "tls"))] + All = 0b111, +} + +impl From for UpgradeModeEnum { + fn from(value: u8) -> Self { + match value { + 0b1 => UpgradeModeEnum::OnlyTcp, + #[cfg(feature = "ws")] + 0b10 => UpgradeModeEnum::OnlyWs, + #[cfg(feature = "ws")] + 0b11 => UpgradeModeEnum::TcpAndWs, + #[cfg(feature = "tls")] + 0b100 => UpgradeModeEnum::OnlyTLS, + #[cfg(feature = "tls")] + 0b101 => UpgradeModeEnum::TcpAndTLS, + #[cfg(all(feature = "ws", feature = "tls"))] + 0b111 => UpgradeModeEnum::All, + _ => unreachable!(), + } + } +} + +pub struct TcpBaseListener { + inner: TcpListener, + upgrade_mode: UpgradeMode, + timeout: Duration, + local_addr: SocketAddr, + sender: Sender<(Multiaddr, MultiStream)>, + pending_stream: Receiver<(Multiaddr, MultiStream)>, + global: Arc>>, + #[cfg(feature = "tls")] + tls_config: Arc, +} + +impl Drop for TcpBaseListener { + fn drop(&mut self) { + self.global.lock().remove(&self.local_addr); + } +} + +impl TcpBaseListener { + fn new( + timeout: Duration, + inner: TcpListener, + local_addr: SocketAddr, + upgrade_mode: UpgradeMode, + global: Arc>>, + ) -> Self { + let (tx, rx) = mpsc::channel(128); + + Self { + inner, + timeout, + upgrade_mode, + local_addr, + global, + sender: tx, + pending_stream: rx, + #[cfg(feature = "tls")] + tls_config: Arc::new( + ServerConfig::builder() + .with_no_client_auth() + .with_cert_resolver(Arc::new(ResolvesServerCertUsingSni::new())), + ), + } + } + + #[cfg(feature = "tls")] + fn tls_config(mut self, tls_config: Arc) -> Self { + self.tls_config = tls_config; + self + } + + fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, MultiStream)> { + match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(res), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } + } + + fn poll_listen(&mut self, cx: &mut Context) -> Poll> { + match self.inner.poll_accept(cx)? { + Poll::Ready((stream, _)) => { + // Why can't get the peer address of the connected stream ? + // Error will be "Transport endpoint is not connected", + // so why incoming will appear unconnected stream ? + match stream.peer_addr() { + Ok(remote_address) => { + let timeout = self.timeout; + let sender = self.sender.clone(); + let upgrade_mode = self.upgrade_mode.to_enum(); + #[cfg(feature = "tls")] + let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config)); + crate::runtime::spawn(protocol_select( + stream, + timeout, + upgrade_mode, + sender, + remote_address, + #[cfg(feature = "tls")] + acceptor, + )); + } + Err(err) => { + debug!("stream get peer address error: {:?}", err); + } + } + Poll::Ready(Ok(())) + } + Poll::Pending => Poll::Pending, + } + } +} + +impl Stream for TcpBaseListener { + type Item = std::result::Result<(Multiaddr, MultiStream), io::Error>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(res) = self.poll_pending(cx) { + return Poll::Ready(Some(Ok(res))); + } + + loop { + let is_pending = self.poll_listen(cx)?.is_pending(); + match self.poll_pending(cx) { + Poll::Ready(res) => return Poll::Ready(Some(Ok(res))), + Poll::Pending => { + if is_pending { + break; + } + } + } + } + Poll::Pending + } +} + +async fn protocol_select( + stream: TcpStream, + #[allow(unused_variables)] timeout: Duration, + #[allow(unused_mut)] mut upgrade_mode: UpgradeModeEnum, + mut sender: Sender<(Multiaddr, MultiStream)>, + remote_address: SocketAddr, + #[cfg(feature = "tls")] acceptor: TlsAcceptor, +) { + loop { + match upgrade_mode { + UpgradeModeEnum::OnlyTcp => { + if sender + .send(( + socketaddr_to_multiaddr(remote_address), + MultiStream::Tcp(stream), + )) + .await + .is_err() + { + debug!("receiver closed unexpectedly") + } + return; + } + #[cfg(feature = "ws")] + UpgradeModeEnum::OnlyWs => { + match crate::runtime::timeout(timeout, accept_async(stream)).await { + Err(_) => debug!("accept websocket stream timeout"), + Ok(res) => match res { + Ok(stream) => { + let mut addr = socketaddr_to_multiaddr(remote_address); + addr.push(Protocol::Ws); + if sender + .send((addr, MultiStream::Ws(Box::new(WsStream::new(stream))))) + .await + .is_err() + { + debug!("receiver closed unexpectedly") + } + } + Err(err) => { + debug!("accept websocket stream err: {:?}", err); + } + }, + } + return; + } + #[cfg(feature = "tls")] + UpgradeModeEnum::OnlyTLS => { + match crate::runtime::timeout(timeout, acceptor.accept(stream)).await { + Err(_) => debug!("accept tls server stream timeout"), + Ok(res) => match res { + Ok(stream) => { + let mut addr = socketaddr_to_multiaddr(remote_address); + addr.push(Protocol::Tls(Cow::Borrowed(""))); + if sender + .send((addr, MultiStream::Tls(Box::new(stream)))) + .await + .is_err() + { + debug!("receiver closed unexpectedly") + } + } + Err(err) => { + debug!("accept tls server stream err: {:?}", err); + } + }, + } + return; + } + #[cfg(feature = "tls")] + UpgradeModeEnum::TcpAndTLS => { + let mut peek_buf = [0u8; 16]; + if let Err(e) = stream.peek(&mut peek_buf).await { + debug!("stream encountered err: {}, close unexpectedly", e); + return; + } + + // The first sixteen bytes of secio's Propose message's mode is fixed + // it's bytes like follow: + // + // | 4 byte | 4 byte | 4 byte | 4 byte | 4 byte |... + // |--|--|--|--|--| + // | LengthDelimitedCodec header| molecule propose header | rand start | rand end/pubkey start | pubkey end/exchange start |... + // + // LengthDelimitedCodec header is big-end total len + // molecule propose header is little-end total len + // rand start offset is 24 = (5(feild count) + 1(total len))* 4 + let length_delimited_header = + u32::from_be_bytes(TryInto::<[u8; 4]>::try_into(&peek_buf[..4]).unwrap()); + let molecule_header = + u32::from_le_bytes(TryInto::<[u8; 4]>::try_into(&peek_buf[4..8]).unwrap()); + let rand_start = + u32::from_le_bytes(TryInto::<[u8; 4]>::try_into(&peek_buf[8..12]).unwrap()); + let rand_end = + u32::from_le_bytes(TryInto::<[u8; 4]>::try_into(&peek_buf[12..16]).unwrap()); + + if length_delimited_header == molecule_header + && rand_start == 24 + && rand_start < rand_end + && rand_end < molecule_header + { + upgrade_mode = UpgradeModeEnum::OnlyTcp; + continue; + } else { + upgrade_mode = UpgradeModeEnum::OnlyTLS; + continue; + } + } + #[cfg(feature = "ws")] + UpgradeModeEnum::TcpAndWs => { + let mut peek_buf = [0u8; 16]; + if let Err(e) = stream.peek(&mut peek_buf).await { + debug!("stream encountered err: {}, close unexpectedly", e); + return; + } + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut req = httparse::Request::new(&mut headers); + + match req.parse(&peek_buf) { + Ok(_) => { + upgrade_mode = UpgradeModeEnum::OnlyWs; + continue; + } + Err(_) => { + upgrade_mode = UpgradeModeEnum::OnlyTcp; + continue; + } + } + } + #[cfg(all(feature = "ws", feature = "tls"))] + UpgradeModeEnum::All => { + let mut peek_buf = [0u8; 16]; + if let Err(e) = stream.peek(&mut peek_buf).await { + debug!("stream encountered err: {}, close unexpectedly", e); + return; + } + + let mut headers = [httparse::EMPTY_HEADER; 16]; + let mut req = httparse::Request::new(&mut headers); + + match req.parse(&peek_buf) { + Ok(_) => { + upgrade_mode = UpgradeModeEnum::OnlyWs; + continue; + } + Err(_) => { + upgrade_mode = UpgradeModeEnum::TcpAndTLS; + continue; + } + } + } + } + } +} diff --git a/tentacle/src/transports/tls.rs b/tentacle/src/transports/tls.rs index 69d34daf..d9c2109a 100644 --- a/tentacle/src/transports/tls.rs +++ b/tentacle/src/transports/tls.rs @@ -1,57 +1,21 @@ use super::Result; -use futures::{future::ok, SinkExt, Stream, TryFutureExt}; -use log::{debug, warn}; -use std::{ - borrow::Cow, - future::Future, - pin::Pin, - task::{Context, Poll}, - time::Duration, -}; +use futures::{future::ok, TryFutureExt}; +use std::{future::Future, pin::Pin, time::Duration}; -use crate::runtime::TcpListener; use crate::service::TlsConfig; use crate::{ error::TransportErrorKind, - multiaddr::{Multiaddr, Protocol}, + multiaddr::Multiaddr, service::config::TcpSocketConfig, session::AsyncRw, - transports::{tcp_dial, tcp_listen, Transport, TransportFuture}, - utils::{dns::DnsResolver, multiaddr_to_socketaddr, socketaddr_to_multiaddr}, + transports::{parse_tls_domain_name, tcp_dial, TransportDial, TransportFuture}, + utils::{dns::DnsResolver, multiaddr_to_socketaddr}, }; -use futures::channel::mpsc::{channel, Receiver, Sender}; -use std::sync::Arc; -use tokio::io; -use tokio_rustls::rustls::{pki_types::ServerName, ServerConfig}; -use tokio_rustls::{TlsAcceptor, TlsConnector}; -pub type TlsStream = Box; +use tokio_rustls::rustls::pki_types::ServerName; +use tokio_rustls::TlsConnector; -/// Tls listen bind -async fn bind( - address: impl Future>, - timeout: Duration, - config: TlsConfig, - domain_name: String, - tcp_config: TcpSocketConfig, -) -> Result<(Multiaddr, TlsListener)> { - let addr = address.await?; - match multiaddr_to_socketaddr(&addr) { - Some(socket_address) => { - let (local_addr, tcp) = tcp_listen(socket_address, tcp_config).await?; - let tls_server_config = config.tls_server_config.ok_or_else(|| { - TransportErrorKind::TlsError("server config not found".to_string()) - })?; - let mut listen_addr = socketaddr_to_multiaddr(local_addr); - listen_addr.push(Protocol::Tls(Cow::Owned(domain_name))); - Ok(( - listen_addr, - TlsListener::new(timeout, tcp, tls_server_config), - )) - } - None => Err(TransportErrorKind::NotSupported(addr)), - } -} +pub type TlsStream = Box; /// Tls connect async fn connect( @@ -88,105 +52,6 @@ async fn connect( } } -pub struct TlsListener { - inner: TcpListener, - timeout: Duration, - sender: Sender<(Multiaddr, TlsStream)>, - pending_stream: Receiver<(Multiaddr, TlsStream)>, - tls_config: Arc, -} - -impl TlsListener { - fn new(timeout: Duration, listen: TcpListener, tls_config: Arc) -> Self { - let (sender, rx) = channel(24); - TlsListener { - inner: listen, - timeout, - sender, - pending_stream: rx, - tls_config, - } - } - - fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, TlsStream)> { - match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) { - Poll::Ready(Some(res)) => Poll::Ready(res), - Poll::Ready(None) | Poll::Pending => Poll::Pending, - } - } - - fn poll_listen(&mut self, cx: &mut Context) -> Poll> { - match self.inner.poll_accept(cx)? { - Poll::Ready((stream, _)) => { - match stream.peer_addr() { - Ok(remote_address) => { - let timeout = self.timeout; - let mut sender = self.sender.clone(); - let acceptor = TlsAcceptor::from(Arc::clone(&self.tls_config)); - crate::runtime::spawn(async move { - match crate::runtime::timeout(timeout, acceptor.accept(stream)).await { - Err(_) => warn!("accept tls server stream timeout"), - Ok(res) => match res { - Ok(stream) => { - let mut addr = socketaddr_to_multiaddr(remote_address); - addr.push(Protocol::Tls(Cow::Borrowed(""))); - if sender.send((addr, Box::new(stream))).await.is_err() { - debug!("receiver closed unexpectedly") - } - } - Err(err) => { - warn!("accept tls server stream err: {:?}", err); - } - }, - } - }); - } - Err(err) => { - debug!("stream get peer address error: {:?}", err); - } - } - Poll::Ready(Ok(())) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl Stream for TlsListener { - type Item = std::result::Result<(Multiaddr, TlsStream), io::Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if let Poll::Ready(res) = self.poll_pending(cx) { - return Poll::Ready(Some(Ok(res))); - } - - loop { - let is_pending = self.poll_listen(cx)?.is_pending(); - match self.poll_pending(cx) { - Poll::Ready(res) => return Poll::Ready(Some(Ok(res))), - Poll::Pending => { - if is_pending { - break; - } - } - } - } - Poll::Pending - } -} - -fn parse_tls_domain_name(addr: &Multiaddr) -> Option { - let mut iter = addr.iter(); - - iter.find_map(|proto| { - if let Protocol::Tls(s) = proto { - Some(s.to_string()) - } else { - None - } - }) -} - /// Tcp transport pub struct TlsTransport { timeout: Duration, @@ -204,46 +69,12 @@ impl TlsTransport { } } -pub type TlsListenFuture = - TransportFuture> + Send>>>; pub type TlsDialFuture = TransportFuture> + Send>>>; -impl Transport for TlsTransport { - type ListenFuture = TlsListenFuture; +impl TransportDial for TlsTransport { type DialFuture = TlsDialFuture; - fn listen(self, address: Multiaddr) -> Result { - if let Some(domain_name) = parse_tls_domain_name(&address) { - match DnsResolver::new(address.clone()) { - Some(dns) => { - let task = bind( - dns.map_err(|(multiaddr, io_error)| { - TransportErrorKind::DnsResolverError(multiaddr, io_error) - }), - self.timeout, - self.config, - domain_name, - self.tcp_config, - ); - Ok(TransportFuture::new(Box::pin(task))) - } - None => { - let task = bind( - ok(address), - self.timeout, - self.config, - domain_name, - self.tcp_config, - ); - Ok(TransportFuture::new(Box::pin(task))) - } - } - } else { - Err(TransportErrorKind::NotSupported(address)) - } - } - fn dial(self, address: Multiaddr) -> Result { if let Some(domain_name) = parse_tls_domain_name(&address) { match DnsResolver::new(address.clone()) { diff --git a/tentacle/src/transports/ws.rs b/tentacle/src/transports/ws.rs index d0c18194..9c2cbf09 100644 --- a/tentacle/src/transports/ws.rs +++ b/tentacle/src/transports/ws.rs @@ -1,8 +1,4 @@ -use futures::{ - channel::mpsc::{channel, Receiver, Sender}, - future::ok, - Sink, SinkExt, Stream, StreamExt, TryFutureExt, -}; +use futures::{future::ok, Sink, StreamExt, TryFutureExt}; use log::debug; use std::{ future::Future, @@ -13,39 +9,20 @@ use std::{ }; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio_tungstenite::{ - accept_async, client_async_with_config, + client_async_with_config, tungstenite::{Error, Message}, WebSocketStream, }; use crate::{ error::TransportErrorKind, - multiaddr::{Multiaddr, Protocol}, - runtime::{TcpListener, TcpStream}, + multiaddr::Multiaddr, + runtime::TcpStream, service::config::TcpSocketConfig, - transports::{tcp_dial, tcp_listen, Result, Transport, TransportFuture}, - utils::{dns::DnsResolver, multiaddr_to_socketaddr, socketaddr_to_multiaddr}, + transports::{tcp_dial, Result, TransportDial, TransportFuture}, + utils::{dns::DnsResolver, multiaddr_to_socketaddr}, }; -/// websocket listen bind -async fn bind( - address: impl Future>, - timeout: Duration, - tcp_config: TcpSocketConfig, -) -> Result<(Multiaddr, WebsocketListener)> { - let addr = address.await?; - match multiaddr_to_socketaddr(&addr) { - Some(socket_address) => { - let (addr, tcp) = tcp_listen(socket_address, tcp_config).await?; - let mut listen_addr = socketaddr_to_multiaddr(addr); - listen_addr.push(Protocol::Ws); - - Ok((listen_addr, WebsocketListener::new(timeout, tcp))) - } - None => Err(TransportErrorKind::NotSupported(addr)), - } -} - /// websocket connect async fn connect( address: impl Future>, @@ -91,34 +68,12 @@ impl WsTransport { } } -pub type WsListenFuture = - TransportFuture> + Send>>>; pub type WsDialFuture = TransportFuture> + Send>>>; -impl Transport for WsTransport { - type ListenFuture = WsListenFuture; +impl TransportDial for WsTransport { type DialFuture = WsDialFuture; - fn listen(self, address: Multiaddr) -> Result { - match DnsResolver::new(address.clone()) { - Some(dns) => { - let task = bind( - dns.map_err(|(multiaddr, io_error)| { - TransportErrorKind::DnsResolverError(multiaddr, io_error) - }), - self.timeout, - self.tcp_config, - ); - Ok(TransportFuture::new(Box::pin(task))) - } - None => { - let task = bind(ok(address), self.timeout, self.tcp_config); - Ok(TransportFuture::new(Box::pin(task))) - } - } - } - fn dial(self, address: Multiaddr) -> Result { match DnsResolver::new(address.clone()) { Some(dns) => { @@ -142,92 +97,6 @@ impl Transport for WsTransport { } } -#[derive(Debug)] -pub struct WebsocketListener { - inner: TcpListener, - timeout: Duration, - sender: Sender<(Multiaddr, WsStream)>, - pending_stream: Receiver<(Multiaddr, WsStream)>, -} - -impl WebsocketListener { - fn new(timeout: Duration, listen: TcpListener) -> Self { - let (sender, rx) = channel(24); - WebsocketListener { - inner: listen, - timeout, - sender, - pending_stream: rx, - } - } - - fn poll_pending(&mut self, cx: &mut Context) -> Poll<(Multiaddr, WsStream)> { - match Pin::new(&mut self.pending_stream).as_mut().poll_next(cx) { - Poll::Ready(Some(res)) => Poll::Ready(res), - Poll::Ready(None) | Poll::Pending => Poll::Pending, - } - } - - fn poll_listen(&mut self, cx: &mut Context) -> Poll> { - match self.inner.poll_accept(cx)? { - Poll::Ready((stream, _)) => { - match stream.peer_addr() { - Ok(remote_address) => { - let timeout = self.timeout; - let mut sender = self.sender.clone(); - crate::runtime::spawn(async move { - match crate::runtime::timeout(timeout, accept_async(stream)).await { - Err(_) => debug!("accept websocket stream timeout"), - Ok(res) => match res { - Ok(stream) => { - let mut addr = socketaddr_to_multiaddr(remote_address); - addr.push(Protocol::Ws); - if sender.send((addr, WsStream::new(stream))).await.is_err() - { - debug!("receiver closed unexpectedly") - } - } - Err(err) => { - debug!("accept websocket stream err: {:?}", err); - } - }, - } - }); - } - Err(err) => { - debug!("stream get peer address error: {:?}", err); - } - } - Poll::Ready(Ok(())) - } - Poll::Pending => Poll::Pending, - } - } -} - -impl Stream for WebsocketListener { - type Item = std::result::Result<(Multiaddr, WsStream), io::Error>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if let Poll::Ready(res) = self.poll_pending(cx) { - return Poll::Ready(Some(Ok(res))); - } - - loop { - let is_pending = self.poll_listen(cx)?.is_pending(); - match self.poll_pending(cx) { - Poll::Ready(res) => return Poll::Ready(Some(Ok(res))), - Poll::Pending => { - if is_pending { - break; - } - } - } - } - Poll::Pending - } -} - #[derive(Debug)] pub struct WsStream { inner: WebSocketStream, @@ -237,7 +106,7 @@ pub struct WsStream { } impl WsStream { - fn new(inner: WebSocketStream) -> Self { + pub fn new(inner: WebSocketStream) -> Self { WsStream { inner, recv_buf: Vec::new(), diff --git a/tentacle/src/utils/dns.rs b/tentacle/src/utils/dns.rs index 37347696..bbbae286 100644 --- a/tentacle/src/utils/dns.rs +++ b/tentacle/src/utils/dns.rs @@ -12,7 +12,7 @@ use std::{ use crate::{ multiaddr::{Multiaddr, Protocol}, secio::PeerId, - transports::{find_type, TransportType}, + transports::{find_type, parse_tls_domain_name, TransportType}, utils::{extract_peer_id, socketaddr_to_multiaddr}, }; @@ -76,7 +76,14 @@ impl DnsResolver { Some(address) => { let mut address = socketaddr_to_multiaddr(address); match self.ty { - TransportType::Tcp | TransportType::Memory | TransportType::Tls => (), + TransportType::Tcp | TransportType::Memory => (), + TransportType::Tls => { + let domain_name = parse_tls_domain_name(&self.source_address) + .map(Cow::Owned) + .unwrap_or(Cow::Borrowed("")); + + address.push(Protocol::Tls(domain_name)); + } TransportType::Ws => address.push(Protocol::Ws), TransportType::Wss => address.push(Protocol::Wss), }