From 6ec83d1bd03a300eea25bcf0cd20f9378be1679c Mon Sep 17 00:00:00 2001 From: SkuldNorniern Date: Thu, 21 Mar 2024 12:16:42 +0900 Subject: [PATCH] refactor: improve minor error handling #22 --- src/main.rs | 36 ++++++++++++++++++---------- src/net/live_fluereflow.rs | 3 ++- src/net/offline_fluereflows.rs | 43 +++++++++++++++++++--------------- src/net/online_fluereflow.rs | 9 ++++--- 4 files changed, 55 insertions(+), 36 deletions(-) diff --git a/src/main.rs b/src/main.rs index a1d1d0c..32cd511 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,10 +11,10 @@ pub mod types; pub mod utils; use std::fs::File; -use std::{fmt::Display, process::exit}; +use std::{fmt::Display, io, process::exit}; use crate::logger::{Logger, Logstdout}; -use crate::net::DeviceError; +use crate::net::NetError; // use env_logger;::{init, Logger}; use log::{debug, Level, LevelFilter}; @@ -28,12 +28,12 @@ use log::{debug, Level, LevelFilter}; // }; #[derive(Debug)] -enum FluereError { +pub enum FluereError { InterfaceNotFound, - DeviceError(DeviceError), ArgumentParseError(String), ModeNotSupported(String), - NetworkError(String), + NetworkError(NetError), + IoError(io::Error), } impl std::fmt::Display for FluereError { @@ -42,17 +42,23 @@ impl std::fmt::Display for FluereError { FluereError::InterfaceNotFound => write!(f, "Network interface not found."), FluereError::ArgumentParseError(msg) => write!(f, "Argument parsing error: {}", msg), FluereError::ModeNotSupported(mode) => write!(f, "Mode not supported: {}", mode), - FluereError::NetworkError(msg) => write!(f, "Network error: {}", msg), - FluereError::DeviceError(err) => err.fmt(f), + FluereError::NetworkError(err) => err.fmt(f), + FluereError::IoError(err) => err.fmt(f), } } } impl std::error::Error for FluereError {} -impl From for FluereError { - fn from(err: DeviceError) -> Self { - FluereError::DeviceError(err) +impl From for FluereError { + fn from(err: NetError) -> Self { + FluereError::NetworkError(err) + } +} + +impl From for FluereError { + fn from(err: io::Error) -> Self { + FluereError::IoError(err) } } @@ -119,11 +125,15 @@ async fn main() { debug!("Fluere started"); match mode_type { - Mode::Online => net::online_fluereflow::packet_capture(parems.0).await, - Mode::Offline => net::fluereflow_fileparse(parems.0).await, + Mode::Online => net::online_fluereflow::packet_capture(parems.0) + .await + .expect("Online mode failed"), + Mode::Offline => net::fluereflow_fileparse(parems.0) + .await + .expect("Offline mode failed"), Mode::Live => net::live_fluereflow::packet_capture(parems.0) .await - .expect("Error on live mode"), + .expect("Live mode failed"), Mode::Pcap => net::pcap_capture(parems.0).await, } } else { diff --git a/src/net/live_fluereflow.rs b/src/net/live_fluereflow.rs index f99e2d3..78c3a5f 100644 --- a/src/net/live_fluereflow.rs +++ b/src/net/live_fluereflow.rs @@ -11,6 +11,7 @@ use crate::{ }, types::{Args, UDFlowKey}, utils::{cur_time_file, fluere_exporter}, + FluereError, }; use std::{ collections::HashMap, @@ -43,7 +44,7 @@ const MAX_RECENT_FLOWS: usize = 50; // This function is the entry point for the live packet capture functionality. // It takes the command line arguments as input and calls the online_packet_capture function. // It returns a Result indicating whether the operation was successful. -pub async fn packet_capture(arg: Args) -> Result<(), io::Error> { +pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { debug!("Starting Terminal User Interface"); online_packet_capture(arg).await; diff --git a/src/net/offline_fluereflows.rs b/src/net/offline_fluereflows.rs index f84ebee..7a477a2 100644 --- a/src/net/offline_fluereflows.rs +++ b/src/net/offline_fluereflows.rs @@ -8,6 +8,7 @@ use crate::{ }, types::{Args, UDFlowKey}, utils::{cur_time_file, fluere_exporter}, + FluereError, NetError, }; use fluereflow::FluereRecord; @@ -15,13 +16,13 @@ use log::{debug, info, trace}; use pcap::Capture; use tokio::task; -pub async fn fluereflow_fileparse(arg: Args) { +pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { let csv_file = arg.files.csv.unwrap(); let file_name = arg.files.file.unwrap(); let use_mac = arg.parameters.use_mac.unwrap(); - let _flow_timeout = arg.parameters.timeout.unwrap(); + let flow_timeout = arg.parameters.timeout.unwrap(); - let mut cap = Capture::from_file(file_name).unwrap(); + let mut cap = Capture::from_file(file_name).map_err(NetError::from)?; let file_dir = "./output"; match fs::create_dir_all(<&str>::clone(&file_dir)) { @@ -82,21 +83,6 @@ pub async fn fluereflow_fileparse(arg: Args) { }, Some(_) => false, }; - /*let is_reverse = if active_flow.contains_key(&key_value) { - false - } else if active_flow.contains_key(&reverse_key) { - true - } else { - if flowdata.get_prot() != 6 && flags.syn > 0 { - active_flow.insert(key_value, flowdata); - if verbose >= 2 { - println!("flow established"); - } - } else { - continue; - } - false - };*/ let time = parse_microseconds( packet.header.ts.tv_sec as u64, @@ -129,8 +115,26 @@ pub async fn fluereflow_fileparse(arg: Args) { active_flow.remove(flow_key); } } + + // Before processing a new packet, check for and handle expired flows + let mut expired_flows = Vec::new(); + for (key, flow) in active_flow.iter() { + if flow_timeout > 0 && time > (flow.last + (flow_timeout * 1000)) { + // Assuming flow.last is in microseconds + trace!("flow expired"); + trace!("flow data: {:?}", flow); + records.push(*flow); + expired_flows.push(*key); + } + } + + // Remove expired flows from the active flows map + // active_flow.retain(|key, _| !expired_flows.contains(key)); + for key in expired_flows { + active_flow.remove(&key); + } } - info!("Captured in {:?}", start.elapsed()); + info!("Converted in {:?}", start.elapsed()); let ac_flow_cnt = active_flow.len(); let ended_flow_cnt = records.len(); @@ -146,4 +150,5 @@ pub async fn fluereflow_fileparse(arg: Args) { info!("Active flow {:?}", ac_flow_cnt); info!("Ended flow {:?}", ended_flow_cnt); + Ok(()) } diff --git a/src/net/online_fluereflow.rs b/src/net/online_fluereflow.rs index 9af7525..a0e6f15 100644 --- a/src/net/online_fluereflow.rs +++ b/src/net/online_fluereflow.rs @@ -18,6 +18,7 @@ use crate::{ }, types::{Args, UDFlowKey}, utils::{cur_time_file, fluere_exporter}, + FluereError, NetError, }; use fluere_config::Config; @@ -35,7 +36,7 @@ use log::{debug, info, trace}; // This function captures packets from a network interface and converts them into NetFlow data. // It takes the command line arguments as input, which specify the network interface to capture from and other parameters. // The function runs indefinitely, capturing packets and exporting the captured data to a CSV file. -pub async fn packet_capture(arg: Args) { +pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { let csv_file = arg.files.csv.unwrap(); let use_mac = arg.parameters.use_mac.unwrap(); let interface_name = arg.interface.expect("interface not found"); @@ -52,8 +53,8 @@ pub async fn packet_capture(arg: Args) { .await .expect("Failed to load plugins"); - let interface = find_device(interface_name.as_str()).unwrap(); - let mut cap_device = CaptureDevice::new(interface.clone()).unwrap(); + let interface = find_device(interface_name.as_str())?; + let mut cap_device = CaptureDevice::new(interface.clone()).map_err(NetError::from)?; let cap = &mut cap_device.capture; let file_dir = "./output"; @@ -252,4 +253,6 @@ pub async fn packet_capture(arg: Args) { drop(plugin_manager); let result = tasks.await; info!("Exporting task excutation result: {:?}", result); + + Ok(()) }