WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit eb10c76

Browse files
siddh34LorenzoTettamanti
authored andcommitted
CortexFlow[#143]: CLI changes & packet loss metrics
1 parent 2ec6378 commit eb10c76

File tree

14 files changed

+320
-342
lines changed

14 files changed

+320
-342
lines changed

cli/src/main.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tracing::debug;
1616
use crate::essential::{ CliError, info, update_cli };
1717
use crate::install::{ InstallArgs, InstallCommands, install_cortexflow, install_simple_example };
1818
use crate::logs::{ LogsArgs, logs_command };
19-
use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_identity_events };
19+
use crate::monitoring::{ MonitorArgs, MonitorCommands, list_features, monitor_dropped_packets, monitor_identity_events, monitor_latency_metrics };
2020
use crate::policies::{
2121
PoliciesArgs,
2222
PoliciesCommands,
@@ -109,6 +109,12 @@ async fn args_parser() -> Result<(), CliError> {
109109
MonitorCommands::Connections => {
110110
let _ = monitor_identity_events().await.map_err(|e| eprintln!("{}",e) )?;
111111
}
112+
MonitorCommands::Latencymetrics => {
113+
let _ = monitor_latency_metrics().await.map_err(|e| eprintln!("{}",e) )?;
114+
}
115+
MonitorCommands::Droppedpackets => {
116+
let _ = monitor_dropped_packets().await.map_err(|e| eprintln!("{}",e) )?;
117+
}
112118
}
113119
Some(Commands::Policies(policies_args)) => {
114120
match policies_args.policy_cmd {

cli/src/monitoring.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,14 @@ pub enum MonitorCommands {
2323
name = "connections",
2424
about = "Monitor the recent connections detected by the identity service"
2525
)] Connections,
26+
#[command(
27+
name = "latencymetrics",
28+
about = "Monitor the latency metrics detected by the metrics service"
29+
)] Latencymetrics,
30+
#[command(
31+
name = "droppedpackets",
32+
about = "Monitor the dropped packets metrics detected by the metrics service"
33+
)] Droppedpackets,
2634
}
2735

2836
// cfcli monitor <args>
@@ -140,3 +148,120 @@ pub async fn monitor_identity_events() -> Result<(), Error> {
140148

141149
Ok(())
142150
}
151+
152+
pub async fn monitor_latency_metrics() -> Result<(), Error> {
153+
//function to monitor latency metrics
154+
println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white());
155+
156+
match connect_to_client().await {
157+
Ok(client) => {
158+
println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green());
159+
//send request to get latency metrics
160+
match agent_api::requests::send_latency_metrics_request(client).await {
161+
Ok(response) => {
162+
let resp = response.into_inner();
163+
if resp.metrics.is_empty() {
164+
println!("{} No latency metrics found", "=====>".blue().bold());
165+
} else {
166+
println!("{} Found {} latency metrics", "=====>".blue().bold(), resp.metrics.len());
167+
for (i, metric) in resp.metrics.iter().enumerate() {
168+
println!(
169+
"index {} Latency[{}], tgid {} process_name {} address_family {} delta_us {} src_address_v4 {} dst_address_v4 {} src_address_v6 {} dst_address_v6 {} local_port {} remote_port {} timestamp_us {}",
170+
"=====>".blue().bold(),
171+
i,
172+
metric.tgid,
173+
metric.process_name,
174+
metric.address_family,
175+
metric.delta_us,
176+
metric.src_address_v4,
177+
metric.dst_address_v4,
178+
format!("{:?}", metric.src_address_v6),
179+
format!("{:?}", metric.dst_address_v6),
180+
metric.local_port,
181+
metric.remote_port,
182+
metric.timestamp_us
183+
);
184+
}
185+
}
186+
}
187+
Err(e) => {
188+
println!(
189+
"{} {} {} {}",
190+
"=====>".blue().bold(),
191+
"An error occured".red(),
192+
"Error:",
193+
e
194+
);
195+
return Err(e);
196+
}
197+
}
198+
}
199+
Err(e) =>{
200+
println!(
201+
"{} {}",
202+
"=====>".blue().bold(),
203+
"Failed to connect to CortexFlow Client".red()
204+
);
205+
return Err(e);
206+
}
207+
}
208+
Ok(())
209+
}
210+
211+
pub async fn monitor_dropped_packets() -> Result<(), Error> {
212+
//function to monitor dropped packets metrics
213+
println!("{} {}", "=====>".blue().bold(), "Connecting to cortexflow Client".white());
214+
215+
match connect_to_client().await {
216+
Ok(client) => {
217+
println!("{} {}", "=====>".blue().bold(), "Connected to CortexFlow Client".green());
218+
//send request to get dropped packets metrics
219+
match agent_api::requests::send_dropped_packets_request(client).await {
220+
Ok(response) => {
221+
let resp = response.into_inner();
222+
if resp.metrics.is_empty() {
223+
println!("{} No dropped packets metrics found", "=====>".blue().bold());
224+
} else {
225+
println!("{} Found {} dropped packets metrics", "=====>".blue().bold(), resp.metrics.len());
226+
for (i, metric) in resp.metrics.iter().enumerate() {
227+
println!(
228+
"{} DroppedPackets[{}]\n TGID: {}\n Process: {}\n SK Drops: {}\n Socket Errors: {}\n Soft Errors: {}\n Backlog Length: {}\n Write Memory Queued: {}\n Receive Buffer Size: {}\n ACK Backlog: {}\n Timestamp: {} µs",
229+
"=====>".blue().bold(),
230+
i,
231+
metric.tgid,
232+
metric.process_name,
233+
metric.sk_drops,
234+
metric.sk_err,
235+
metric.sk_err_soft,
236+
metric.sk_backlog_len,
237+
metric.sk_wmem_queued,
238+
metric.sk_rcvbuf,
239+
metric.sk_ack_backlog,
240+
metric.timestamp_us
241+
);
242+
}
243+
}
244+
}
245+
Err(e) => {
246+
println!(
247+
"{} {} {} {}",
248+
"=====>".blue().bold(),
249+
"An error occured".red(),
250+
"Error:",
251+
e
252+
);
253+
return Err(e);
254+
}
255+
}
256+
}
257+
Err(e) =>{
258+
println!(
259+
"{} {}",
260+
"=====>".blue().bold(),
261+
"Failed to connect to CortexFlow Client".red()
262+
);
263+
return Err(e);
264+
}
265+
}
266+
Ok(())
267+
}

