Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Add support of mptcp path manager interface
Browse files Browse the repository at this point in the history
Provide functions mimicing:
 * `ip mptcp endpoint show`
 * `ip mptcp limits show`

Example (dump_mptcp.rs) been manually tested on CentOS stream 9 after command:

```bash
sudo sysctl -w net.mptcp.enabled 1
sudo ip mptcp  limits set subflow 1 add_addr_accepted 1
sudo ip netns exec mptcp ip mptcp endpoint add 198.51.100.1 \
    dev eth1 signal
```

Integration test case included and enabled.

Signed-off-by: Gris Ge <cnfourt@gmail.com>
  • Loading branch information
cathay4t committed May 22, 2022
1 parent f21ddb2 commit 1903b39
Show file tree
Hide file tree
Showing 21 changed files with 856 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,9 @@ jobs:
run: |
cd ethtool
cargo test
- name: test (mptcp-pm)
env:
# Needed root permission to modify MPTCP
CARGO_TARGET_X86_64_UNKNOWN_LINUX_GNU_RUNNER: "sudo -E"
run: cargo test -p mptcp-pm
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"genetlink",
"rtnetlink",
"audit",
"mptcp-pm",
]

# omit fuzz projects
Expand All @@ -35,6 +36,7 @@ default-members = [
"genetlink",
"rtnetlink",
"audit",
"mptcp-pm",
]

[patch.crates-io]
Expand All @@ -50,3 +52,4 @@ genetlink = { path = "genetlink" }
rtnetlink = { path = "rtnetlink" }
audit = { path = "audit" }
ethtool = { path = "ethtool" }
mptcp-pm = { path = "mptcp-pm" }
39 changes: 39 additions & 0 deletions mptcp-pm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
[package]
name = "mptcp-pm"
version = "0.1.0"
authors = ["Gris Ge <fge@redhat.com>"]
license = "MIT"
edition = "2018"
description = "Linux kernel MPTCP path manager netlink Library"
keywords = ["network"]
categories = ["network-programming", "os"]
readme = "../README.md"

[lib]
name = "mptcp_pm"
path = "src/lib.rs"
crate-type = ["lib"]

[features]
default = ["tokio_socket"]
tokio_socket = ["netlink-proto/tokio_socket", "tokio"]
smol_socket = ["netlink-proto/smol_socket", "async-std"]

[dependencies]
anyhow = "1.0.44"
async-std = { version = "1.9.0", optional = true}
byteorder = "1.4.3"
futures = "0.3.17"
genetlink = { default-features = false, version = "0.2.1"}
log = "0.4.14"
netlink-packet-core = "0.4.0"
netlink-packet-generic = "0.3.0"
netlink-packet-utils = "0.5"
netlink-proto = { default-features = false, version = "0.9.0" }
netlink-sys = "0.8.0"
thiserror = "1.0.29"
tokio = { version = "1.0.1", features = ["rt"], optional = true}

[dev-dependencies]
tokio = { version = "1.11.0", features = ["macros", "rt", "rt-multi-thread"] }
env_logger = "0.9.0"
1 change: 1 addition & 0 deletions mptcp-pm/LICENSE-MIT
1 change: 1 addition & 0 deletions mptcp-pm/README.md
38 changes: 38 additions & 0 deletions mptcp-pm/examples/dump_mptcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// SPDX-License-Identifier: MIT

use futures::stream::TryStreamExt;

fn main() {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_io()
.build()
.unwrap();
rt.block_on(get_addresses());
}

async fn get_addresses() {
let (connection, handle, _) = mptcp_pm::new_connection().unwrap();
tokio::spawn(connection);

let mut address_handle = handle.address().get().execute().await;

let mut msgs = Vec::new();
while let Some(msg) = address_handle.try_next().await.unwrap() {
msgs.push(msg);
}
assert!(!msgs.is_empty());
for msg in msgs {
println!("{:?}", msg);
}

let mut limits_handle = handle.limits().get().execute().await;

let mut msgs = Vec::new();
while let Some(msg) = limits_handle.try_next().await.unwrap() {
msgs.push(msg);
}
assert!(!msgs.is_empty());
for msg in msgs {
println!("{:?}", msg);
}
}
167 changes: 167 additions & 0 deletions mptcp-pm/src/address/attr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// SPDX-License-Identifier: MIT

