Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/load in config file in tui #68

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions rustiflow/src/flows/basic_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::flow::Flow;
enum FlowState {
Established,
FinSent,
FinAcked
FinAcked,
}

/// A basic flow that stores the basic features of a flow.
Expand Down Expand Up @@ -95,16 +95,24 @@ impl BasicFlow {
if packet.fin_flag > 0 {
if forward {
self.state_fwd = FlowState::FinSent;
self.expected_ack_seq_bwd = Some(packet.sequence_number + packet.data_length as u32 + 1);
self.expected_ack_seq_bwd =
Some(packet.sequence_number + packet.data_length as u32 + 1);
} else {
self.state_bwd = FlowState::FinSent;
self.expected_ack_seq_fwd = Some(packet.sequence_number + packet.data_length as u32 + 1);
self.expected_ack_seq_fwd =
Some(packet.sequence_number + packet.data_length as u32 + 1);
}
}

if self.state_bwd == FlowState::FinSent && forward && Some(packet.sequence_number_ack) == self.expected_ack_seq_fwd {
if self.state_bwd == FlowState::FinSent
&& forward
&& Some(packet.sequence_number_ack) == self.expected_ack_seq_fwd
{
self.state_bwd = FlowState::FinAcked;
} else if self.state_fwd == FlowState::FinSent && !forward && Some(packet.sequence_number_ack) == self.expected_ack_seq_bwd {
} else if self.state_fwd == FlowState::FinSent
&& !forward
&& Some(packet.sequence_number_ack) == self.expected_ack_seq_bwd
{
self.state_fwd = FlowState::FinAcked;
}

Expand Down
17 changes: 10 additions & 7 deletions rustiflow/src/flows/custom_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::{basic_flow::BasicFlow, flow::Flow};
/// Represents a Custom Flow, encapsulating various metrics and states of a network flow.
///
/// As an example, this flow has one feature that represents the sum of the inter arrival times of the first 10 packets for both egress and ingress direction.
///
///
/// This struct is made so you can define your own features.
#[derive(Clone)]
pub struct CustomFlow {
Expand All @@ -23,11 +23,11 @@ impl CustomFlow {
fn update_inter_arrival_time_total(&mut self, packet: &PacketFeatures) {
if (self.basic_flow.fwd_packet_count + self.basic_flow.bwd_packet_count) > 10 {
let iat = packet
.timestamp
.signed_duration_since(self.basic_flow.last_timestamp)
.num_nanoseconds()
.unwrap() as f64
/ 1000.0;
.timestamp
.signed_duration_since(self.basic_flow.last_timestamp)
.num_nanoseconds()
.unwrap() as f64
/ 1000.0;

self.inter_arrival_time_total += iat;
}
Expand Down Expand Up @@ -74,7 +74,10 @@ impl Flow for CustomFlow {

fn dump(&self) -> String {
// Add here the dump of the custom flow.
format!("{},{}", self.basic_flow.flow_key, self.inter_arrival_time_total)
format!(
"{},{}",
self.basic_flow.flow_key, self.inter_arrival_time_total
)
}

fn get_features() -> String {
Expand Down
13 changes: 10 additions & 3 deletions rustiflow/src/flows/nf_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ impl NfFlow {
return 0;
}

(self.bwd_last_timestamp.unwrap().signed_duration_since(self.bwd_first_timestamp.unwrap()))
(self
.bwd_last_timestamp
.unwrap()
.signed_duration_since(self.bwd_first_timestamp.unwrap()))
.num_milliseconds()
}

Expand Down Expand Up @@ -185,10 +188,14 @@ impl Flow for NfFlow {
{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},\
{},{},{},{},{},{},{},{},{},{},{},{},{}",
self.cic_flow.basic_flow.protocol,
self.last_timestamp.signed_duration_since(self.first_timestamp).num_milliseconds(),
self.last_timestamp
.signed_duration_since(self.first_timestamp)
.num_milliseconds(),
self.cic_flow.basic_flow.fwd_packet_count + self.cic_flow.basic_flow.bwd_packet_count,
self.cic_flow.fwd_pkt_len_tot + self.cic_flow.bwd_pkt_len_tot,
self.fwd_last_timestamp.signed_duration_since(self.fwd_first_timestamp).num_milliseconds(),
self.fwd_last_timestamp
.signed_duration_since(self.fwd_first_timestamp)
.num_milliseconds(),
self.cic_flow.basic_flow.fwd_packet_count,
self.cic_flow.fwd_pkt_len_tot,
self.get_bwd_duration(),
Expand Down
6 changes: 3 additions & 3 deletions rustiflow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ async fn run_with_config(config: Config) {
});
debug!("OutputWriter task finished");
});

debug!("Starting realtime processing...");
let start = Instant::now();
let result = handle_realtime::<$flow_ty>(
Expand All @@ -143,7 +143,7 @@ async fn run_with_config(config: Config) {
"Duration: {:.4} seconds",
end.duration_since(start).as_secs_f64()
);

// Now process the result and print the dropped packets
match result {
Ok(dropped_packets) => {
Expand Down Expand Up @@ -238,4 +238,4 @@ async fn run_with_config(config: Config) {
}
}
}
}
}
28 changes: 15 additions & 13 deletions rustiflow/src/packet_features.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,15 @@ use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use chrono::{DateTime, Utc};
use common::{EbpfEventIpv4, EbpfEventIpv6};
use log::debug;
use pnet::packet::{icmp::IcmpPacket, ip::{IpNextHeaderProtocol, IpNextHeaderProtocols}, ipv4::Ipv4Packet, ipv6::Ipv6Packet, tcp::TcpPacket, udp::UdpPacket, Packet};
use pnet::packet::{
icmp::IcmpPacket,
ip::{IpNextHeaderProtocol, IpNextHeaderProtocols},
ipv4::Ipv4Packet,
ipv6::Ipv6Packet,
tcp::TcpPacket,
udp::UdpPacket,
Packet,
};

// Define TCP flags
const FIN_FLAG: u8 = 0b00000001;
Expand Down Expand Up @@ -92,10 +100,7 @@ impl PacketFeatures {
}

// Constructor to create PacketFeatures from an IPv4 packet
pub fn from_ipv4_packet(
packet: &Ipv4Packet,
timestamp: DateTime<Utc>,
) -> Option<Self> {
pub fn from_ipv4_packet(packet: &Ipv4Packet, timestamp: DateTime<Utc>) -> Option<Self> {
extract_packet_features_transport(
packet.get_source().into(),
packet.get_destination().into(),
Expand All @@ -107,10 +112,7 @@ impl PacketFeatures {
}

// Constructor to create PacketFeatures from an IPv6 packet
pub fn from_ipv6_packet(
packet: &Ipv6Packet,
timestamp: DateTime<Utc>,
) -> Option<Self> {
pub fn from_ipv6_packet(packet: &Ipv6Packet, timestamp: DateTime<Utc>) -> Option<Self> {
extract_packet_features_transport(
packet.get_source().into(),
packet.get_destination().into(),
Expand Down Expand Up @@ -233,8 +235,8 @@ fn extract_packet_features_transport(
data_length: udp_packet.payload().len() as u16,
header_length: 8, // Fixed header size for UDP
length: total_length,
window_size: 0, // No window size for UDP
sequence_number: 0, // No sequence number for UDP
window_size: 0, // No window size for UDP
sequence_number: 0, // No sequence number for UDP
sequence_number_ack: 0, // No sequence number ACK for UDP
})
}
Expand All @@ -258,8 +260,8 @@ fn extract_packet_features_transport(
data_length: icmp_packet.payload().len() as u16,
header_length: 8, // Fixed header size for ICMP
length: total_length,
window_size: 0, // No window size for ICMP
sequence_number: 0, // No sequence number for ICMP
window_size: 0, // No window size for ICMP
sequence_number: 0, // No sequence number for ICMP
sequence_number_ack: 0, // No sequence number ACK for ICMP
})
}
Expand Down
49 changes: 40 additions & 9 deletions rustiflow/src/pcap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::{flow_table::FlowTable, packet_features::PacketFeatures};
use chrono::{DateTime, Utc};
use log::{debug, error};
use pnet::packet::{
ethernet::{EtherTypes, EthernetPacket}, ipv4::Ipv4Packet, ipv6::Ipv6Packet, Packet
ethernet::{EtherTypes, EthernetPacket},
ipv4::Ipv4Packet,
ipv6::Ipv6Packet,
Packet,
};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -61,26 +64,54 @@ where
match ethernet.get_ethertype() {
EtherTypes::Ipv4 => {
if let Some(packet) = Ipv4Packet::new(ethernet.payload()) {
process_packet::<T, Ipv4Packet>(&packet, timestamp, &shard_senders, num_threads, PacketFeatures::from_ipv4_packet).await;
}
},
process_packet::<T, Ipv4Packet>(
&packet,
timestamp,
&shard_senders,
num_threads,
PacketFeatures::from_ipv4_packet,
)
.await;
}
}
EtherTypes::Ipv6 => {
if let Some(packet) = Ipv6Packet::new(ethernet.payload()) {
process_packet::<T, Ipv6Packet>(&packet, timestamp, &shard_senders, num_threads, PacketFeatures::from_ipv6_packet).await;
}
},
process_packet::<T, Ipv6Packet>(
&packet,
timestamp,
&shard_senders,
num_threads,
PacketFeatures::from_ipv6_packet,
)
.await;
}
}
_ => {
// Check if it is a Linux cooked capture
let ethertype = u16::from_be_bytes([packet.data[14], packet.data[15]]);
match ethertype {
SLL_IPV4 => {
if let Some(packet) = Ipv4Packet::new(&packet.data[16..]) {
process_packet::<T, Ipv4Packet>(&packet, timestamp, &shard_senders, num_threads, PacketFeatures::from_ipv4_packet).await;
process_packet::<T, Ipv4Packet>(
&packet,
timestamp,
&shard_senders,
num_threads,
PacketFeatures::from_ipv4_packet,
)
.await;
}
}
SLL_IPV6 => {
if let Some(packet) = Ipv6Packet::new(&packet.data[16..]) {
process_packet::<T, Ipv6Packet>(&packet, timestamp, &shard_senders, num_threads, PacketFeatures::from_ipv6_packet).await;
process_packet::<T, Ipv6Packet>(
&packet,
timestamp,
&shard_senders,
num_threads,
PacketFeatures::from_ipv6_packet,
)
.await;
}
}
_ => debug!("Failed to parse packet as IPv4 or IPv6..."),
Expand Down
Loading
Loading