core/api/protos/agent.proto

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,8 @@ message ActiveConnectionResponse{
2222

2323
// Network metrics
2424

25-
26-
2725
// Latency metrics request and response messages
2826

29-
message LatencyMetricsRequest {
30-
optional uint32 tgid = 1; // Filter by thread group ID
31-
optional string process_name = 2; // Filter by process name
32-
optional uint64 start_time = 3; // Start timestamp (microseconds)
33-
optional uint64 end_time = 4; // End timestamp (microseconds)
34-
}
35-
3627
message LatencyMetric {
3728
uint64 delta_us = 1; // Latency in microseconds
3829
uint64 timestamp_us = 2; // Event timestamp
@@ -56,40 +47,8 @@ message LatencyMetricsResponse {
5647
double max_latency_us = 6; // Maximum latency
5748
}
5849

59-
// Packet Loss Metrics
60-
61-
message PacketLossMetricsRequest {
62-
optional uint32 tgid = 1; // Filter by thread group ID
63-
optional uint64 start_time = 2; // Start timestamp
64-
optional uint64 end_time = 3; // End timestamp
65-
}
66-
67-
message PacketLossMetric {
68-
uint32 tgid = 1; // Thread group ID
69-
string process_name = 2; // Process name
70-
uint64 timestamp_us = 3; // Event timestamp
71-
uint32 total_packets_lost = 4; // Total packets lost
72-
uint32 total_packets_transmitted = 5; // Total packets transmitted
73-
double packet_loss_percentage = 6; // % of total packet loss
74-
uint64 total_data_loss_bytes = 7; // Total size of data loss
75-
uint64 total_data_transmitted_bytes = 8; // Total transmitted data
76-
double data_loss_ratio = 9; // Ratio between loss and transmitted
77-
}
78-
79-
message PacketLossMetricsResponse {
80-
string status = 1;
81-
repeated PacketLossMetric metrics = 2;
82-
uint32 total_connections = 3;
83-
}
84-
8550
// Dropped TCP Packets
8651

87-
message DroppedPacketsRequest {
88-
optional uint32 tgid = 1; // Filter by thread group ID
89-
optional uint64 start_time = 2; // Start timestamp
90-
optional uint64 end_time = 3; // End timestamp
91-
}
92-
9352
message DroppedPacketMetric {
9453
uint32 tgid = 1; // Thread group ID
9554
string process_name = 2; // Process name
@@ -112,7 +71,6 @@ message DroppedPacketsResponse {
11271

11372
//declare agent api
11473
service Agent{
115-
11674
// active connections endpoint
11775
rpc ActiveConnections(RequestActiveConnections) returns (ActiveConnectionResponse);
11876

@@ -124,11 +82,10 @@ service Agent{
12482
rpc RmIpFromBlocklist(RmIpFromBlocklistRequest) returns (RmIpFromBlocklistResponse);
12583

12684
// metrics data
127-
rpc GetLatencyMetrics(LatencyMetricsRequest) returns (LatencyMetricsResponse);
128-
129-
rpc GetPacketLossMetrics(PacketLossMetricsRequest) returns (PacketLossMetricsResponse);
85+
rpc GetLatencyMetrics(google.protobuf.Empty) returns (LatencyMetricsResponse);
13086

131-
rpc GetDroppedPacketsMetrics(DroppedPacketsRequest) returns (DroppedPacketsResponse);
87+
// dropped packets
88+
rpc GetDroppedPacketsMetrics(google.protobuf.Empty) returns (DroppedPacketsResponse);
13289
}
13390

13491
message AddIpToBlocklistRequest{

0 commit comments

Comments
 (0)