diff --git a/util/network-devp2p/src/connection.rs b/util/network-devp2p/src/connection.rs index ee59c48f2ab..eb663d37942 100644 --- a/util/network-devp2p/src/connection.rs +++ b/util/network-devp2p/src/connection.rs @@ -167,8 +167,8 @@ impl Connection { /// Create a new connection with given id and socket. pub fn new(token: StreamToken, socket: TcpStream) -> Connection { Connection { - token: token, - socket: socket, + token, + socket, send_queue: VecDeque::new(), rec_buf: Bytes::new(), rec_size: 0, @@ -318,24 +318,24 @@ impl EncryptedConnection { let mac_encoder = EcbEncryptor::new(AesSafe256Encryptor::new(&key_material[32..64]), NoPadding); let mut egress_mac = Keccak::new_keccak256(); - let mut mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.remote_nonce; + let mut mac_material = H256::from_slice(&key_material[32..64]) ^ handshake.remote_nonce; egress_mac.update(&mac_material); egress_mac.update(if handshake.originated { &handshake.auth_cipher } else { &handshake.ack_cipher }); let mut ingress_mac = Keccak::new_keccak256(); - mac_material = &H256::from_slice(&key_material[32..64]) ^ &handshake.nonce; + mac_material = H256::from_slice(&key_material[32..64]) ^ handshake.nonce; ingress_mac.update(&mac_material); ingress_mac.update(if handshake.originated { &handshake.ack_cipher } else { &handshake.auth_cipher }); let old_connection = handshake.connection.try_clone()?; let connection = ::std::mem::replace(&mut handshake.connection, old_connection); let mut enc = EncryptedConnection { - connection: connection, - encoder: encoder, - decoder: decoder, - mac_encoder: mac_encoder, - egress_mac: egress_mac, - ingress_mac: ingress_mac, + connection, + encoder, + decoder, + mac_encoder, + egress_mac, + ingress_mac, read_state: EncryptedConnectionState::Header, protocol_id: 0, payload_len: 0, @@ -534,7 +534,7 @@ mod tests { read_buffer: vec![], write_buffer: vec![], cursor: 0, - buf_size: buf_size, + buf_size, } } } diff --git a/util/network-devp2p/src/discovery.rs b/util/network-devp2p/src/discovery.rs index bc808c39827..c7782b247ca 100644 --- a/util/network-devp2p/src/discovery.rs +++ b/util/network-devp2p/src/discovery.rs @@ -72,7 +72,7 @@ impl BucketEntry { let now = Instant::now(); BucketEntry { id_hash: keccak(address.id), - address: address, + address, last_seen: now, backoff_until: now, fail_count: 0, @@ -137,7 +137,7 @@ pub struct TableUpdates { impl<'a> Discovery<'a> { pub fn new(key: &KeyPair, public: NodeEndpoint, ip_filter: IpFilter) -> Discovery<'static> { Discovery { - id: key.public().clone(), + id: *key.public(), id_hash: keccak(key.public()), secret: key.secret().clone(), public_endpoint: public, @@ -151,7 +151,7 @@ impl<'a> Discovery<'a> { send_queue: VecDeque::new(), check_timestamps: true, adding_nodes: Vec::new(), - ip_filter: ip_filter, + ip_filter, request_backoff: &REQUEST_BACKOFF, } } @@ -248,11 +248,11 @@ impl<'a> Discovery<'a> { { let nearest = self.nearest_node_entries(&self.discovery_id).into_iter(); let nearest = nearest.filter(|x| !self.discovery_nodes.contains(&x.id)).take(ALPHA).collect::>(); - let target = self.discovery_id.clone(); + let target = self.discovery_id; for r in nearest { match self.send_find_node(&r, &target) { Ok(()) => { - self.discovery_nodes.insert(r.id.clone()); + self.discovery_nodes.insert(r.id); tried_count += 1; }, Err(e) => { @@ -401,7 +401,7 @@ impl<'a> Discovery<'a> { } fn send_to(&mut self, payload: Bytes, address: SocketAddr) { - self.send_queue.push_back(Datagram { payload: payload, address: address }); + self.send_queue.push_back(Datagram { payload, address }); } @@ -461,7 +461,7 @@ impl<'a> Discovery<'a> { append_expiration(&mut response); self.send_packet(PACKET_PONG, from, &response.drain())?; - let entry = NodeEntry { id: node.clone(), endpoint: source.clone() }; + let entry = NodeEntry { id: *node, endpoint: source.clone() }; if !entry.endpoint.is_valid() { debug!(target: "discovery", "Got bad address: {:?}", entry); } else if !self.is_allowed(&entry) { @@ -479,10 +479,10 @@ impl<'a> Discovery<'a> { let echo_hash: H256 = rlp.val_at(1)?; let timestamp: u64 = rlp.val_at(2)?; self.check_timestamp(timestamp)?; - let mut node = NodeEntry { id: node_id.clone(), endpoint: dest }; + let mut node = NodeEntry { id: *node_id, endpoint: dest }; if !node.endpoint.is_valid() { debug!(target: "discovery", "Bad address: {:?}", node); - node.endpoint.address = from.clone(); + node.endpoint.address = *from; } let is_expected = match self.in_flight_requests.entry(*node_id) { @@ -530,10 +530,10 @@ impl<'a> Discovery<'a> { let packets = chunks.map(|c| { let mut rlp = RlpStream::new_list(2); rlp.begin_list(c.len()); - for n in 0 .. c.len() { + for n in c { rlp.begin_list(4); - c[n].endpoint.to_rlp(&mut rlp); - rlp.append(&c[n].id); + n.endpoint.to_rlp(&mut rlp); + rlp.append(&n.id); } append_expiration(&mut rlp); rlp.out() @@ -581,7 +581,7 @@ impl<'a> Discovery<'a> { if node_id == self.id { continue; } - let entry = NodeEntry { id: node_id.clone(), endpoint: endpoint }; + let entry = NodeEntry { id: node_id, endpoint }; if !self.is_allowed(&entry) { debug!(target: "discovery", "Address not allowed: {:?}", entry); continue; @@ -644,7 +644,7 @@ impl<'a> Discovery<'a> { let removed = self.check_expired(Instant::now()); self.discover(); if !removed.is_empty() { - Some(TableUpdates { added: HashMap::new(), removed: removed }) + Some(TableUpdates { added: HashMap::new(), removed }) } else { None } } diff --git a/util/network-devp2p/src/handshake.rs b/util/network-devp2p/src/handshake.rs index e2b1ebaa0dd..e664f040238 100644 --- a/util/network-devp2p/src/handshake.rs +++ b/util/network-devp2p/src/handshake.rs @@ -271,7 +271,7 @@ impl Handshake { // E(remote-pubk, S(ecdhe-random, ecdh-shared-secret^nonce) || H(ecdhe-random-pubk) || pubk || nonce || 0x0) let shared = *ecdh::agree(secret, &self.id)?; - sig.copy_from_slice(&*sign(self.ecdhe.secret(), &(&shared ^ &self.nonce))?); + sig.copy_from_slice(&*sign(self.ecdhe.secret(), &(shared ^ self.nonce))?); write_keccak(self.ecdhe.public(), hepubk); pubk.copy_from_slice(public); nonce.copy_from_slice(&self.nonce); diff --git a/util/network-devp2p/src/host.rs b/util/network-devp2p/src/host.rs index 81e304f1ac1..a24684c3b02 100644 --- a/util/network-devp2p/src/host.rs +++ b/util/network-devp2p/src/host.rs @@ -280,7 +280,7 @@ impl Host { listen_address = SocketAddr::new(listen_address.ip(), tcp_listener.local_addr()?.port()); debug!(target: "network", "Listening at {:?}", listen_address); let udp_port = config.udp_port.unwrap_or_else(|| listen_address.port()); - let local_endpoint = NodeEndpoint { address: listen_address, udp_port: udp_port }; + let local_endpoint = NodeEndpoint { address: listen_address, udp_port }; let boot_nodes = config.boot_nodes.clone(); let reserved_nodes = config.reserved_nodes.clone(); @@ -288,13 +288,13 @@ impl Host { let mut host = Host { info: RwLock::new(HostInfo { - keys: keys, - config: config, + keys, + config, nonce: H256::random(), protocol_version: PROTOCOL_VERSION, capabilities: Vec::new(), public_endpoint: None, - local_endpoint: local_endpoint, + local_endpoint, }), discovery: Mutex::new(None), udp_socket: Mutex::new(None), @@ -306,7 +306,7 @@ impl Host { timer_counter: RwLock::new(USER_TIMER), reserved_nodes: RwLock::new(HashSet::new()), stopping: AtomicBool::new(false), - filter: filter, + filter, }; for n in boot_nodes { @@ -349,11 +349,11 @@ impl Host { Ok(()) } - pub fn set_non_reserved_mode(&self, mode: &NonReservedPeerMode, io: &IoContext) { + pub fn set_non_reserved_mode(&self, mode: NonReservedPeerMode, io: &IoContext) { let mut info = self.info.write(); - if &info.config.non_reserved_mode != mode { - info.config.non_reserved_mode = mode.clone(); + if info.config.non_reserved_mode != mode { + info.config.non_reserved_mode = mode; drop(info); if let NonReservedPeerMode::Deny = mode { // disconnect all non-reserved peers here. @@ -430,7 +430,7 @@ impl Host { return Ok(()); } let local_endpoint = self.info.read().local_endpoint.clone(); - let public_address = self.info.read().config.public_address.clone(); + let public_address = self.info.read().config.public_address; let allow_ips = self.info.read().config.ip_filter.clone(); let public_endpoint = match public_address { None => { @@ -489,7 +489,7 @@ impl Host { } fn have_session(&self, id: &NodeId) -> bool { - self.sessions.read().iter().any(|e| e.lock().info.id == Some(id.clone())) + self.sessions.read().iter().any(|e| e.lock().info.id == Some(*id)) } // returns (handshakes, egress, ingress) @@ -534,7 +534,7 @@ impl Host { } let config = &info.config; - (config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.ip_filter.clone(), info.id().clone()) + (config.min_peers, config.non_reserved_mode == NonReservedPeerMode::Deny, config.max_handshakes as usize, config.ip_filter.clone(), *info.id()) }; let (handshake_count, egress_count, ingress_count) = self.session_count(); @@ -710,18 +710,18 @@ impl Host { let (min_peers, mut max_peers, reserved_only, self_id) = { let info = self.info.read(); let mut max_peers = info.config.max_peers; - for cap in s.info.capabilities.iter() { + for cap in &s.info.capabilities { if let Some(num) = info.config.reserved_protocols.get(&cap.protocol) { max_peers += *num; break; } } - (info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny, info.id().clone()) + (info.config.min_peers as usize, max_peers as usize, info.config.non_reserved_mode == NonReservedPeerMode::Deny, *info.id()) }; max_peers = max(max_peers, min_peers); - let id = s.id().expect("Ready session always has id").clone(); + let id = *s.id().expect("Ready session always has id"); // Check for the session limit. // Outgoing connections are allowed as long as their count is <= min_peers @@ -729,13 +729,11 @@ impl Host { let max_ingress = max(max_peers - min_peers, min_peers / 2); if reserved_only || (s.info.originated && egress_count > min_peers) || - (!s.info.originated && ingress_count > max_ingress) { + (!s.info.originated && ingress_count > max_ingress) && !self.reserved_nodes.read().contains(&id) { // only proceed if the connecting peer is reserved. - if !self.reserved_nodes.read().contains(&id) { - s.disconnect(io, DisconnectReason::TooManyPeers); - kill = true; - break; - } + s.disconnect(io, DisconnectReason::TooManyPeers); + kill = true; + break; } if !self.filter.as_ref().map_or(true, |f| f.connection_allowed(&self_id, &id, ConnectionDirection::Inbound)) { @@ -752,7 +750,7 @@ impl Host { if let Ok(address) = s.remote_addr() { // We can't know remote listening ports, so just assume defaults and hope for the best. let endpoint = NodeEndpoint { address: SocketAddr::new(address.ip(), DEFAULT_PORT), udp_port: DEFAULT_PORT }; - let entry = NodeEntry { id: id, endpoint: endpoint }; + let entry = NodeEntry { id, endpoint }; let mut nodes = self.nodes.write(); if !nodes.contains(&entry.id) { nodes.add_node(Node::new(entry.id, entry.endpoint.clone())); @@ -807,7 +805,7 @@ impl Host { } for p in ready_data { let reserved = self.reserved_nodes.read(); - if let Some(h) = handlers.get(&p).clone() { + if let Some(h) = handlers.get(&p) { h.connected(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token); // accumulate pending packets. let mut session = session.lock(); @@ -818,7 +816,7 @@ impl Host { for (p, packet_id, data) in packet_data { let reserved = self.reserved_nodes.read(); - if let Some(h) = handlers.get(&p).clone() { + if let Some(h) = handlers.get(&p) { h.read(&NetworkContext::new(io, p, Some(session.clone()), self.sessions.clone(), &reserved), &token, packet_id, &data); } } @@ -858,31 +856,28 @@ impl Host { } fn discovery_writable(&self, io: &IoContext) { - match (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { - (Some(udp_socket), Some(discovery)) => { - while let Some(data) = discovery.dequeue_send() { - match udp_socket.send_to(&data.payload, &data.address) { - Ok(Some(size)) if size == data.payload.len() => { - }, - Ok(Some(_)) => { - warn!(target: "network", "UDP sent incomplete datagram"); - }, - Ok(None) => { - discovery.requeue_send(data); - return; - } - Err(e) => { - debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address); - return; - } + if let (Some(udp_socket), Some(discovery)) = (self.udp_socket.lock().as_ref(), self.discovery.lock().as_mut()) { + while let Some(data) = discovery.dequeue_send() { + match udp_socket.send_to(&data.payload, &data.address) { + Ok(Some(size)) if size == data.payload.len() => { + }, + Ok(Some(_)) => { + warn!(target: "network", "UDP sent incomplete datagram"); + }, + Ok(None) => { + discovery.requeue_send(data); + return; + } + Err(e) => { + debug!(target: "network", "UDP send error: {:?}, address: {:?}", e, &data.address); + return; } } - io.update_registration(DISCOVERY) - .unwrap_or_else(|e| { - debug!(target: "network", "Error updating discovery registration: {:?}", e) - }); - }, - _ => (), + } + io.update_registration(DISCOVERY) + .unwrap_or_else(|e| { + debug!(target: "network", "Error updating discovery registration: {:?}", e) + }); } } @@ -922,7 +917,7 @@ impl Host { } for p in to_disconnect { let reserved = self.reserved_nodes.read(); - if let Some(h) = self.handlers.read().get(&p).clone() { + if let Some(h) = self.handlers.read().get(&p) { h.disconnected(&NetworkContext::new(io, p, expired_session.clone(), self.sessions.clone(), &reserved), &token); } } @@ -1012,11 +1007,13 @@ impl IoHandler for Host { IDLE => self.maintain_network(io), FIRST_SESSION ... LAST_SESSION => self.connection_timeout(token, io), DISCOVERY_REFRESH => { - self.discovery.lock().as_mut().map(|d| d.refresh()); + if let Some(d) = self.discovery.lock().as_mut() { + d.refresh(); + } io.update_registration(DISCOVERY).unwrap_or_else(|e| debug!("Error updating discovery registration: {:?}", e)); }, DISCOVERY_ROUND => { - let node_changes = { self.discovery.lock().as_mut().map_or(None, |d| d.round()) }; + let node_changes = { self.discovery.lock().as_mut().and_then(|d| d.round()) }; if let Some(node_changes) = node_changes { self.update_nodes(io, node_changes); } diff --git a/util/network-devp2p/src/node_table.rs b/util/network-devp2p/src/node_table.rs index b2c417b2504..7d1380907ba 100644 --- a/util/network-devp2p/src/node_table.rs +++ b/util/network-devp2p/src/node_table.rs @@ -393,7 +393,6 @@ impl NodeTable { let nodes = node_ids.into_iter() .map(|id| self.nodes.get(&id).expect("self.nodes() only returns node IDs from self.nodes")) .take(MAX_NODES) - .map(|node| node.clone()) .map(Into::into) .collect(); let table = json::NodeTable { nodes }; diff --git a/util/network-devp2p/src/service.rs b/util/network-devp2p/src/service.rs index 709161aebb5..fc8f79b365e 100644 --- a/util/network-devp2p/src/service.rs +++ b/util/network-devp2p/src/service.rs @@ -59,12 +59,12 @@ impl NetworkService { let io_service = IoService::::start()?; Ok(NetworkService { - io_service: io_service, + io_service, host_info: config.client_version.clone(), host: RwLock::new(None), - config: config, - host_handler: host_handler, - filter: filter, + config, + host_handler, + filter, }) } @@ -120,10 +120,10 @@ impl NetworkService { /// In case of error, also returns the listening address for better error reporting. pub fn start(&self) -> Result<(), (Error, Option)> { let mut host = self.host.write(); - let listen_addr = self.config.listen_address.clone(); + let listen_addr = self.config.listen_address; if host.is_none() { let h = Arc::new(Host::new(self.config.clone(), self.filter.clone()) - .map_err(|err| (err.into(), listen_addr))?); + .map_err(|err| (err, listen_addr))?); self.io_service.register_handler(h.clone()) .map_err(|err| (err.into(), listen_addr))?; *host = Some(h); @@ -177,7 +177,7 @@ impl NetworkService { let host = self.host.read(); if let Some(ref host) = *host { let io_ctxt = IoContext::new(self.io_service.channel(), 0); - host.set_non_reserved_mode(&mode, &io_ctxt); + host.set_non_reserved_mode(mode, &io_ctxt); } } diff --git a/util/network/src/lib.rs b/util/network/src/lib.rs index c31ace41093..88bd4e6869a 100644 --- a/util/network/src/lib.rs +++ b/util/network/src/lib.rs @@ -350,7 +350,7 @@ pub trait NetworkProtocolHandler: Sync + Send { } /// Non-reserved peer modes. -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum NonReservedPeerMode { /// Accept them. This is the default. Accept,