use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

use anyhow::Context;
use byteorder::{ByteOrder, NativeEndian};
use netlink_packet_utils::{
nla::{DefaultNla, Nla, NlaBuffer},
parsers::{parse_i32, parse_ip, parse_u16, parse_u32, parse_u8},
DecodeError,
Emitable,
Parseable,
};

const MPTCP_PM_ADDR_ATTR_FAMILY: u16 = 1;
const MPTCP_PM_ADDR_ATTR_ID: u16 = 2;
const MPTCP_PM_ADDR_ATTR_ADDR4: u16 = 3;
const MPTCP_PM_ADDR_ATTR_ADDR6: u16 = 4;
const MPTCP_PM_ADDR_ATTR_PORT: u16 = 5;
const MPTCP_PM_ADDR_ATTR_FLAGS: u16 = 6;
const MPTCP_PM_ADDR_ATTR_IF_IDX: u16 = 7;

const MPTCP_PM_ADDR_FLAG_SIGNAL: u32 = 1 << 0;
const MPTCP_PM_ADDR_FLAG_SUBFLOW: u32 = 1 << 1;
const MPTCP_PM_ADDR_FLAG_BACKUP: u32 = 1 << 2;
const MPTCP_PM_ADDR_FLAG_FULLMESH: u32 = 1 << 3;
const MPTCP_PM_ADDR_FLAG_IMPLICIT: u32 = 1 << 4;

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum MptcpPathManagerAddressAttrFlag {
Signal,
Subflow,
Backup,
Fullmesh,
Implicit,
Other(u32),
}

fn u32_to_vec_flags(value: u32) -> Vec<MptcpPathManagerAddressAttrFlag> {
let mut ret = Vec::new();
let mut found = 0u32;
if (value & MPTCP_PM_ADDR_FLAG_SIGNAL) > 0 {
found += MPTCP_PM_ADDR_FLAG_SIGNAL;
ret.push(MptcpPathManagerAddressAttrFlag::Signal);
}
if (value & MPTCP_PM_ADDR_FLAG_SUBFLOW) > 0 {
found += MPTCP_PM_ADDR_FLAG_SUBFLOW;
ret.push(MptcpPathManagerAddressAttrFlag::Subflow);
}
if (value & MPTCP_PM_ADDR_FLAG_BACKUP) > 0 {
found += MPTCP_PM_ADDR_FLAG_BACKUP;
ret.push(MptcpPathManagerAddressAttrFlag::Backup);
}
if (value & MPTCP_PM_ADDR_FLAG_FULLMESH) > 0 {
found += MPTCP_PM_ADDR_FLAG_FULLMESH;
ret.push(MptcpPathManagerAddressAttrFlag::Fullmesh);
}
if (value & MPTCP_PM_ADDR_FLAG_IMPLICIT) > 0 {
found += MPTCP_PM_ADDR_FLAG_IMPLICIT;
ret.push(MptcpPathManagerAddressAttrFlag::Implicit);
}
if (value - found) > 0 {
ret.push(MptcpPathManagerAddressAttrFlag::Other(value - found));
}
ret
}

