Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Add ENOBUFS handling for unsolicited messages
Browse files Browse the repository at this point in the history
This can happen when large burst of messages come all of a sudden, which
happen very easily when routing protocols are involved (e.g. BGP). The
current implementation incorrectly assumes that any failure to read from
the socket is akin to the socket closed. This is not the case.

This adds handling for this specific error, which translates to a
wrapper struct in the unsolicited messages stream: either a message, or
an overrun. This lets applications handle best for their usecase such
event: either resync because messages are lost, or do nothing if the
listening is informational only (e.g. logging).
  • Loading branch information
Tuetuopay committed Oct 1, 2022
1 parent b2c64f9 commit 58dd138
Show file tree
Hide file tree
Showing 17 changed files with 161 additions and 69 deletions.
6 changes: 3 additions & 3 deletions audit/examples/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use futures::stream::StreamExt;

#[tokio::main]
async fn main() -> Result<(), String> {
let (connection, mut handle, mut messages) = new_connection().map_err(|e| format!("{}", e))?;
let (connection, mut handle, mut events) = new_connection().map_err(|e| format!("{}", e))?;

tokio::spawn(connection);
handle.enable_events().await.map_err(|e| format!("{}", e))?;

env_logger::init();
while let Some((msg, _)) = messages.next().await {
println!("{:?}", msg);
while let Some(event) = events.next().await {
println!("{event:?}");
}
Ok(())
}
20 changes: 12 additions & 8 deletions audit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ use futures::channel::mpsc::UnboundedReceiver;
pub fn new_connection() -> io::Result<(
proto::Connection<packet::AuditMessage, sys::TokioSocket, packet::NetlinkAuditCodec>,
Handle,
UnboundedReceiver<(
packet::NetlinkMessage<packet::AuditMessage>,
sys::SocketAddr,
)>,
UnboundedReceiver<
packet::NetlinkEvent<(
packet::NetlinkMessage<packet::AuditMessage>,
sys::SocketAddr,
)>,
>,
)> {
new_connection_with_socket()
}
Expand All @@ -33,10 +35,12 @@ pub fn new_connection() -> io::Result<(
pub fn new_connection_with_socket<S>() -> io::Result<(
proto::Connection<packet::AuditMessage, S, packet::NetlinkAuditCodec>,
Handle,
UnboundedReceiver<(
packet::NetlinkMessage<packet::AuditMessage>,
sys::SocketAddr,
)>,
UnboundedReceiver<
packet::NetlinkEvent<(
packet::NetlinkMessage<packet::AuditMessage>,
sys::SocketAddr,
)>,
>,
)>
where
S: sys::AsyncSocket,
Expand Down
6 changes: 3 additions & 3 deletions ethtool/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;

use futures::channel::mpsc::UnboundedReceiver;
use genetlink::message::RawGenlMessage;
use netlink_packet_core::NetlinkMessage;
use netlink_packet_core::{NetlinkEvent, NetlinkMessage};
use netlink_proto::Connection;
use netlink_sys::{AsyncSocket, SocketAddr};

Expand All @@ -15,7 +15,7 @@ use crate::EthtoolHandle;
pub fn new_connection() -> io::Result<(
Connection<RawGenlMessage>,
EthtoolHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)> {
new_connection_with_socket()
}
Expand All @@ -24,7 +24,7 @@ pub fn new_connection() -> io::Result<(
pub fn new_connection_with_socket<S>() -> io::Result<(
Connection<RawGenlMessage, S>,
EthtoolHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)>
where
S: AsyncSocket,
Expand Down
6 changes: 3 additions & 3 deletions genetlink/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use crate::{message::RawGenlMessage, GenetlinkHandle};
use futures::channel::mpsc::UnboundedReceiver;
use netlink_packet_core::NetlinkMessage;
use netlink_packet_core::{NetlinkEvent, NetlinkMessage};
use netlink_proto::{
self,
sys::{protocols::NETLINK_GENERIC, AsyncSocket, SocketAddr},
Expand All @@ -28,7 +28,7 @@ use std::io;
pub fn new_connection() -> io::Result<(
Connection<RawGenlMessage>,
GenetlinkHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)> {
new_connection_with_socket()
}
Expand All @@ -38,7 +38,7 @@ pub fn new_connection() -> io::Result<(
pub fn new_connection_with_socket<S>() -> io::Result<(
Connection<RawGenlMessage, S>,
GenetlinkHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)>
where
S: AsyncSocket,
Expand Down
6 changes: 3 additions & 3 deletions mptcp-pm/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io;

use futures::channel::mpsc::UnboundedReceiver;
use genetlink::message::RawGenlMessage;
use netlink_packet_core::NetlinkMessage;
use netlink_packet_core::{NetlinkEvent, NetlinkMessage};
use netlink_proto::Connection;
use netlink_sys::{AsyncSocket, SocketAddr};

Expand All @@ -15,7 +15,7 @@ use crate::MptcpPathManagerHandle;
pub fn new_connection() -> io::Result<(
Connection<RawGenlMessage>,
MptcpPathManagerHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)> {
new_connection_with_socket()
}
Expand All @@ -24,7 +24,7 @@ pub fn new_connection() -> io::Result<(
pub fn new_connection_with_socket<S>() -> io::Result<(
Connection<RawGenlMessage, S>,
MptcpPathManagerHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
UnboundedReceiver<NetlinkEvent<(NetlinkMessage<RawGenlMessage>, SocketAddr)>>,
)>
where
S: AsyncSocket,
Expand Down
1 change: 1 addition & 0 deletions netlink-packet-audit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub use self::utils::{traits, DecodeError};
pub use netlink_packet_core::{
ErrorMessage,
NetlinkBuffer,
NetlinkEvent,
NetlinkHeader,
NetlinkMessage,
NetlinkPayload,
Expand Down
9 changes: 9 additions & 0 deletions netlink-packet-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ use crate::{
Parseable,
};

/// Represent a Netlink event
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum NetlinkEvent<M> {
/// An actual message was received from Netlink
Message(M),
/// The socket receive buffer filled up
Overrun,
}

/// Represent a netlink message.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct NetlinkMessage<I> {
Expand Down
1 change: 1 addition & 0 deletions netlink-packet-route/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub use self::utils::{traits, DecodeError};
pub use netlink_packet_core::{
ErrorMessage,
NetlinkBuffer,
NetlinkEvent,
NetlinkHeader,
NetlinkMessage,
NetlinkPayload,
Expand Down
22 changes: 14 additions & 8 deletions netlink-proto/examples/audit_netlink_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::process;

use netlink_proto::{
new_connection,
packet::NetlinkEvent,
sys::{protocols::NETLINK_AUDIT, SocketAddr},
};

Expand All @@ -50,11 +51,11 @@ async fn main() -> Result<(), String> {
// - `handle` is a `Handle` to the `Connection`. We use it to send
// netlink messages and receive responses to these messages.
//
// - `messages` is a channel receiver through which we receive
// - `events` is a channel receiver through which we receive
// messages that we have not sollicated, ie that are not
// response to a request we made. In this example, we'll receive
// the audit event through that channel.
let (conn, mut handle, mut messages) = new_connection(NETLINK_AUDIT)
let (conn, mut handle, mut events) = new_connection(NETLINK_AUDIT)
.map_err(|e| format!("Failed to create a new netlink connection: {}", e))?;

// Spawn the `Connection` so that it starts polling the netlink
Expand Down Expand Up @@ -91,13 +92,18 @@ async fn main() -> Result<(), String> {
}
});

// Finally, start receiving event through the `messages` channel.
// Finally, start receiving event through the `events` channel.
println!("Starting to print audit events... press ^C to interrupt");
while let Some((message, _addr)) = messages.next().await {
if let NetlinkPayload::Error(err_message) = message.payload {
eprintln!("received an error message: {:?}", err_message);
} else {
println!("{:?}", message);
while let Some(event) = events.next().await {
match event {
NetlinkEvent::Message((message, _addr)) => {
if let NetlinkPayload::Error(err_message) = message.payload {
eprintln!("received an error message: {:?}", err_message);
} else {
println!("{:?}", message);
}
}
NetlinkEvent::Overrun => println!("Netlink socket overrun. Some messages were lost"),
}
}

Expand Down
27 changes: 17 additions & 10 deletions netlink-proto/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use futures::{
use log::{error, warn};
use netlink_packet_core::{
NetlinkDeserializable,
NetlinkEvent,
NetlinkMessage,
NetlinkPayload,
NetlinkSerializable,
Expand Down Expand Up @@ -52,7 +53,7 @@ where

/// Channel used to transmit to the ConnectionHandle the unsolicited messages received from the
/// socket (multicast messages for instance).
unsolicited_messages_tx: Option<UnboundedSender<(NetlinkMessage<T>, SocketAddr)>>,
unsolicited_messages_tx: Option<UnboundedSender<NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>>>,

socket_closed: bool,
}
Expand All @@ -65,7 +66,7 @@ where
{
pub(crate) fn new(
requests_rx: UnboundedReceiver<Request<T>>,
unsolicited_messages_tx: UnboundedSender<(NetlinkMessage<T>, SocketAddr)>,
unsolicited_messages_tx: UnboundedSender<NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>>,
protocol: isize,
) -> io::Result<Self> {
let socket = S::new(protocol)?;
Expand Down Expand Up @@ -131,10 +132,14 @@ where
loop {
trace!("polling socket");
match socket.as_mut().poll_next(cx) {
Poll::Ready(Some((message, addr))) => {
Poll::Ready(Some(NetlinkEvent::Message((message, addr)))) => {
trace!("read datagram from socket");
self.protocol.handle_message(message, addr);
}
Poll::Ready(Some(NetlinkEvent::Overrun)) => {
warn!("netlink socket buffer full");
self.protocol.handle_buffer_full();
}
Poll::Ready(None) => {
warn!("netlink socket stream shut down");
self.socket_closed = true;
Expand Down Expand Up @@ -165,11 +170,13 @@ where

pub fn forward_unsolicited_messages(&mut self) {
if self.unsolicited_messages_tx.is_none() {
while let Some((message, source)) = self.protocol.incoming_requests.pop_front() {
warn!(
"ignoring unsolicited message {:?} from {:?}",
message, source
);
while let Some(event) = self.protocol.incoming_requests.pop_front() {
match event {
NetlinkEvent::Message((message, source)) => {
warn!("ignoring unsolicited message {message:?} from {source:?}")
}
NetlinkEvent::Overrun => warn!("ignoring unsolicited socket overrun"),
}
}
return;
}
Expand All @@ -183,11 +190,11 @@ where
..
} = self;

while let Some((message, source)) = protocol.incoming_requests.pop_front() {
while let Some(event) = protocol.incoming_requests.pop_front() {
if unsolicited_messages_tx
.as_mut()
.unwrap()
.unbounded_send((message, source))
.unbounded_send(event)
.is_err()
{
// The channel is unbounded so the only error that can
Expand Down
30 changes: 27 additions & 3 deletions netlink-proto/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ use crate::{
codecs::NetlinkMessageCodec,
sys::{AsyncSocket, SocketAddr},
};
use netlink_packet_core::{NetlinkDeserializable, NetlinkMessage, NetlinkSerializable};
use netlink_packet_core::{
NetlinkDeserializable,
NetlinkEvent,
NetlinkMessage,
NetlinkSerializable,
};

pub struct NetlinkFramed<T, S, C> {
socket: S,
Expand All @@ -38,7 +43,7 @@ where
S: AsyncSocket,
C: NetlinkMessageCodec,
{
type Item = (NetlinkMessage<T>, SocketAddr);
type Item = NetlinkEvent<(NetlinkMessage<T>, SocketAddr)>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Self {
Expand All @@ -50,7 +55,9 @@ where

loop {
match C::decode::<T>(reader) {
Ok(Some(item)) => return Poll::Ready(Some((item, *in_addr))),
Ok(Some(item)) => {
return Poll::Ready(Some(NetlinkEvent::Message((item, *in_addr))))
}
Ok(None) => {}
Err(e) => {
error!("unrecoverable error in decoder: {:?}", e);
Expand All @@ -63,6 +70,23 @@ where

*in_addr = match ready!(socket.poll_recv_from(cx, reader)) {
Ok(addr) => addr,
// When receiving messages in multicast mode (i.e. we subscribed to
// notifications), the kernel will not wait for us to read datagrams before
// sending more. The receive buffer has a finite size, so once it is full (no
// more message can fit in), new messages will be dropped and recv calls will
// return `ENOBUFS`.
// This needs to be handled for applications to resynchronize with the contents
// of the kernel if necessary.
// We don't need to do anything special:
// - contents of the reader is still valid because we won't have partial messages
// in there anyways (large enough buffer)
// - contents of the socket's internal buffer is still valid because the kernel
// won't put partial data in it
Err(e) if e.raw_os_error() == Some(105) => {
// ENOBUFS
warn!("netlink socket buffer full");
return Poll::Ready(Some(NetlinkEvent::Overrun));
}
Err(e) => {
error!("failed to read from netlink socket: {:?}", e);
return Poll::Ready(None);
Expand Down
Loading

0 comments on commit 58dd138

Please sign in to comment.