8 changes: 7 additions & 1 deletion cli/src/main.rs
@@ -16,7 +16,7 @@ use tracing::debug;
use crate::essential::{ CliError, info, update_cli };
use crate::install::{ InstallArgs, InstallCommands, install_cortexflow, install_simple_example };
use crate::logs::{ LogsArgs, logs_command };
use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_identity_events };
use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_dropped_packets, monitor_identity_events, monitor_latency_metrics };
use crate::policies::{
PoliciesArgs,
PoliciesCommands,
@@ -109,6 +109,12 @@ async fn args_parser() -> Result<(), CliError> {
MonitorCommands::Connections => {
let _ = monitor_identity_events().await.map_err(|e| eprintln!("{}",e) )?;
}
MonitorCommands::Latencymetrics => {
let _ = monitor_latency_metrics().await.map_err(|e| eprintln!("{}",e) )?;
}
MonitorCommands::Droppedpackets => {
let _ = monitor_dropped_packets().await.map_err(|e| eprintln!("{}",e) )?;
}
}
Some(Commands::Policies(policies_args)) => {
match policies_args.policy_cmd {
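The two new match arms above wire the subcommands into args_parser(), so they should be reachable as "cfcli monitor latencymetrics" and "cfcli monitor droppedpackets" (binary name taken from the "// cfcli monitor <args>" comment in cli/src/monitoring.rs). As a rough sketch of the wiring not shown in this hunk, MonitorArgs presumably exposes the subcommand along these lines (an assumption, not part of the diff):

// Sketch (assumption): how MonitorArgs likely hands the subcommand to the
// match in args_parser(). The real definition lives in cli/src/monitoring.rs
// and is only partially visible in this diff.
use clap::{Args, Subcommand};

#[derive(Args, Debug)]
pub struct MonitorArgs {
    #[command(subcommand)]
    pub monitor_cmd: MonitorCommands,
}

#[derive(Subcommand, Debug)]
pub enum MonitorCommands {
    // Variants abbreviated; see the monitoring.rs diff below for the
    // #[command(name = ..., about = ...)] attributes.
    Connections,
    Latencymetrics,
    Droppedpackets,
}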
125 changes: 125 additions & 0 deletions cli/src/monitoring.rs
@@ -23,6 +23,14 @@ pub enum MonitorCommands {
name = "connections",
about = "Monitor the recent connections detected by the identity service"
)] Connections,
#[command(
name = "latencymetrics",
about = "Monitor the latency metrics detected by the metrics service"
)] Latencymetrics,
#[command(
name = "droppedpackets",
about = "Monitor the dropped packets metrics detected by the metrics service"
)] Droppedpackets,
}

// cfcli monitor <args>
@@ -140,3 +148,120 @@ pub async fn monitor_identity_events() -> Result<(), Error> {

Ok(())
}

pub async fn monitor_latency_metrics() -> Result<(), Error> {
//function to monitor latency metrics
println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white());

match connect_to_client().await {
Ok(client) => {
println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green());
//send request to get latency metrics
match agent_api::requests::send_latency_metrics_request(client).await {
Ok(response) => {
let resp = response.into_inner();
if resp.metrics.is_empty() {
println!("{} No latency metrics found", "=====>".blue().bold());
} else {
println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len());
for (i, metric) in resp.metrics.iter().enumerate() {
println!(
"index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}",
"=====>".blue().bold(),
i,
metric.tgid,
metric.process_name,
metric.address_family,
metric.delta_us,
metric.src_address_v4,
metric.dst_address_v4,
format!("{:?}", metric.src_address_v6),
format!("{:?}", metric.dst_address_v6),
metric.local_port,
metric.remote_port,
metric.timestamp_us
);
}
}
}
Err(e) => {
println!(
"{} {} {} {}",
"=====>".blue().bold(),
"An error occured".red(),
"Error:",
e
);
return Err(e);
}
}
}
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
"Failed to connect to CortexFlow Client".red()
);
return Err(e);
}
}
Ok(())
}

pub async fn monitor_dropped_packets() -> Result<(), Error> {
//function to monitor dropped packets metrics
println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white());

match connect_to_client().await {
Ok(client) => {
println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green());
//send request to get dropped packets metrics
match agent_api::requests::send_dropped_packets_request(client).await {
Ok(response) => {
let resp = response.into_inner();
if resp.metrics.is_empty() {
println!("{} No dropped packets metrics found", "=====>".blue().bold());
} else {
println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len());
for (i, metric) in resp.metrics.iter().enumerate() {
println!(
"{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs",
"=====>".blue().bold(),
i,
metric.tgid,
metric.process_name,
metric.sk_drops,
metric.sk_err,
metric.sk_err_soft,
metric.sk_backlog_len,
metric.sk_wmem_queued,
metric.sk_rcvbuf,
metric.sk_ack_backlog,
metric.timestamp_us
);
}
}
}
Err(e) => {
println!(
"{} {} {} {}",
"=====>".blue().bold(),
"An error occured".red(),
"Error:",
e
);
return Err(e);
}
}
}
Err(e) => {
println!(
"{} {}",
"=====>".blue().bold(),
"Failed to connect to CortexFlow Client".red()
);
return Err(e);
}
}
Ok(())
}
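Both monitor functions depend on agent_api::requests::{send_latency_metrics_request, send_dropped_packets_request}, which are not part of this diff. A minimal sketch of what such a helper could look like, assuming the agent_api crate wraps a tonic client generated from core/api/protos/agent.proto (the module path and client type below are assumptions, not the crate's confirmed API):

// Sketch (assumption): a plausible shape for the helper used by
// monitor_latency_metrics() above. Assumes tonic/prost codegen from
// agent.proto, which by default exposes agent_client::AgentClient and maps
// google.protobuf.Empty to the unit type ().
use tonic::transport::Channel;
use tonic::{Request, Response, Status};

use crate::agent::agent_client::AgentClient;
use crate::agent::LatencyMetricsResponse;

pub async fn send_latency_metrics_request(
    mut client: AgentClient<Channel>,
) -> Result<Response<LatencyMetricsResponse>, Status> {
    // GetLatencyMetrics takes an empty request and returns the metrics batch.
    client.get_latency_metrics(Request::new(())).await
}

send_dropped_packets_request would follow the same pattern against the GetDroppedPacketsMetrics RPC.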
57 changes: 56 additions & 1 deletion core/api/protos/agent.proto
@@ -3,6 +3,8 @@ package agent;

import "google/protobuf/empty.proto";

// Active connections

message RequestActiveConnections{
optional string pod_ip = 2 ;
}
@@ -18,9 +20,57 @@ message ActiveConnectionResponse{
repeated ConnectionEvent events = 2; // List of connection events
}

// Network metrics

// Latency metrics request and response messages

message LatencyMetric {
uint64 delta_us = 1; // Latency in microseconds
uint64 timestamp_us = 2; // Event timestamp
uint32 tgid = 3; // Thread group ID
string process_name = 4; // Process name (comm)
uint32 local_port = 5; // Local port
uint32 remote_port = 6; // Remote port (big-endian)
uint32 address_family = 7; // Address family (numeric, e.g. AF_INET for IPv4, AF_INET6 for IPv6)
string src_address_v4 = 8; // Source IP address
string dst_address_v4 = 9; // Destination IP address
string src_address_v6 = 10; // Source IPv6 address
string dst_address_v6 = 11; // Destination IPv6 address
}

message LatencyMetricsResponse {
string status = 1;
repeated LatencyMetric metrics = 2;
uint32 total_count = 3;
double average_latency_us = 4; // Average latency
double min_latency_us = 5; // Minimum latency
double max_latency_us = 6; // Maximum latency
}

// Dropped TCP Packets

message DroppedPacketMetric {
uint32 tgid = 1; // Thread group ID
string process_name = 2; // Process name
int32 sk_drops = 3; // Socket drops (from sk_drops field)
int32 sk_err = 4; // Socket errors
int32 sk_err_soft = 5; // Soft errors
uint32 sk_backlog_len = 6; // Backlog length (congestion indicator)
int32 sk_wmem_queued = 7; // Write memory queued
int32 sk_rcvbuf = 8; // Receive buffer size
uint32 sk_ack_backlog = 9; // ACK backlog
uint64 timestamp_us = 10; // Event timestamp
}

message DroppedPacketsResponse {
string status = 1;
repeated DroppedPacketMetric metrics = 2;
uint32 total_drops = 3; // Total drops across all connections
}


//declare agent api
service Agent{

// active connections endpoint
rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse);

@@ -31,6 +81,11 @@ service Agent{
// remove ip from blocklist endpoint
rpc RmIpFromBlocklist(RmIpFromBlocklistRequest) returns (RmIpFromBlocklistResponse);

// metrics data
rpc GetLatencyMetrics(google.protobuf.Empty) returns (LatencyMetricsResponse);

// dropped packets
rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse);
}

message AddIpToBlocklistRequest{
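LatencyMetricsResponse carries summary fields (total_count and the average/min/max latencies) alongside the raw samples, and the server-side aggregation is not shown in this diff. A minimal sketch of how the agent could fill those fields from collected samples, using the prost-generated structs implied by the messages above (the summarize_latency helper itself is hypothetical):

// Sketch (assumption): filling the LatencyMetricsResponse summary fields from
// a batch of LatencyMetric samples. Field names follow agent.proto; the
// function and its placement are illustrative only.
fn summarize_latency(metrics: Vec<LatencyMetric>) -> LatencyMetricsResponse {
    let total_count = metrics.len() as u32;
    // Fold once over the samples to get min, max and sum of delta_us.
    let (min, max, sum) = metrics.iter().map(|m| m.delta_us as f64).fold(
        (f64::INFINITY, f64::NEG_INFINITY, 0.0),
        |(min, max, sum), v| (min.min(v), max.max(v), sum + v),
    );
    let has_samples = total_count > 0;
    LatencyMetricsResponse {
        status: "ok".to_string(),
        metrics,
        total_count,
        average_latency_us: if has_samples { sum / total_count as f64 } else { 0.0 },
        min_latency_us: if has_samples { min } else { 0.0 },
        max_latency_us: if has_samples { max } else { 0.0 },
    }
}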