impl From<&MptcpPathManagerAddressAttrFlag> for u32 {
fn from(v: &MptcpPathManagerAddressAttrFlag) -> u32 {
match v {
MptcpPathManagerAddressAttrFlag::Signal => MPTCP_PM_ADDR_FLAG_SIGNAL,
MptcpPathManagerAddressAttrFlag::Subflow => MPTCP_PM_ADDR_FLAG_SUBFLOW,
MptcpPathManagerAddressAttrFlag::Backup => MPTCP_PM_ADDR_FLAG_BACKUP,
MptcpPathManagerAddressAttrFlag::Fullmesh => MPTCP_PM_ADDR_FLAG_FULLMESH,
MptcpPathManagerAddressAttrFlag::Implicit => MPTCP_PM_ADDR_FLAG_IMPLICIT,
MptcpPathManagerAddressAttrFlag::Other(d) => *d,
}
}
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum MptcpPathManagerAddressAttr {
Family(u16),
Id(u8),
Addr4(Ipv4Addr),
Addr6(Ipv6Addr),
Port(u16),
Flags(Vec<MptcpPathManagerAddressAttrFlag>),
IfIndex(i32),
Other(DefaultNla),
}

impl Nla for MptcpPathManagerAddressAttr {
fn value_len(&self) -> usize {
match self {
Self::Family(_) | Self::Port(_) => 2,
Self::Addr4(_) | Self::Flags(_) | Self::IfIndex(_) => 4,
Self::Id(_) => 1,
Self::Addr6(_) => 16,
Self::Other(attr) => attr.value_len(),
}
}

fn kind(&self) -> u16 {
match self {
Self::Family(_) => MPTCP_PM_ADDR_ATTR_FAMILY,
Self::Id(_) => MPTCP_PM_ADDR_ATTR_ID,
Self::Addr4(_) => MPTCP_PM_ADDR_ATTR_ADDR4,
Self::Addr6(_) => MPTCP_PM_ADDR_ATTR_ADDR6,
Self::Port(_) => MPTCP_PM_ADDR_ATTR_PORT,
Self::Flags(_) => MPTCP_PM_ADDR_ATTR_FLAGS,
Self::IfIndex(_) => MPTCP_PM_ADDR_ATTR_IF_IDX,
Self::Other(attr) => attr.kind(),
}
}

fn emit_value(&self, buffer: &mut [u8]) {
match self {
Self::Family(d) | Self::Port(d) => NativeEndian::write_u16(buffer, *d),
Self::Addr4(i) => buffer.copy_from_slice(&i.octets()),
Self::Addr6(i) => buffer.copy_from_slice(&i.octets()),
Self::Id(d) => buffer[0] = *d,
Self::Flags(flags) => {
let mut value = 0u32;
for flag in flags {
value += u32::from(flag);
}
NativeEndian::write_u32(buffer, value)
}
Self::IfIndex(d) => NativeEndian::write_i32(buffer, *d),
Self::Other(ref attr) => attr.emit(buffer),
}
}
}

impl<'a, T: AsRef<[u8]> + ?Sized> Parseable<NlaBuffer<&'a T>> for MptcpPathManagerAddressAttr {
fn parse(buf: &NlaBuffer<&'a T>) -> Result<Self, DecodeError> {
let payload = buf.value();
Ok(match buf.kind() {
MPTCP_PM_ADDR_ATTR_FAMILY => {
let err_msg = format!("Invalid MPTCP_PM_ADDR_ATTR_FAMILY value {:?}", payload);
Self::Family(parse_u16(payload).context(err_msg)?)
}
MPTCP_PM_ADDR_ATTR_ID => {
Self::Id(parse_u8(payload).context("Invalid MPTCP_PM_ADDR_ATTR_ID value")?)
}
MPTCP_PM_ADDR_ATTR_ADDR4 | MPTCP_PM_ADDR_ATTR_ADDR6 => {
match parse_ip(payload)
.context("Invalid MPTCP_PM_ADDR_ATTR_ADDR4/MPTCP_PM_ADDR_ATTR_ADDR6 value")?
{
IpAddr::V4(i) => Self::Addr4(i),
IpAddr::V6(i) => Self::Addr6(i),
}
}
MPTCP_PM_ADDR_ATTR_PORT => {
Self::Port(parse_u16(payload).context("Invalid MPTCP_PM_ADDR_ATTR_PORT value")?)
}
MPTCP_PM_ADDR_ATTR_FLAGS => Self::Flags(u32_to_vec_flags(
parse_u32(payload).context("Invalid MPTCP_PM_ADDR_ATTR_FLAGS value")?,
)),
MPTCP_PM_ADDR_ATTR_IF_IDX => Self::IfIndex(
parse_i32(payload).context("Invalid MPTCP_PM_ADDR_ATTR_IF_IDX value")?,
),
_ => Self::Other(DefaultNla::parse(buf).context("invalid NLA (unknown kind)")?),
})
}
}
31 changes: 31 additions & 0 deletions mptcp-pm/src/address/get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// SPDX-License-Identifier: MIT

use futures::TryStream;
use netlink_packet_generic::GenlMessage;

use crate::{
mptcp_execute,
MptcpPathManagerError,
MptcpPathManagerHandle,
MptcpPathManagerMessage,
};

pub struct MptcpPathManagerAddressGetRequest {
handle: MptcpPathManagerHandle,
}

impl MptcpPathManagerAddressGetRequest {
pub(crate) fn new(handle: MptcpPathManagerHandle) -> Self {
MptcpPathManagerAddressGetRequest { handle }
}

pub async fn execute(
self,
) -> impl TryStream<Ok = GenlMessage<MptcpPathManagerMessage>, Error = MptcpPathManagerError>
{
let MptcpPathManagerAddressGetRequest { mut handle } = self;

let mptcp_msg = MptcpPathManagerMessage::new_address_get();
mptcp_execute(&mut handle, mptcp_msg).await
}
}
17 changes: 17 additions & 0 deletions mptcp-pm/src/address/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// SPDX-License-Identifier: MIT

use crate::{MptcpPathManagerAddressGetRequest, MptcpPathManagerHandle};

pub struct MptcpPathManagerAddressHandle(MptcpPathManagerHandle);

impl MptcpPathManagerAddressHandle {
pub fn new(handle: MptcpPathManagerHandle) -> Self {
MptcpPathManagerAddressHandle(handle)
}

/// Retrieve the multipath-TCP addresses
/// (equivalent to `ip mptcp endpoint show`)
pub fn get(&mut self) -> MptcpPathManagerAddressGetRequest {
MptcpPathManagerAddressGetRequest::new(self.0.clone())
}
}
9 changes: 9 additions & 0 deletions mptcp-pm/src/address/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// SPDX-License-Identifier: MIT

mod attr;
mod get;
mod handle;

pub use attr::{MptcpPathManagerAddressAttr, MptcpPathManagerAddressAttrFlag};
pub use get::MptcpPathManagerAddressGetRequest;
pub use handle::MptcpPathManagerAddressHandle;
34 changes: 34 additions & 0 deletions mptcp-pm/src/connection.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// SPDX-License-Identifier: MIT

use std::io;

use futures::channel::mpsc::UnboundedReceiver;
use genetlink::message::RawGenlMessage;
use netlink_packet_core::NetlinkMessage;
use netlink_proto::Connection;
use netlink_sys::{AsyncSocket, SocketAddr};

use crate::MptcpPathManagerHandle;

#[cfg(feature = "tokio_socket")]
#[allow(clippy::type_complexity)]
pub fn new_connection() -> io::Result<(
Connection<RawGenlMessage>,
MptcpPathManagerHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
)> {
new_connection_with_socket()
}

#[allow(clippy::type_complexity)]
pub fn new_connection_with_socket<S>() -> io::Result<(
Connection<RawGenlMessage, S>,
MptcpPathManagerHandle,
UnboundedReceiver<(NetlinkMessage<RawGenlMessage>, SocketAddr)>,
)>
where
S: AsyncSocket,
{
let (conn, handle, messages) = genetlink::new_connection_with_socket()?;
Ok((conn, MptcpPathManagerHandle::new(handle), messages))
}
Loading

0 comments on commit 1903b39

Please sign in to comment.