diff --git a/cli/src/main.rs b/cli/src/main.rs
index fb49d09..272123f 100644
--- a/cli/src/main.rs
+++ b/cli/src/main.rs
@@ -16,7 +16,7 @@ use tracing::debug;
 use crate::essential::{ CliError, info, update_cli };
 use crate::install::{ InstallArgs, InstallCommands, install_cortexflow, install_simple_example };
 use crate::logs::{ LogsArgs, logs_command };
-use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_identity_events };
+use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_dropped_packets, monitor_identity_events, monitor_latency_metrics };
 use crate::policies::{
     PoliciesArgs,
     PoliciesCommands,
@@ -109,6 +109,12 @@ async fn args_parser() -> Result<(), CliError> {
                 MonitorCommands::Connections => {
                     let _ = monitor_identity_events().await.map_err(|e| eprintln!("{}",e) )?;
                 }
+                MonitorCommands::Latencymetrics => {
+                    let _ = monitor_latency_metrics().await.map_err(|e| eprintln!("{}",e) )?;
+                }
+                MonitorCommands::Droppedpackets => {
+                    let _ = monitor_dropped_packets().await.map_err(|e| eprintln!("{}",e) )?;
+                }
             }
         Some(Commands::Policies(policies_args)) => {
             match policies_args.policy_cmd {
diff --git a/cli/src/monitoring.rs b/cli/src/monitoring.rs
index 75941ac..03480e0 100644
--- a/cli/src/monitoring.rs
+++ b/cli/src/monitoring.rs
@@ -23,6 +23,14 @@ pub enum MonitorCommands {
         name = "connections",
         about = "Monitor the recent connections detected by the identity service"
     )] Connections,
+    #[command(
+        name = "latencymetrics",
+        about = "Monitor the latency metrics detected by the metrics service"
+    )] Latencymetrics,
+    #[command(
+        name = "droppedpackets",
+        about = "Monitor the dropped packets metrics detected by the metrics service"
+    )] Droppedpackets,
 }
 
 // cfcli monitor
@@ -140,3 +148,120 @@ pub async fn monitor_identity_events() -> Result<(), Error> {
 
     Ok(())
 }
+
+pub async fn monitor_latency_metrics() -> Result<(), Error> {
+    // function to monitor latency metrics
+    println!("{} {}", "=====>".blue().bold(), "Connecting to CortexFlow Client".white());
+
+    match connect_to_client().await {
+        Ok(client) => {
+            println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green());
+            // send request to get latency metrics
+            match agent_api::requests::send_latency_metrics_request(client).await {
+                Ok(response) => {
+                    let resp = response.into_inner();
+                    if resp.metrics.is_empty() {
+                        println!("{} No latency metrics found", "=====>".blue().bold());
+                    } else {
+                        println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len());
+                        for (i, metric) in resp.metrics.iter().enumerate() {
+                            println!(
+                                "{} Latency[{}] tgid: {} process_name: {} address_family: {} delta_us: {} src_address_v4: {} dst_address_v4: {} src_address_v6: {} dst_address_v6: {} local_port: {} remote_port: {} timestamp_us: {}",
+                                "=====>".blue().bold(),
+                                i,
+                                metric.tgid,
+                                metric.process_name,
+                                metric.address_family,
+                                metric.delta_us,
+                                metric.src_address_v4,
+                                metric.dst_address_v4,
+                                metric.src_address_v6,
+                                metric.dst_address_v6,
+                                metric.local_port,
+                                metric.remote_port,
+                                metric.timestamp_us
+                            );
+                        }
+                    }
+                }
+                Err(e) => {
+                    println!(
+                        "{} {} {} {}",
+                        "=====>".blue().bold(),
+                        "An error occurred".red(),
+                        "Error:",
+                        e
+                    );
+                    return Err(e);
+                }
+            }
+        }
+        Err(e) => {
+            println!(
+                "{} {}",
+                "=====>".blue().bold(),
+                "Failed to connect to CortexFlow Client".red()
+            );
+            return Err(e);
+        }
+    }
+    Ok(())
+}
+
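+// Both monitor helpers follow the same shape: connect to the agent, issue one
+// unary request, then pretty-print whatever the agent buffered since the last
+// call. Example: `cfcli monitor droppedpackets`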
println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white()); + + match connect_to_client().await { + Ok(client) => { + println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green()); + //send request to get dropped packets metrics + match agent_api::requests::send_dropped_packets_request(client).await { + Ok(response) => { + let resp = response.into_inner(); + if resp.metrics.is_empty() { + println!("{} No dropped packets metrics found", "=====>".blue().bold()); + } else { + println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len()); + for (i, metric) in resp.metrics.iter().enumerate() { + println!( + "{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs", + "=====>".blue().bold(), + i, + metric.tgid, + metric.process_name, + metric.sk_drops, + metric.sk_err, + metric.sk_err_soft, + metric.sk_backlog_len, + metric.sk_wmem_queued, + metric.sk_rcvbuf, + metric.sk_ack_backlog, + metric.timestamp_us + ); + } + } + } + Err(e) => { + println!( + "{} {} {} {}", + "=====>".blue().bold(), + "An error occured".red(), + "Error:", + e + ); + return Err(e); + } + } + } + Err(e) =>{ + println!( + "{} {}", + "=====>".blue().bold(), + "Failed to connect to CortexFlow Client".red() + ); + return Err(e); + } + } + Ok(()) +} \ No newline at end of file diff --git a/core/api/protos/agent.proto b/core/api/protos/agent.proto index 9a0ad4e..3cd236b 100644 --- a/core/api/protos/agent.proto +++ b/core/api/protos/agent.proto @@ -3,6 +3,8 @@ package agent; import "google/protobuf/empty.proto"; +// Active connections + message RequestActiveConnections{ optional string pod_ip = 2 ; } @@ -18,9 +20,57 @@ message ActiveConnectionResponse{ repeated ConnectionEvent events = 2; // List of connection events } +// Network metrics + +// Latency metrics request and response messages + +message LatencyMetric { + uint64 delta_us = 1; // Latency in microseconds + uint64 timestamp_us = 2; // Event timestamp + uint32 tgid = 3; // Thread group ID + string process_name = 4; // Process name (comm) + uint32 local_port = 5; // Local port + uint32 remote_port = 6; // Remote port (big-endian) + uint32 address_family = 7; // "IPv4" or "IPv6" + string src_address_v4 = 8; // Source IP address + string dst_address_v4 = 9; // Destination IP address + string src_address_v6 = 10; // Source IPv6 address + string dst_address_v6 = 11; // Destination IPv6 address +} + +message LatencyMetricsResponse { + string status = 1; + repeated LatencyMetric metrics = 2; + uint32 total_count = 3; + double average_latency_us = 4; // Average latency + double min_latency_us = 5; // Minimum latency + double max_latency_us = 6; // Maximum latency +} + +// Dropped TCP Packets + +message DroppedPacketMetric { + uint32 tgid = 1; // Thread group ID + string process_name = 2; // Process name + int32 sk_drops = 3; // Socket drops (from sk_drops field) + int32 sk_err = 4; // Socket errors + int32 sk_err_soft = 5; // Soft errors + uint32 sk_backlog_len = 6; // Backlog length (congestion indicator) + int32 sk_wmem_queued = 7; // Write memory queued + int32 sk_rcvbuf = 8; // Receive buffer size + uint32 sk_ack_backlog = 9; // ACK backlog + uint64 timestamp_us = 10; // Event timestamp +} + +message DroppedPacketsResponse { + string status = 1; + repeated DroppedPacketMetric metrics = 2; + uint32 total_drops = 3; // Total drops across all 
connections +} + + //declare agent api service Agent{ - // active connections endpoint rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse); @@ -31,6 +81,11 @@ service Agent{ // remove ip from blocklist endpoint rpc RmIpFromBlocklist(RmIpFromBlocklistRequest) returns (RmIpFromBlocklistResponse); + // metrics data + rpc GetLatencyMetrics(google.protobuf.Empty) returns (LatencyMetricsResponse); + + // dropped packets + rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse); } message AddIpToBlocklistRequest{ diff --git a/core/api/src/agent.rs b/core/api/src/agent.rs index c7045f9..c6f5126 100644 --- a/core/api/src/agent.rs +++ b/core/api/src/agent.rs @@ -24,6 +24,103 @@ pub struct ActiveConnectionResponse { pub events: ::prost::alloc::vec::Vec, } #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct LatencyMetric { + /// Latency in microseconds + #[prost(uint64, tag = "1")] + pub delta_us: u64, + /// Event timestamp + #[prost(uint64, tag = "2")] + pub timestamp_us: u64, + /// Thread group ID + #[prost(uint32, tag = "3")] + pub tgid: u32, + /// Process name (comm) + #[prost(string, tag = "4")] + pub process_name: ::prost::alloc::string::String, + /// Local port + #[prost(uint32, tag = "5")] + pub local_port: u32, + /// Remote port (big-endian) + #[prost(uint32, tag = "6")] + pub remote_port: u32, + /// "IPv4" or "IPv6" + #[prost(uint32, tag = "7")] + pub address_family: u32, + /// Source IP address + #[prost(string, tag = "8")] + pub src_address_v4: ::prost::alloc::string::String, + /// Destination IP address + #[prost(string, tag = "9")] + pub dst_address_v4: ::prost::alloc::string::String, + /// Source IPv6 address + #[prost(string, tag = "10")] + pub src_address_v6: ::prost::alloc::string::String, + /// Destination IPv6 address + #[prost(string, tag = "11")] + pub dst_address_v6: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatencyMetricsResponse { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "3")] + pub total_count: u32, + /// Average latency + #[prost(double, tag = "4")] + pub average_latency_us: f64, + /// Minimum latency + #[prost(double, tag = "5")] + pub min_latency_us: f64, + /// Maximum latency + #[prost(double, tag = "6")] + pub max_latency_us: f64, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DroppedPacketMetric { + /// Thread group ID + #[prost(uint32, tag = "1")] + pub tgid: u32, + /// Process name + #[prost(string, tag = "2")] + pub process_name: ::prost::alloc::string::String, + /// Socket drops (from sk_drops field) + #[prost(int32, tag = "3")] + pub sk_drops: i32, + /// Socket errors + #[prost(int32, tag = "4")] + pub sk_err: i32, + /// Soft errors + #[prost(int32, tag = "5")] + pub sk_err_soft: i32, + /// Backlog length (congestion indicator) + #[prost(uint32, tag = "6")] + pub sk_backlog_len: u32, + /// Write memory queued + #[prost(int32, tag = "7")] + pub sk_wmem_queued: i32, + /// Receive buffer size + #[prost(int32, tag = "8")] + pub sk_rcvbuf: i32, + /// ACK backlog + #[prost(uint32, tag = "9")] + pub sk_ack_backlog: u32, + /// Event timestamp + #[prost(uint64, tag = "10")] + pub timestamp_us: u64, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DroppedPacketsResponse { + #[prost(string, tag = "1")] + pub status: ::prost::alloc::string::String, + #[prost(message, 
repeated, tag = "2")] + pub metrics: ::prost::alloc::vec::Vec, + /// Total drops across all connections + #[prost(uint32, tag = "3")] + pub total_drops: u32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct AddIpToBlocklistRequest { #[prost(string, optional, tag = "1")] pub ip: ::core::option::Option<::prost::alloc::string::String>, @@ -244,6 +341,56 @@ pub mod agent_client { .insert(GrpcMethod::new("agent.Agent", "RmIpFromBlocklist")); self.inner.unary(req, path, codec).await } + /// metrics data + pub async fn get_latency_metrics( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetLatencyMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetLatencyMetrics")); + self.inner.unary(req, path, codec).await + } + /// dropped packets + pub async fn get_dropped_packets_metrics( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/agent.Agent/GetDroppedPacketsMetrics", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("agent.Agent", "GetDroppedPacketsMetrics")); + self.inner.unary(req, path, codec).await + } } } /// Generated server implementations. 
@@ -290,6 +437,22 @@ pub mod agent_server {
             tonic::Response<super::RmIpFromBlocklistResponse>,
             tonic::Status,
         >;
+        /// metrics data
+        async fn get_latency_metrics(
+            &self,
+            request: tonic::Request<()>,
+        ) -> std::result::Result<
+            tonic::Response<super::LatencyMetricsResponse>,
+            tonic::Status,
+        >;
+        /// dropped packets
+        async fn get_dropped_packets_metrics(
+            &self,
+            request: tonic::Request<()>,
+        ) -> std::result::Result<
+            tonic::Response<super::DroppedPacketsResponse>,
+            tonic::Status,
+        >;
     }
     /// declare agent api
     #[derive(Debug)]
@@ -543,6 +706,87 @@ pub mod agent_server {
                     };
                     Box::pin(fut)
                 }
+                "/agent.Agent/GetLatencyMetrics" => {
+                    #[allow(non_camel_case_types)]
+                    struct GetLatencyMetricsSvc<T: Agent>(pub Arc<T>);
+                    impl<T: Agent> tonic::server::UnaryService<()>
+                    for GetLatencyMetricsSvc<T> {
+                        type Response = super::LatencyMetricsResponse;
+                        type Future = BoxFuture<
+                            tonic::Response<Self::Response>,
+                            tonic::Status,
+                        >;
+                        fn call(&mut self, request: tonic::Request<()>) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move {
+                                <T as Agent>::get_latency_metrics(&inner, request).await
+                            };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let method = GetLatencyMetricsSvc(inner);
+                        let codec = tonic_prost::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.unary(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
+                "/agent.Agent/GetDroppedPacketsMetrics" => {
+                    #[allow(non_camel_case_types)]
+                    struct GetDroppedPacketsMetricsSvc<T: Agent>(pub Arc<T>);
+                    impl<T: Agent> tonic::server::UnaryService<()>
+                    for GetDroppedPacketsMetricsSvc<T> {
+                        type Response = super::DroppedPacketsResponse;
+                        type Future = BoxFuture<
+                            tonic::Response<Self::Response>,
+                            tonic::Status,
+                        >;
+                        fn call(&mut self, request: tonic::Request<()>) -> Self::Future {
+                            let inner = Arc::clone(&self.0);
+                            let fut = async move {
+                                <T as Agent>::get_dropped_packets_metrics(&inner, request)
+                                    .await
+                            };
+                            Box::pin(fut)
+                        }
+                    }
+                    let accept_compression_encodings = self.accept_compression_encodings;
+                    let send_compression_encodings = self.send_compression_encodings;
+                    let max_decoding_message_size = self.max_decoding_message_size;
+                    let max_encoding_message_size = self.max_encoding_message_size;
+                    let inner = self.inner.clone();
+                    let fut = async move {
+                        let method = GetDroppedPacketsMetricsSvc(inner);
+                        let codec = tonic_prost::ProstCodec::default();
+                        let mut grpc = tonic::server::Grpc::new(codec)
+                            .apply_compression_config(
+                                accept_compression_encodings,
+                                send_compression_encodings,
+                            )
+                            .apply_max_message_size_config(
+                                max_decoding_message_size,
+                                max_encoding_message_size,
+                            );
+                        let res = grpc.unary(method, req).await;
+                        Ok(res)
+                    };
+                    Box::pin(fut)
+                }
                 _ => {
                     Box::pin(async move {
                         let mut response = http::Response::new(
diff --git a/core/api/src/api.rs b/core/api/src/api.rs
index 51ff64d..27641b4 100644
--- a/core/api/src/api.rs
+++ b/core/api/src/api.rs
@@ -1,116 +1,173 @@
 #![allow(warnings)]
 use anyhow::Context;
 use chrono::Local;
+use cortexbrain_common::{
+    formatters::{format_ipv4, format_ipv6},
+};
 use prost::bytes::BytesMut;
-use std::str::FromStr;
-use std::{ sync::Mutex };
-use tonic::{ Request, Response, Status };
+use std::{str::FromStr, sync::Arc};
+use std::sync::Mutex;
+use tonic::{Request, Response, Status};
 use tracing::info;
-use aya::{ maps::{ MapData, PerfEventArray }, util::online_cpus };
+use aya::{
+    maps::{MapData, PerfEventArray},
+    util::online_cpus,
+};
 use std::result::Result::Ok;
 use tonic::async_trait;
-use std::collections::HashMap;
 use aya::maps::HashMap as ayaHashMap;
+use std::collections::HashMap;
 use tokio::sync::mpsc;
 use tokio::task;
-use crate::agent::ConnectionEvent;
+use crate::{
+    agent::{
+        ConnectionEvent, DroppedPacketMetric, DroppedPacketsResponse,
+        LatencyMetric, LatencyMetricsResponse,
+    },
+};
+
+use crate::structs::{NetworkMetrics, PacketLog, TimeStampMetrics};
+
 // * contains agent api configuration
 use crate::agent::{
-    agent_server::Agent,
-    ActiveConnectionResponse,
-    RequestActiveConnections,
-    AddIpToBlocklistRequest,
-    BlocklistResponse,
-    RmIpFromBlocklistRequest,
-    RmIpFromBlocklistResponse,
+    agent_server::Agent, ActiveConnectionResponse, AddIpToBlocklistRequest, BlocklistResponse,
+    RequestActiveConnections, RmIpFromBlocklistRequest, RmIpFromBlocklistResponse,
 };
+use crate::constants::PIN_BLOCKLIST_MAP_PATH;
+
+use crate::helpers::comm_to_string;
 use aya::maps::Map;
-use bytemuck_derive::Zeroable;
+use cortexbrain_common::constants::BPF_PATH;
 use cortexflow_identity::enums::IpProtocols;
 use std::net::Ipv4Addr;
 use tracing::warn;
 
-#[repr(C)]
-#[derive(Clone, Copy, Zeroable)]
-pub struct PacketLog {
-    pub proto: u8,
-    pub src_ip: u32,
-    pub src_port: u16,
-    pub dst_ip: u32,
-    pub dst_port: u16,
-    pub pid: u32,
-}
-unsafe impl aya::Pod for PacketLog {}
-
 pub struct AgentApi {
-    //* event_rx is an istance of a mpsc receiver.
-    //* is used to receive the data from the transmitter (tx)
-    event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
-    event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
+    //* each *_rx field is an instance of an mpsc receiver.
+    //* It is used to receive the data from the matching transmitter (tx)
+    active_connection_event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
+    active_connection_event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
+    latency_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<LatencyMetric>, Status>>>,
+    latency_metrics_tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
+    dropped_packet_metrics_rx: Mutex<mpsc::Receiver<Result<Vec<DroppedPacketMetric>, Status>>>,
+    dropped_packet_metrics_tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
 }
 
-//* Event sender trait. Takes an event from a map and send that to the mpsc channel
-//* using the send_map function
+//* Event sender trait. Takes events from a map and sends them to the mpsc channel
+//* using the send_*_map functions
 #[async_trait]
 pub trait EventSender: Send + Sync + 'static {
-    async fn send_event(&self, event: Vec<ConnectionEvent>);
-    async fn send_map(
+    async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>);
+    async fn send_active_connection_event_map(
         &self,
         map: Vec<ConnectionEvent>,
-        tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>
+        tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
     ) {
         let status = Status::new(tonic::Code::Ok, "success");
         let event = Ok(map);
         let _ = tx.send(event).await;
     }
+
+    async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>);
+    async fn send_latency_metrics_event_map(
+        &self,
+        map: Vec<LatencyMetric>,
+        tx: mpsc::Sender<Result<Vec<LatencyMetric>, Status>>,
+    ) {
+        let status = Status::new(tonic::Code::Ok, "success");
+        let event = Ok(map);
+        let _ = tx.send(event).await;
+    }
+
+    async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>);
+    async fn send_dropped_packet_metrics_event_map(
+        &self,
+        map: Vec<DroppedPacketMetric>,
+        tx: mpsc::Sender<Result<Vec<DroppedPacketMetric>, Status>>,
+    ) {
+        let status = Status::new(tonic::Code::Ok, "success");
+        let event = Ok(map);
+        let _ = tx.send(event).await;
+    }
 }
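+// Note: the send_*_map helpers ignore send errors on purpose; if the receiver
+// side is gone the event is simply dropped instead of crashing the reader.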
 
-// send event function. takes an HashMap and send that using mpsc event_tx
+// send event functions: each takes a Vec of events and forwards it over the matching mpsc channel
 #[async_trait]
 impl EventSender for AgentApi {
-    async fn send_event(&self, event: Vec<ConnectionEvent>) {
-        self.send_map(event, self.event_tx.clone()).await;
+    async fn send_active_connection_event(&self, event: Vec<ConnectionEvent>) {
+        self.send_active_connection_event_map(event, self.active_connection_event_tx.clone())
+            .await;
+    }
+
+    async fn send_latency_metrics_event(&self, event: Vec<LatencyMetric>) {
+        self.send_latency_metrics_event_map(event, self.latency_metrics_tx.clone())
+            .await;
     }
-}
 
-const BPF_PATH: &str = "BPF_PATH";
-const PIN_BLOCKLIST_MAP_PATH: &str = "PIN_BLOCKLIST_MAP_PATH";
+    async fn send_dropped_packet_metrics_event(&self, event: Vec<DroppedPacketMetric>) {
+        self.send_dropped_packet_metrics_event_map(event, self.dropped_packet_metrics_tx.clone())
+            .await;
+    }
+}
 
-//initialize a default trait for AgentApi. Loads a name and a bpf istance.
-//this trait is essential for init the Agent.
+//initialize a Default impl for AgentApi. Loads the pinned bpf maps and spawns the reader tasks.
+//this impl is essential for initializing the Agent.
 impl Default for AgentApi {
     //TODO:this part needs a better error handling
     fn default() -> Self {
-        // load maps mapdata
-        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map").expect(
-            "cannot open events_map Mapdata"
-        );
-        let map = Map::PerfEventArray(mapdata); //creates a PerfEventArray from the mapdata
+        // load connections maps mapdata
+        let active_connection_mapdata = MapData::from_pin("/sys/fs/bpf/maps/events_map")
+            .expect("cannot open events_map Mapdata");
+        let active_connection_map = Map::PerfEventArray(active_connection_mapdata); //creates a PerfEventArray from the mapdata
+
+        let mut active_connection_events_array = PerfEventArray::try_from(active_connection_map)
+            .expect("Error while initializing events array");
+
+        // load network metrics maps mapdata
+        let network_metrics_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/net_metrics")
+            .expect("cannot open net_metrics Mapdata");
+        let network_metrics_map = Map::PerfEventArray(network_metrics_mapdata); //creates a PerfEventArray from the mapdata
+        let mut network_metrics_events_array = PerfEventArray::try_from(network_metrics_map)
+            .expect("Error while initializing network metrics array");
+
+        // load time stamp events maps mapdata
+        let time_stamp_events_mapdata = MapData::from_pin("/sys/fs/bpf/trace_maps/time_stamp_events")
+            .expect("cannot open time_stamp_events Mapdata");
+        let time_stamp_events_map = Map::PerfEventArray(time_stamp_events_mapdata); //creates a PerfEventArray from the mapdata
+        let mut time_stamp_events_array = PerfEventArray::try_from(time_stamp_events_map)
+            .expect("Error while initializing time stamp events array");
 
         //init a mpsc channel
-        let (tx, rx) = mpsc::channel(1024);
+        let (conn_tx, conn_rx) = mpsc::channel(1024);
+        let (lat_tx, lat_rx) = mpsc::channel(2048);
+        let (drop_tx, drop_rx) = mpsc::channel(2048);
+
         let api = AgentApi {
-            event_rx: rx.into(),
-            event_tx: tx.clone(),
+            active_connection_event_rx: conn_rx.into(),
+            active_connection_event_tx: conn_tx.clone(),
+            latency_metrics_rx: Mutex::new(lat_rx),
+            latency_metrics_tx: lat_tx.clone(),
+            dropped_packet_metrics_rx: Mutex::new(drop_rx),
+            dropped_packet_metrics_tx: drop_tx.clone(),
         };
-        let mut events_array = PerfEventArray::try_from(map).expect(
-            "Error while initializing events array"
-        );
 
-        //spawn an event reader
+        //spawn the event reader tasks
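+        // Each task below drains one pinned perf event array and forwards the
+        // decoded events to its mpsc channel; the gRPC handlers pull from
+        // these channels on demand.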
         task::spawn(async move {
             let mut net_events_buffer = Vec::new();
             //scan the cpus to read the data
             for cpu_id in online_cpus()
                 .map_err(|e| anyhow::anyhow!("Error {:?}", e))
-                .unwrap() {
-                let buf = events_array
+                .unwrap()
+            {
+                let buf = active_connection_events_array
                     .open(cpu_id, None)
                     .expect("Error during the creation of net_events_buf structure");
@@ -129,9 +186,8 @@ impl Default for AgentApi {
                         for i in 0..events.read {
                             let data = &buffers[i];
                             if data.len() >= std::mem::size_of::<PacketLog>() {
-                                let pl: PacketLog = unsafe {
-                                    std::ptr::read(data.as_ptr() as *const _)
-                                };
+                                let pl: PacketLog =
+                                    unsafe { std::ptr::read(data.as_ptr() as *const _) };
                                 let src = Ipv4Addr::from(u32::from_be(pl.src_ip));
                                 let dst = Ipv4Addr::from(u32::from_be(pl.dst_ip));
                                 let src_port = u16::from_be(pl.src_port as u16);
@@ -162,13 +218,12 @@ impl Default for AgentApi {
                                             dst_ip_port: format!("{}:{}", dst, dst_port),
                                         });
                                         info!("sending events to the MPSC channel");
-                                        let _ = tx.send(Ok(evt)).await;
+                                        let _ = conn_tx.send(Ok(evt)).await;
                                     }
                                     Err(_) => {
                                         info!(
                                             "Event Id: {} Protocol: Unknown ({})",
-                                            event_id,
-                                            pl.proto
+                                            event_id, pl.proto
                                         );
                                     }
                                 };
@@ -194,6 +249,165 @@ impl Default for AgentApi {
             }
         });
 
+        task::spawn(async move {
+            let mut net_metrics_buffer = Vec::new();
+
+            //scan the cpus to read the data
+            for cpu_id in online_cpus()
+                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
+                .unwrap()
+            {
+                let buf = network_metrics_events_array
+                    .open(cpu_id, None)
+                    .expect("Error during the creation of net_metrics_buf structure");
+
+                let buffers = vec![BytesMut::with_capacity(4096); 8];
+                net_metrics_buffer.push((buf, buffers));
+            }
+
+            info!("Starting network metrics listener");
+
+            //send the data through a mpsc channel
+            loop {
+                for (buf, buffers) in net_metrics_buffer.iter_mut() {
+                    match buf.read_events(buffers) {
+                        Ok(events) => {
+                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
+                            if events.read > 0 {
+                                for i in 0..events.read {
+                                    let data = &buffers[i];
+                                    if data.len() >= std::mem::size_of::<NetworkMetrics>() {
+                                        let nm: NetworkMetrics =
+                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
+
+                                        let dropped_packet_metrics = DroppedPacketMetric {
+                                            tgid: nm.tgid,
+                                            process_name: comm_to_string(&nm.comm),
+                                            sk_drops: nm.sk_drops,
+                                            sk_err: nm.sk_err,
+                                            sk_err_soft: nm.sk_err_soft,
+                                            sk_backlog_len: nm.sk_backlog_len as u32,
+                                            sk_wmem_queued: nm.sk_write_memory_queued,
+                                            sk_rcvbuf: nm.sk_receive_buffer_size,
+                                            sk_ack_backlog: nm.sk_ack_backlog,
+                                            timestamp_us: nm.ts_us,
+                                        };
+
+                                        if dropped_packet_metrics.sk_drops > 0 {
+                                            let mut evt = Vec::new();
+                                            info!(
+                                                "Dropped Packet Metric - tgid: {}, process_name: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_wmem_queued: {}, sk_rcvbuf: {}, sk_ack_backlog: {}, timestamp_us: {}",
+                                                dropped_packet_metrics.tgid,
+                                                dropped_packet_metrics.process_name,
+                                                dropped_packet_metrics.sk_drops,
+                                                dropped_packet_metrics.sk_err,
+                                                dropped_packet_metrics.sk_err_soft,
+                                                dropped_packet_metrics.sk_backlog_len,
+                                                dropped_packet_metrics.sk_wmem_queued,
+                                                dropped_packet_metrics.sk_rcvbuf,
+                                                dropped_packet_metrics.sk_ack_backlog,
+                                                dropped_packet_metrics.timestamp_us
+                                            );
+                                            evt.push(dropped_packet_metrics.clone());
+                                            let _ = drop_tx.send(Ok(evt)).await;
+                                        }
+                                    } else {
+                                        warn!(
+                                            "Received network metrics data too small: {} bytes",
+                                            data.len()
+                                        );
+                                    }
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            eprintln!("Error while reading network metrics events: {}", e);
+                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+                        }
+                    }
+                }
+                // small delay to avoid cpu congestion
+                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+            }
+        });
+
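+        // Note: the reader above only forwards samples with sk_drops > 0;
+        // samples without drops are decoded and discarded.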
+        task::spawn(async move {
+            let mut ts_events_buffer = Vec::new();
+            //scan the cpus to read the data
+            for cpu_id in online_cpus()
+                .map_err(|e| anyhow::anyhow!("Error {:?}", e))
+                .unwrap()
+            {
+                let buf = time_stamp_events_array
+                    .open(cpu_id, None)
+                    .expect("Error during the creation of time stamp events buf structure");
+
+                let buffers = vec![BytesMut::with_capacity(4096); 8];
+                ts_events_buffer.push((buf, buffers));
+            }
+
+            info!("Starting time stamp events listener");
+
+            //send the data through a mpsc channel
+            loop {
+                for (buf, buffers) in ts_events_buffer.iter_mut() {
+                    match buf.read_events(buffers) {
+                        Ok(events) => {
+                            //read the events, this function is similar to the one used in identity/helpers.rs/display_events
+                            if events.read > 0 {
+                                for i in 0..events.read {
+                                    let data = &buffers[i];
+                                    if data.len() >= std::mem::size_of::<TimeStampMetrics>() {
+                                        let tsm: TimeStampMetrics =
+                                            unsafe { std::ptr::read(data.as_ptr() as *const _) };
+                                        let latency_metric = LatencyMetric {
+                                            delta_us: tsm.delta_us,
+                                            timestamp_us: tsm.ts_us,
+                                            tgid: tsm.tgid,
+                                            process_name: comm_to_string(&tsm.comm),
+                                            local_port: tsm.lport as u32,
+                                            remote_port: tsm.dport_be as u32,
+                                            address_family: tsm.af as u32,
+                                            src_address_v4: format_ipv4(tsm.saddr_v4),
+                                            dst_address_v4: format_ipv4(tsm.daddr_v4),
+                                            src_address_v6: format_ipv6(&tsm.saddr_v6),
+                                            dst_address_v6: format_ipv6(&tsm.daddr_v6),
+                                        };
+                                        info!(
+                                            "Latency Metric - tgid: {}, process_name: {}, delta_us: {}, timestamp_us: {}, local_port: {}, remote_port: {}, address_family: {}, src_address_v4: {}, dst_address_v4: {}, src_address_v6: {}, dst_address_v6: {}",
+                                            latency_metric.tgid,
+                                            latency_metric.process_name,
+                                            latency_metric.delta_us,
+                                            latency_metric.timestamp_us,
+                                            latency_metric.local_port,
+                                            latency_metric.remote_port,
+                                            latency_metric.address_family,
+                                            latency_metric.src_address_v4,
+                                            latency_metric.dst_address_v4,
+                                            latency_metric.src_address_v6,
+                                            latency_metric.dst_address_v6
+                                        );
+                                        let mut evt = Vec::new();
+                                        evt.push(latency_metric.clone());
+                                        let _ = lat_tx.send(Ok(evt)).await;
+                                    } else {
+                                        warn!(
+                                            "Received time stamp metrics data too small: {} bytes",
+                                            data.len()
+                                        );
+                                    }
+                                }
+                            }
+                        }
+                        Err(e) => {
+                            eprintln!("Error while reading time stamp events: {}", e);
+                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+                        }
+                    }
+                }
+                // small delay to avoid cpu congestion (added to match the
+                // metrics loop above and prevent a busy loop)
+                tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
+            }
+        });
+
         api
     }
 }
@@ -206,7 +420,7 @@ impl Agent for AgentApi {
     // * active connections.
 The data are transformed and sent to the api with a mpsc channel
     async fn active_connections(
         &self,
-        request: Request<RequestActiveConnections>
+        request: Request<RequestActiveConnections>,
     ) -> Result<Response<ActiveConnectionResponse>, Status> {
         //read request
         let req = request.into_inner();
@@ -215,40 +429,39 @@ impl Agent for AgentApi {
         let mut aggregated_events: Vec<ConnectionEvent> = Vec::new();
 
         //aggregate events
-        while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
+        while let Ok(evt) = self.active_connection_event_rx.lock().unwrap().try_recv() {
             if let Ok(vec) = evt {
                 aggregated_events.extend(vec);
             }
         }
 
         //log response for debugging
-        info!("DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}", aggregated_events);
+        info!(
+            "DEBUGGING RESPONSE FROM ACTIVE CONNECTION REQUEST: {:?}",
+            aggregated_events
+        );
 
         //return response
-        Ok(
-            Response::new(ActiveConnectionResponse {
-                status: "success".to_string(),
-                events: aggregated_events,
-            })
-        )
+        Ok(Response::new(ActiveConnectionResponse {
+            status: "success".to_string(),
+            events: aggregated_events,
+        }))
     }
 
-    // * creates and add ip to the blocklist
+    // * creates and adds an ip to the blocklist
     async fn add_ip_to_blocklist(
         &self,
-        request: Request<AddIpToBlocklistRequest>
+        request: Request<AddIpToBlocklistRequest>,
     ) -> Result<Response<BlocklistResponse>, Status> {
         //read request
         let req = request.into_inner();
 
         //open blocklist map
-        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect(
-            "cannot open blocklist_map Mapdata"
-        );
+        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
+            .expect("cannot open blocklist_map Mapdata");
         let blocklist_mapdata = Map::HashMap(mapdata);
 
         //load mapdata
-        let mut blocklist_map: ayaHashMap<_, [u8; 4], [u8; 4]> = ayaHashMap
-            ::try_from(blocklist_mapdata)
-            .unwrap();
+        let mut blocklist_map: ayaHashMap<_, [u8; 4], [u8; 4]> =
+            ayaHashMap::try_from(blocklist_mapdata).unwrap();
 
         if req.ip.is_none() {
             // log blocklist event
@@ -265,8 +478,7 @@ impl Agent for AgentApi {
             blocklist_map.insert(u8_4_ip, u8_4_ip, 0);
             info!("CURRENT BLOCKLIST: {:?}", blocklist_map);
         }
-        let path = std::env
-            ::var(PIN_BLOCKLIST_MAP_PATH)
+        let path = std::env::var(PIN_BLOCKLIST_MAP_PATH)
             .context("Blocklist map path not found!")
             .unwrap();
 
@@ -281,28 +493,24 @@ impl Agent for AgentApi {
         }
 
         //save ip into the blocklist
-        Ok(
-            Response::new(BlocklistResponse {
-                status: "success".to_string(),
-                events: converted_blocklist_map,
-            })
-        )
+        Ok(Response::new(BlocklistResponse {
+            status: "success".to_string(),
+            events: converted_blocklist_map,
+        }))
     }
 
     async fn check_blocklist(
         &self,
-        request: Request<()>
+        request: Request<()>,
     ) -> Result<Response<BlocklistResponse>, Status> {
         info!("Returning blocklist hashmap");
         //open blocklist map
-        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect(
-            "cannot open blocklist_map Mapdata"
-        );
+        let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map")
+            .expect("cannot open blocklist_map Mapdata");
         let blocklist_mapdata = Map::HashMap(mapdata);
 
         //load mapdata
-        let blocklist_map: ayaHashMap<_, [u8; 4], [u8; 4]> = ayaHashMap
-            ::try_from(blocklist_mapdata)
-            .unwrap();
+        let blocklist_map: ayaHashMap<_, [u8; 4], [u8; 4]> =
+            ayaHashMap::try_from(blocklist_mapdata).unwrap();
 
         //convert the maps with a buffer to match the protobuffer types
@@ -314,28 +522,24 @@ impl Agent for AgentApi {
             let value = Ipv4Addr::from(k).to_string();
             converted_blocklist_map.insert(key, value);
         }
-        Ok(
-            Response::new(BlocklistResponse {
-                status: "success".to_string(),
-                events: converted_blocklist_map,
-            })
-        )
+        Ok(Response::new(BlocklistResponse {
+            status: "success".to_string(),
+            events: converted_blocklist_map,
+        }))
     }
 
     async fn rm_ip_from_blocklist(
         &self,
-        request: Request<RmIpFromBlocklistRequest>
+        request: Request<RmIpFromBlocklistRequest>,
     ) -> Result<Response<RmIpFromBlocklistResponse>, Status> {
         //read request
         let req = request.into_inner();
info!("Removing ip from blocklist map"); //open blocklist map - let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map").expect( - "cannot open blocklist_map Mapdata" - ); + let mapdata = MapData::from_pin("/sys/fs/bpf/maps/blocklist_map") + .expect("cannot open blocklist_map Mapdata"); let blocklist_mapdata = Map::HashMap(mapdata); //load mapdata - let mut blocklist_map: ayaHashMap = ayaHashMap - ::try_from(blocklist_mapdata) - .unwrap(); + let mut blocklist_map: ayaHashMap = + ayaHashMap::try_from(blocklist_mapdata).unwrap(); //remove the address let ip_to_remove = req.ip; let u8_4_ip_to_remove = Ipv4Addr::from_str(&ip_to_remove).unwrap().octets(); @@ -351,11 +555,108 @@ impl Agent for AgentApi { converted_blocklist_map.insert(key, value); } - Ok( - Response::new(RmIpFromBlocklistResponse { - status: "Ip removed from blocklist".to_string(), - events: converted_blocklist_map, - }) - ) + Ok(Response::new(RmIpFromBlocklistResponse { + status: "Ip removed from blocklist".to_string(), + events: converted_blocklist_map, + })) + } + + async fn get_latency_metrics( + &self, + request: Request<()>, + ) -> Result, Status> { + // Extract the request parameters + let req = request.into_inner(); + info!("Getting latency metrics"); + + // Here you would typically query your data source for the latency metrics + // For demonstration purposes, we'll return a dummy response + + let mut aggregated_latency_metrics_events: Vec = Vec::new(); + + while let Ok(evt) = self.latency_metrics_rx.lock().unwrap().try_recv() { + if let Ok(vec) = evt { + aggregated_latency_metrics_events.extend(vec); + } + } + + let total_count = aggregated_latency_metrics_events.len() as u32; + + let (average_latency_us, min_latency_us, max_latency_us) = + if !aggregated_latency_metrics_events.is_empty() { + let sum: u64 = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .sum(); + let avg = sum as f64 / aggregated_latency_metrics_events.len() as f64; + + let min = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .min() + .unwrap_or(0) as f64; + + let max = aggregated_latency_metrics_events + .iter() + .map(|m| m.delta_us) + .max() + .unwrap_or(0) as f64; + + (avg, min, max) + } else { + (0.0, 0.0, 0.0) + }; + + info!( + "Latency metrics - total_count: {}, average: {:.2}us, min: {:.2}us, max: {:.2}us", + total_count, average_latency_us, min_latency_us, max_latency_us + ); + + let response = LatencyMetricsResponse { + status: "success".to_string(), + metrics: aggregated_latency_metrics_events, + total_count, + average_latency_us, + max_latency_us, + min_latency_us, + }; + + Ok(Response::new(response)) + } + + async fn get_dropped_packets_metrics( + &self, + request: Request<()>, + ) -> Result, Status> { + // Extract the request parameters + let req = request.into_inner(); + info!("Getting dropped packets metrics"); + + let mut aggregated_dropped_packet_metrics: Vec = Vec::new(); + let mut total_drops = 0u32; + + // Collect all metrics from channel + while let Ok(evt) = self.dropped_packet_metrics_rx.lock().unwrap().try_recv() { + if let Ok(vec) = evt { + for metric in vec { + total_drops += metric.sk_drops as u32; + aggregated_dropped_packet_metrics.push(metric); + } + } + } + + info!( + "Dropped packets metrics - total_metrics: {}, total_drops: {}", + aggregated_dropped_packet_metrics.len(), + total_drops + ); + + let response = DroppedPacketsResponse { + status: "success".to_string(), + metrics: aggregated_dropped_packet_metrics, + total_drops, + }; + + Ok(Response::new(response)) } } 
diff --git a/core/api/src/constants.rs b/core/api/src/constants.rs
new file mode 100644
index 0000000..b23bb8d
--- /dev/null
+++ b/core/api/src/constants.rs
@@ -0,0 +1,2 @@
+pub const PIN_BLOCKLIST_MAP_PATH: &str = "PIN_BLOCKLIST_MAP_PATH";
+pub const TASK_COMM_LEN: usize = 16;
diff --git a/core/api/src/helpers.rs b/core/api/src/helpers.rs
new file mode 100644
index 0000000..c060949
--- /dev/null
+++ b/core/api/src/helpers.rs
@@ -0,0 +1,6 @@
+use crate::constants::TASK_COMM_LEN;
+
+pub fn comm_to_string(comm: &[u8; TASK_COMM_LEN]) -> String {
+    let end = comm.iter().position(|&c| c == 0).unwrap_or(comm.len());
+    String::from_utf8_lossy(&comm[..end]).to_string()
+}
diff --git a/core/api/src/lib.rs b/core/api/src/lib.rs
index 0b13fb6..03ecd68 100644
--- a/core/api/src/lib.rs
+++ b/core/api/src/lib.rs
@@ -1,4 +1,7 @@
 pub mod api;
 pub mod agent;
 pub mod client;
-pub mod requests;
\ No newline at end of file
+pub mod requests;
+pub mod structs;
+pub mod constants;
+pub mod helpers;
diff --git a/core/api/src/main.rs b/core/api/src/main.rs
index 4410458..0843684 100644
--- a/core/api/src/main.rs
+++ b/core/api/src/main.rs
@@ -4,6 +4,9 @@ use cortexbrain_common::logger;
 
 mod agent;
 mod api;
+mod structs;
+mod constants;
+mod helpers;
 
 mod agent_proto {
     use tonic::include_file_descriptor_set;
diff --git a/core/api/src/mod.rs b/core/api/src/mod.rs
deleted file mode 100644
index e5b9c8d..0000000
--- a/core/api/src/mod.rs
+++ /dev/null
@@ -1,3 +0,0 @@
-pub mod api;
-pub mod agent;
-pub mod requests;
\ No newline at end of file
diff --git a/core/api/src/requests.rs b/core/api/src/requests.rs
index e9fb2b1..a518f4a 100644
--- a/core/api/src/requests.rs
+++ b/core/api/src/requests.rs
@@ -15,6 +15,8 @@ use crate::agent::BlocklistResponse;
 use crate::agent::AddIpToBlocklistRequest;
 use crate::agent::RmIpFromBlocklistRequest;
 use crate::agent::RmIpFromBlocklistResponse;
+use crate::agent::DroppedPacketsResponse;
+use crate::agent::LatencyMetricsResponse;
 
 #[cfg(feature = "client")]
 pub async fn send_active_connection_request(
@@ -68,3 +70,25 @@ pub async fn remove_ip_from_blocklist_request(
     let response = client.rm_ip_from_blocklist(request).await?;
     Ok(response)
 }
+
+#[cfg(feature = "client")]
+pub async fn send_dropped_packets_request(
+    mut client: AgentClient<Channel>,
+) -> Result<Response<DroppedPacketsResponse>, Error> {
+    let request = Request::new(());
+    let response = client.get_dropped_packets_metrics(request).await?;
+    Ok(response)
+}
+
+#[cfg(feature = "client")]
+pub async fn send_latency_metrics_request(
+    mut client: AgentClient<Channel>,
+) -> Result<Response<LatencyMetricsResponse>, Error> {
+    let request = Request::new(());
+    let response = client.get_latency_metrics(request).await?;
+    Ok(response)
+}
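+
+// Example (sketch, assuming an already-connected `AgentClient`):
+//
+//     let resp = send_latency_metrics_request(client).await?.into_inner();
+//     println!("avg latency: {:.2}us over {} samples",
+//         resp.average_latency_us, resp.total_count);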
diff --git a/core/api/src/structs.rs b/core/api/src/structs.rs
new file mode 100644
index 0000000..b15fa22
--- /dev/null
+++ b/core/api/src/structs.rs
@@ -0,0 +1,48 @@
+use bytemuck::Zeroable;
+use crate::constants::TASK_COMM_LEN;
+
+#[repr(C)]
+#[derive(Clone, Copy, Zeroable)]
+pub struct PacketLog {
+    pub proto: u8,
+    pub src_ip: u32,
+    pub src_port: u16,
+    pub dst_ip: u32,
+    pub dst_port: u16,
+    pub pid: u32,
+}
+unsafe impl aya::Pod for PacketLog {}
+
+#[repr(C, packed)]
+#[derive(Clone, Copy, Zeroable)]
+pub struct NetworkMetrics {
+    pub tgid: u32,
+    pub comm: [u8; TASK_COMM_LEN],
+    pub ts_us: u64,
+    pub sk_err: i32,
+    pub sk_err_soft: i32,
+    pub sk_backlog_len: i32,
+    pub sk_write_memory_queued: i32,
+    pub sk_receive_buffer_size: i32,
+    pub sk_ack_backlog: u32,
+    pub sk_drops: i32,
+}
+unsafe impl aya::Pod for NetworkMetrics {}
+
+#[repr(C)]
+#[derive(Clone, Copy, Zeroable)]
+pub struct TimeStampMetrics {
+    pub delta_us: u64,
+    pub ts_us: u64,
+    pub tgid: u32,
+    pub comm: [u8; TASK_COMM_LEN],
+    pub lport: u16,
+    pub dport_be: u16,
+    pub af: u16,
+    pub saddr_v4: u32,
+    pub daddr_v4: u32,
+    pub saddr_v6: [u32; 4],
+    pub daddr_v6: [u32; 4],
+}
+unsafe impl aya::Pod for TimeStampMetrics {}
diff --git a/core/common/src/formatters.rs b/core/common/src/formatters.rs
new file mode 100644
index 0000000..ebf4306
--- /dev/null
+++ b/core/common/src/formatters.rs
@@ -0,0 +1,15 @@
+use std::net::Ipv4Addr;
+
+pub fn format_ipv4(ip: u32) -> String {
+    Ipv4Addr::from(u32::from_be(ip)).to_string()
+}
+
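+// NOTE: this assumes each u32 word already holds the IPv6 groups in
+// big-endian order; if the words come straight from the kernel on a
+// little-endian host, a u32::from_be conversion may be needed first.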
+pub fn format_ipv6(ip: &[u32; 4]) -> String {
+    format!(
+        "{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}",
+        (ip[0] >> 16) & 0xFFFF, ip[0] & 0xFFFF,
+        (ip[1] >> 16) & 0xFFFF, ip[1] & 0xFFFF,
+        (ip[2] >> 16) & 0xFFFF, ip[2] & 0xFFFF,
+        (ip[3] >> 16) & 0xFFFF, ip[3] & 0xFFFF
+    )
+}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index c5d4373..f8fadc6 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -1,2 +1,3 @@
 pub mod constants;
 pub mod logger;
+pub mod formatters;
\ No newline at end of file
diff --git a/core/src/components/identity/src/main.rs b/core/src/components/identity/src/main.rs
index 9804256..9ee00e9 100644
--- a/core/src/components/identity/src/main.rs
+++ b/core/src/components/identity/src/main.rs
@@ -59,9 +59,9 @@ async fn main() -> Result<(), anyhow::Error> {
     match init_bpf_maps(bpf.clone()) {
         std::result::Result::Ok(mut bpf_maps) => {
             info!("Successfully loaded bpf maps");
-
-            //TODO: save the bpf maps in a Vec instead of using a tuple
-            match map_pinner(&bpf_maps, &bpf_map_save_path.into()).await {
+            let pin_path = std::path::PathBuf::from(&bpf_map_save_path);
+            info!("About to call map_pinner with path: {:?}", pin_path);
+            match map_pinner(&bpf_maps, &pin_path).await {
                 std::result::Result::Ok(_) => {
                     info!("maps pinned successfully");
                     //load veth_trace program ref veth_trace.rs
diff --git a/core/src/components/identity/src/map_handlers.rs b/core/src/components/identity/src/map_handlers.rs
index 49697b7..6f2e818 100644
--- a/core/src/components/identity/src/map_handlers.rs
+++ b/core/src/components/identity/src/map_handlers.rs
@@ -1,6 +1,6 @@
 use anyhow::Error;
 use anyhow::Ok;
-use aya::Bpf;
+use aya::Ebpf;
 use aya::maps::HashMap;
 use aya::maps::Map;
 use k8s_openapi::api::core::v1::ConfigMap;
@@ -13,7 +13,7 @@ use std::sync::Mutex;
 use tokio::fs;
 use tracing::{error, info};
 
-pub fn init_bpf_maps(bpf: Arc<Mutex<Bpf>>) -> Result<(Map, Map, Map), anyhow::Error> {
+pub fn init_bpf_maps(bpf: Arc<Mutex<Ebpf>>) -> Result<(Map, Map, Map), anyhow::Error> {
     // this function init the bpfs maps used in the main program
     /*
     index 0: events_map
diff --git a/core/src/components/metrics/src/helpers.rs b/core/src/components/metrics/src/helpers.rs
index 8fa552d..1b4628e 100644
--- a/core/src/components/metrics/src/helpers.rs
+++ b/core/src/components/metrics/src/helpers.rs
@@ -34,6 +34,9 @@ pub async fn display_metrics_map(
             if data.len() >= std::mem::size_of::<NetworkMetrics>() {
                 let net_metrics: NetworkMetrics =
                     unsafe { std::ptr::read_unaligned(data.as_ptr() as *const _) };
+                let tgid = net_metrics.tgid;
+                let comm = String::from_utf8_lossy(&net_metrics.comm);
+                let ts_us = net_metrics.ts_us;
                 let sk_drop_count = net_metrics.sk_drops;
                 let sk_err = net_metrics.sk_err;
                 let sk_err_soft = net_metrics.sk_err_soft;
@@ -42,8 +45,8 @@ pub async fn display_metrics_map(
                 let sk_ack_backlog = net_metrics.sk_ack_backlog;
                 let sk_receive_buffer_size = net_metrics.sk_receive_buffer_size;
                 info!(
-                    "sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}",
-                    sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size
+                    "tgid: {}, comm: {}, ts_us: {}, sk_drops: {}, sk_err: {}, sk_err_soft: {}, sk_backlog_len: {}, sk_write_memory_queued: {}, sk_ack_backlog: {}, sk_receive_buffer_size: {}",
+                    tgid, comm, ts_us, sk_drop_count, sk_err, sk_err_soft, sk_backlog_len, sk_write_memory_queued, sk_ack_backlog, sk_receive_buffer_size
                 );
             } else {
                 info!("Received data too small: {} bytes, expected: {}", data.len(), std::mem::size_of::<NetworkMetrics>());
diff --git a/core/src/components/metrics/src/main.rs b/core/src/components/metrics/src/main.rs
index 2b0f2e9..6b22a86 100644
--- a/core/src/components/metrics/src/main.rs
+++ b/core/src/components/metrics/src/main.rs
@@ -47,7 +47,9 @@ async fn main() -> Result<(), anyhow::Error> {
     match init_ebpf_maps(bpf.clone()) {
         std::result::Result::Ok(maps) => {
             info!("BPF maps loaded successfully");
-            match map_pinner(&maps, &bpf_map_save_path.clone().into()).await {
+            let pin_path = std::path::PathBuf::from(&bpf_map_save_path);
+            info!("About to call map_pinner with path: {:?}", pin_path);
+            match map_pinner(&maps, &pin_path).await {
                 std::result::Result::Ok(_) => {
                     info!("BPF maps pinned successfully to {}", bpf_map_save_path);
diff --git a/core/src/components/metrics/src/structs.rs b/core/src/components/metrics/src/structs.rs
index 75b3d9c..dc63ace 100644
--- a/core/src/components/metrics/src/structs.rs
+++ b/core/src/components/metrics/src/structs.rs
@@ -1,8 +1,12 @@
+
 pub const TASK_COMM_LEN: usize = 16; // linux/sched.h
 
-#[repr(C)]
+#[repr(C, packed)]
 #[derive(Clone, Copy)]
 pub struct NetworkMetrics {
+    pub tgid: u32,
+    pub comm: [u8; TASK_COMM_LEN],
+    pub ts_us: u64,
     pub sk_err: i32, // Offset 284
     pub sk_err_soft: i32, // Offset 600
     pub sk_backlog_len: i32, // Offset 196
diff --git a/core/src/components/metrics_tracer/src/data_structures.rs b/core/src/components/metrics_tracer/src/data_structures.rs
index 9e89bbd..f6d7afe 100644
--- a/core/src/components/metrics_tracer/src/data_structures.rs
+++ b/core/src/components/metrics_tracer/src/data_structures.rs
@@ -4,6 +4,9 @@ pub const TASK_COMM_LEN: usize = 16;
 
 pub struct NetworkMetrics {
+    // NOTE: this layout must stay byte-for-byte in sync with the userspace
+    // mirrors in core/api/src/structs.rs and core/src/components/metrics/src/structs.rs
+    // (declared #[repr(C, packed)] there).
+    pub tgid: u32,
+    pub comm: [u8; TASK_COMM_LEN],
+    pub ts_us: u64,
     pub sk_err: i32, // Offset 284
     pub sk_err_soft: i32, // Offset 600
     pub sk_backlog_len: i32, // Offset 196
diff --git a/core/src/components/metrics_tracer/src/main.rs b/core/src/components/metrics_tracer/src/main.rs
index a1964f9..2f5e5a1 100644
--- a/core/src/components/metrics_tracer/src/main.rs
+++ b/core/src/components/metrics_tracer/src/main.rs
@@ -37,6 +37,9 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result<u32, i64> {
         return Err(1);
     }
 
+    let tgid = (unsafe { bpf_get_current_pid_tgid() } >> 32) as u32;
+    let comm = unsafe { bpf_get_current_comm() }.map_err(|_| 1i64)?;
+    let ts_us: u64 = unsafe { bpf_ktime_get_ns() } / 1_000;
     let sk_err_offset = 284;
     let sk_err_soft_offset = 600;
     let sk_backlog_len_offset = 196;
@@ -54,13 +57,16 @@ fn try_metrics_tracer(ctx: ProbeContext) -> Result<u32, i64> {
     };
     let sk_drops = unsafe {
         bpf_probe_read_kernel::<i32>(sk_pointer.add(sk_drops_offset) as *const i32).map_err(|_| 1)?
     };
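+    // NOTE: these raw struct sock offsets (284, 600, ...) are tied to one
+    // kernel build and can shift across versions; CO-RE/BTF relocations
+    // would be a more robust follow-up.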
     let net_metrics = NetworkMetrics {
-        sk_err,
-        sk_err_soft,
-        sk_backlog_len,
-        sk_write_memory_queued,
-        sk_receive_buffer_size,
-        sk_ack_backlog,
-        sk_drops,
+        tgid,
+        comm,
+        ts_us,
+        sk_err,
+        sk_err_soft,
+        sk_backlog_len,
+        sk_write_memory_queued,
+        sk_receive_buffer_size,
+        sk_ack_backlog,
+        sk_drops,
     };
     unsafe {