Skip to content

Commit

Permalink
examples/file-sharing: Support binary files (#2786)
Browse files Browse the repository at this point in the history
  • Loading branch information
qidu authored Aug 16, 2022
1 parent cef5056 commit 0e5a25d
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use futures::prelude::*;
use libp2p::core::{Multiaddr, PeerId};
use libp2p::multiaddr::Protocol;
use std::error::Error;
use std::io::Write;
use std::path::PathBuf;

#[async_std::main]
Expand Down Expand Up @@ -134,8 +135,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Reply with the content of the file on incoming requests.
Some(network::Event::InboundRequest { request, channel }) => {
if request == name {
let file_content = std::fs::read_to_string(&path)?;
network_client.respond_file(file_content, channel).await;
network_client
.respond_file(std::fs::read(&path)?, channel)
.await;
}
}
e => todo!("{:?}", e),
Expand All @@ -158,12 +160,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
});

// Await the requests, ignore the remaining once a single one succeeds.
let file = futures::future::select_ok(requests)
let file_content = futures::future::select_ok(requests)
.await
.map_err(|_| "None of the providers returned file.")?
.0;

println!("Content of file {}: {}", name, file);
std::io::stdout().write_all(&file_content)?;
}
}

Expand Down Expand Up @@ -337,7 +339,7 @@ mod network {
&mut self,
peer: PeerId,
file_name: String,
) -> Result<String, Box<dyn Error + Send>> {
) -> Result<Vec<u8>, Box<dyn Error + Send>> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::RequestFile {
Expand All @@ -351,9 +353,16 @@ mod network {
}

/// Respond with the provided file content to the given request.
pub async fn respond_file(&mut self, file: String, channel: ResponseChannel<FileResponse>) {
pub async fn respond_file(
&mut self,
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
) {
self.sender
.send(Command::RespondFile { file, channel })
.send(Command::RespondFile {
file: file,
channel,
})
.await
.expect("Command receiver not to be dropped.");
}
Expand All @@ -367,7 +376,7 @@ mod network {
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_request_file:
HashMap<RequestId, oneshot::Sender<Result<String, Box<dyn Error + Send>>>>,
HashMap<RequestId, oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>>,
}

impl EventLoop {
Expand Down Expand Up @@ -476,7 +485,7 @@ mod network {
)) => {}
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
println!(
eprintln!(
"Local node is listening on {:?}",
address.with(Protocol::P2p(local_peer_id.into()))
);
Expand All @@ -500,7 +509,7 @@ mod network {
}
}
SwarmEvent::IncomingConnectionError { .. } => {}
SwarmEvent::Dialing(peer_id) => println!("Dialing {}", peer_id),
SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {}", peer_id),
e => panic!("{:?}", e),
}
}
Expand Down Expand Up @@ -625,10 +634,10 @@ mod network {
RequestFile {
file_name: String,
peer: PeerId,
sender: oneshot::Sender<Result<String, Box<dyn Error + Send>>>,
sender: oneshot::Sender<Result<Vec<u8>, Box<dyn Error + Send>>>,
},
RespondFile {
file: String,
file: Vec<u8>,
channel: ResponseChannel<FileResponse>,
},
}
Expand All @@ -650,7 +659,7 @@ mod network {
#[derive(Debug, Clone, PartialEq, Eq)]
struct FileRequest(String);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileResponse(String);
pub struct FileResponse(Vec<u8>);

impl ProtocolName for FileExchangeProtocol {
fn protocol_name(&self) -> &[u8] {
Expand Down Expand Up @@ -689,13 +698,13 @@ mod network {
where
T: AsyncRead + Unpin + Send,
{
let vec = read_length_prefixed(io, 1_000_000).await?;
let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum

if vec.is_empty() {
return Err(io::ErrorKind::UnexpectedEof.into());
}

Ok(FileResponse(String::from_utf8(vec).unwrap()))
Ok(FileResponse(vec))
}

async fn write_request<T>(
Expand Down

0 comments on commit 0e5a25d

Please sign in to comment.