Skip to content

Commit

Permalink
refactor: improve minor error handling #22
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed Mar 21, 2024
1 parent 0e0a33b commit 6ec83d1
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 36 deletions.
36 changes: 23 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ pub mod types;
pub mod utils;

use std::fs::File;
use std::{fmt::Display, process::exit};
use std::{fmt::Display, io, process::exit};

use crate::logger::{Logger, Logstdout};
use crate::net::DeviceError;
use crate::net::NetError;
// use env_logger;::{init, Logger};

use log::{debug, Level, LevelFilter};
Expand All @@ -28,12 +28,12 @@ use log::{debug, Level, LevelFilter};
// };

#[derive(Debug)]
enum FluereError {
pub enum FluereError {
InterfaceNotFound,
DeviceError(DeviceError),
ArgumentParseError(String),
ModeNotSupported(String),
NetworkError(String),
NetworkError(NetError),
IoError(io::Error),
}

impl std::fmt::Display for FluereError {
Expand All @@ -42,17 +42,23 @@ impl std::fmt::Display for FluereError {
FluereError::InterfaceNotFound => write!(f, "Network interface not found."),
FluereError::ArgumentParseError(msg) => write!(f, "Argument parsing error: {}", msg),
FluereError::ModeNotSupported(mode) => write!(f, "Mode not supported: {}", mode),
FluereError::NetworkError(msg) => write!(f, "Network error: {}", msg),
FluereError::DeviceError(err) => err.fmt(f),
FluereError::NetworkError(err) => err.fmt(f),
FluereError::IoError(err) => err.fmt(f),
}
}
}

impl std::error::Error for FluereError {}

impl From<DeviceError> for FluereError {
fn from(err: DeviceError) -> Self {
FluereError::DeviceError(err)
impl From<NetError> for FluereError {
fn from(err: NetError) -> Self {
FluereError::NetworkError(err)
}
}

impl From<io::Error> for FluereError {
fn from(err: io::Error) -> Self {
FluereError::IoError(err)
}
}

Expand Down Expand Up @@ -119,11 +125,15 @@ async fn main() {
debug!("Fluere started");

match mode_type {
Mode::Online => net::online_fluereflow::packet_capture(parems.0).await,
Mode::Offline => net::fluereflow_fileparse(parems.0).await,
Mode::Online => net::online_fluereflow::packet_capture(parems.0)
.await
.expect("Online mode failed"),
Mode::Offline => net::fluereflow_fileparse(parems.0)
.await
.expect("Offline mode failed"),
Mode::Live => net::live_fluereflow::packet_capture(parems.0)
.await
.expect("Error on live mode"),
.expect("Live mode failed"),
Mode::Pcap => net::pcap_capture(parems.0).await,
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/net/live_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
},
types::{Args, UDFlowKey},
utils::{cur_time_file, fluere_exporter},
FluereError,
};
use std::{
collections::HashMap,
Expand Down Expand Up @@ -43,7 +44,7 @@ const MAX_RECENT_FLOWS: usize = 50;
// This function is the entry point for the live packet capture functionality.
// It takes the command line arguments as input and calls the online_packet_capture function.
// It returns a Result indicating whether the operation was successful.
pub async fn packet_capture(arg: Args) -> Result<(), io::Error> {
pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
debug!("Starting Terminal User Interface");

online_packet_capture(arg).await;
Expand Down
43 changes: 24 additions & 19 deletions src/net/offline_fluereflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,21 @@ use crate::{
},
types::{Args, UDFlowKey},
utils::{cur_time_file, fluere_exporter},
FluereError, NetError,
};

use fluereflow::FluereRecord;
use log::{debug, info, trace};
use pcap::Capture;
use tokio::task;

pub async fn fluereflow_fileparse(arg: Args) {
pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
let csv_file = arg.files.csv.unwrap();
let file_name = arg.files.file.unwrap();
let use_mac = arg.parameters.use_mac.unwrap();
let _flow_timeout = arg.parameters.timeout.unwrap();
let flow_timeout = arg.parameters.timeout.unwrap();

let mut cap = Capture::from_file(file_name).unwrap();
let mut cap = Capture::from_file(file_name).map_err(NetError::from)?;

let file_dir = "./output";
match fs::create_dir_all(<&str>::clone(&file_dir)) {
Expand Down Expand Up @@ -82,21 +83,6 @@ pub async fn fluereflow_fileparse(arg: Args) {
},
Some(_) => false,
};
/*let is_reverse = if active_flow.contains_key(&key_value) {
false
} else if active_flow.contains_key(&reverse_key) {
true
} else {
if flowdata.get_prot() != 6 && flags.syn > 0 {
active_flow.insert(key_value, flowdata);
if verbose >= 2 {
println!("flow established");
}
} else {
continue;
}
false
};*/

let time = parse_microseconds(
packet.header.ts.tv_sec as u64,
Expand Down Expand Up @@ -129,8 +115,26 @@ pub async fn fluereflow_fileparse(arg: Args) {
active_flow.remove(flow_key);
}
}

// Before processing a new packet, check for and handle expired flows
let mut expired_flows = Vec::new();
for (key, flow) in active_flow.iter() {
if flow_timeout > 0 && time > (flow.last + (flow_timeout * 1000)) {
// Assuming flow.last is in microseconds
trace!("flow expired");
trace!("flow data: {:?}", flow);
records.push(*flow);
expired_flows.push(*key);
}
}

// Remove expired flows from the active flows map
// active_flow.retain(|key, _| !expired_flows.contains(key));
for key in expired_flows {
active_flow.remove(&key);
}
}
info!("Captured in {:?}", start.elapsed());
info!("Converted in {:?}", start.elapsed());
let ac_flow_cnt = active_flow.len();
let ended_flow_cnt = records.len();

Expand All @@ -146,4 +150,5 @@ pub async fn fluereflow_fileparse(arg: Args) {

info!("Active flow {:?}", ac_flow_cnt);
info!("Ended flow {:?}", ended_flow_cnt);
Ok(())
}
9 changes: 6 additions & 3 deletions src/net/online_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
},
types::{Args, UDFlowKey},
utils::{cur_time_file, fluere_exporter},
FluereError, NetError,
};

use fluere_config::Config;
Expand All @@ -35,7 +36,7 @@ use log::{debug, info, trace};
// This function captures packets from a network interface and converts them into NetFlow data.
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
// The function runs indefinitely, capturing packets and exporting the captured data to a CSV file.
pub async fn packet_capture(arg: Args) {
pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
let csv_file = arg.files.csv.unwrap();
let use_mac = arg.parameters.use_mac.unwrap();
let interface_name = arg.interface.expect("interface not found");
Expand All @@ -52,8 +53,8 @@ pub async fn packet_capture(arg: Args) {
.await
.expect("Failed to load plugins");

let interface = find_device(interface_name.as_str()).unwrap();
let mut cap_device = CaptureDevice::new(interface.clone()).unwrap();
let interface = find_device(interface_name.as_str())?;
let mut cap_device = CaptureDevice::new(interface.clone()).map_err(NetError::from)?;
let cap = &mut cap_device.capture;

let file_dir = "./output";
Expand Down Expand Up @@ -252,4 +253,6 @@ pub async fn packet_capture(arg: Args) {
drop(plugin_manager);
let result = tasks.await;
info!("Exporting task excutation result: {:?}", result);

Ok(())
}

0 comments on commit 6ec83d1

Please sign in to comment.