WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 742e444

Browse files
committed
[#140]: Improve agent API response
1 parent 191b5ca commit 742e444

File tree

4 files changed

+49
-28
lines changed

4 files changed

+49
-28
lines changed

core/api/protos/agent.proto

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ package agent;
44
message RequestActiveConnections{
55
optional string pod_ip = 2 ;
66
}
7+
8+
message ConnectionEvent {
9+
string event_id = 1;
10+
string src_ip_port = 2; // e.g., "192.168.1.1:8080" (src_ip:src_port)
11+
string dst_ip_port = 3; // e.g., "10.0.0.1:80" (dst_ip:dst_port)
12+
}
13+
714
// TODO: the complete Response will be able to return all the context below
815
//* "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
916
//* event_id, proto, src, src_port, dst, dst_port
1017
message ActiveConnectionResponse{
1118
string status = 1;
12-
map<string,string> events = 2 ; //for simplicity right now we only return event_id and src
19+
repeated ConnectionEvent events = 2; // List of connection events
1320
}
1421

1522
//declare agent api

core/api/src/agent.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,17 @@ pub struct RequestActiveConnections {
44
#[prost(string, optional, tag = "2")]
55
pub pod_ip: ::core::option::Option<::prost::alloc::string::String>,
66
}
7+
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
8+
pub struct ConnectionEvent {
9+
#[prost(string, tag = "1")]
10+
pub event_id: ::prost::alloc::string::String,
11+
/// e.g., "192.168.1.1:8080" (src_ip:src_port)
12+
#[prost(string, tag = "2")]
13+
pub src_ip_port: ::prost::alloc::string::String,
14+
/// e.g., "10.0.0.1:80" (dst_ip:dst_port)
15+
#[prost(string, tag = "3")]
16+
pub dst_ip_port: ::prost::alloc::string::String,
17+
}
718
/// TODO: the complete Response will be able to return all the context below
819
///
920
/// * "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
@@ -12,12 +23,9 @@ pub struct RequestActiveConnections {
1223
pub struct ActiveConnectionResponse {
1324
#[prost(string, tag = "1")]
1425
pub status: ::prost::alloc::string::String,
15-
/// for simplicity right now we only return event_id and src
16-
#[prost(map = "string, string", tag = "2")]
17-
pub events: ::std::collections::HashMap<
18-
::prost::alloc::string::String,
19-
::prost::alloc::string::String,
20-
>,
26+
/// List of connection events
27+
#[prost(message, repeated, tag = "2")]
28+
pub events: ::prost::alloc::vec::Vec<ConnectionEvent>,
2129
}
2230
/// Generated client implementations.
2331
pub mod agent_client {

core/api/src/api.rs

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
1616
use tokio::task;
1717

1818
// * contains agent api configuration
19-
use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections};
19+
use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections, ConnectionEvent};
2020
use aya::maps::Map;
2121
use bytemuck_derive::Zeroable;
2222
use cortexflow_identity::enums::IpProtocols;
@@ -38,30 +38,31 @@ unsafe impl aya::Pod for PacketLog {}
3838
pub struct AgentApi {
3939
//* event_rx is an istance of a mpsc receiver.
4040
//* is used to receive the data from the transmitter (tx)
41-
event_rx: Mutex<mpsc::Receiver<Result<HashMap<String, String>, Status>>>,
42-
event_tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
41+
event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
42+
event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
4343
}
4444

4545
//* Event sender trait. Takes an event from a map and send that to the mpsc channel
4646
//* using the send_map function
4747
#[async_trait]
4848
pub trait EventSender: Send + Sync + 'static {
49-
async fn send_event(&self, event: HashMap<String, String>);
49+
async fn send_event(&self, event: Vec<ConnectionEvent>);
5050
async fn send_map(
5151
&self,
52-
map: HashMap<String, String>,
53-
tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
52+
map: Vec<ConnectionEvent>,
53+
tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
5454
) {
5555
let status = Status::new(tonic::Code::Ok, "success");
5656
let event = Ok(map);
5757

5858
let _ = tx.send(event).await;
5959
}
6060
}
61+
6162
// send event function. takes an HashMap and send that using mpsc event_tx
6263
#[async_trait]
6364
impl EventSender for AgentApi {
64-
async fn send_event(&self, event: HashMap<String, String>) {
65+
async fn send_event(&self, event: Vec<ConnectionEvent>) {
6566
self.send_map(event, self.event_tx.clone()).await;
6667
}
6768
}
@@ -130,17 +131,18 @@ impl Default for AgentApi {
130131
"Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
131132
event_id, proto, src, src_port, dst, dst_port
132133
);
133-
info!("creating hashmap for the aggregated data");
134-
let mut evt = HashMap::new();
135-
// insert event in the hashmap
136-
info!("Inserting events into the hashmap");
134+
info!("creating vector for the aggregated data");
135+
let mut evt = Vec::new();
136+
// insert event in the vector
137+
info!("Inserting events into the vector");
137138
//TODO: use a Arc<str> or Box<str> type instead of String type.
138139
//The data doesn't need to implement any .copy() or .clone() trait
139140
// using an Arc<str> type will also waste less resources
140-
evt.insert(
141-
format!("{:?}", event_id.to_string()),
142-
format!("{:?}", src.to_string()),
143-
);
141+
evt.push(ConnectionEvent {
142+
event_id: event_id.to_string(),
143+
src_ip_port: format!("{}:{}", src, src_port),
144+
dst_ip_port: format!("{}:{}", dst, dst_port),
145+
});
144146
info!("sending events to the MPSC channel");
145147
let _ = tx.send(Ok(evt)).await;
146148
}
@@ -160,8 +162,12 @@ impl Default for AgentApi {
160162
}
161163
} else if events.read == 0 {
162164
info!("[Agent/API] 0 Events found");
163-
let mut evt = HashMap::new();
164-
evt.insert("0".to_string(), "0".to_string());
165+
let mut evt = Vec::new();
166+
evt.push(ConnectionEvent {
167+
event_id: "0".to_string(),
168+
src_ip_port: "0:0".to_string(),
169+
dst_ip_port: "0:0".to_string(),
170+
});
165171
let _ = tx.send(Ok(evt)).await;
166172
}
167173
}
@@ -192,12 +198,12 @@ impl Agent for AgentApi {
192198
let req = request.into_inner();
193199

194200
//create the hashmap to process events from the mpsc channel queue
195-
let mut aggregated_events: HashMap<String, String> = HashMap::new();
201+
let mut aggregated_events: Vec<ConnectionEvent> = Vec::new();
196202

197203
//aggregate events
198204
while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
199-
if let Ok(map) = evt {
200-
aggregated_events.extend(map);
205+
if let Ok(vec) = evt {
206+
aggregated_events.extend(vec);
201207
}
202208
}
203209

core/src/testing/agent.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ spec:
1919
hostNetwork: true
2020
containers:
2121
- name: agent
22-
image: lorenzotettamanti/cortexflow-agent:latest
22+
image: cortexflow-agent:0.0.1
2323
command: ["/bin/bash", "-c"]
2424
args:
2525
- |

0 commit comments

Comments
 (0)