WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion core/api/protos/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@ package agent;
message RequestActiveConnections{
optional string pod_ip = 2 ;
}

message ConnectionEvent {
string event_id = 1;
string src_ip_port = 2; // e.g., "192.168.1.1:8080" (src_ip:src_port)
string dst_ip_port = 3; // e.g., "10.0.0.1:80" (dst_ip:dst_port)
}

// TODO: the complete Response will be able to return all the context below
//* "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
//* event_id, proto, src, src_port, dst, dst_port
message ActiveConnectionResponse{
string status = 1;
map<string,string> events = 2 ; //for simplicity right now we only return event_id and src
repeated ConnectionEvent events = 2; // List of connection events
}

//declare agent api
Expand Down
20 changes: 14 additions & 6 deletions core/api/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ pub struct RequestActiveConnections {
#[prost(string, optional, tag = "2")]
pub pod_ip: ::core::option::Option<::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
pub struct ConnectionEvent {
#[prost(string, tag = "1")]
pub event_id: ::prost::alloc::string::String,
/// e.g., "192.168.1.1:8080" (src_ip:src_port)
#[prost(string, tag = "2")]
pub src_ip_port: ::prost::alloc::string::String,
/// e.g., "10.0.0.1:80" (dst_ip:dst_port)
#[prost(string, tag = "3")]
pub dst_ip_port: ::prost::alloc::string::String,
}
/// TODO: the complete Response will be able to return all the context below
///
/// * "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
Expand All @@ -12,12 +23,9 @@ pub struct RequestActiveConnections {
pub struct ActiveConnectionResponse {
#[prost(string, tag = "1")]
pub status: ::prost::alloc::string::String,
/// for simplicity right now we only return event_id and src
#[prost(map = "string, string", tag = "2")]
pub events: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
/// List of connection events
#[prost(message, repeated, tag = "2")]
pub events: ::prost::alloc::vec::Vec<ConnectionEvent>,
}
/// Generated client implementations.
pub mod agent_client {
Expand Down
46 changes: 26 additions & 20 deletions core/api/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::sync::mpsc;
use tokio::task;

// * contains agent api configuration
use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections};
use crate::agent::{agent_server::Agent, ActiveConnectionResponse, RequestActiveConnections, ConnectionEvent};
use aya::maps::Map;
use bytemuck_derive::Zeroable;
use cortexflow_identity::enums::IpProtocols;
Expand All @@ -38,30 +38,31 @@ unsafe impl aya::Pod for PacketLog {}
pub struct AgentApi {
//* event_rx is an istance of a mpsc receiver.
//* is used to receive the data from the transmitter (tx)
event_rx: Mutex<mpsc::Receiver<Result<HashMap<String, String>, Status>>>,
event_tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
event_rx: Mutex<mpsc::Receiver<Result<Vec<ConnectionEvent>, Status>>>,
event_tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
}

//* Event sender trait. Takes an event from a map and send that to the mpsc channel
//* using the send_map function
#[async_trait]
pub trait EventSender: Send + Sync + 'static {
async fn send_event(&self, event: HashMap<String, String>);
async fn send_event(&self, event: Vec<ConnectionEvent>);
async fn send_map(
&self,
map: HashMap<String, String>,
tx: mpsc::Sender<Result<HashMap<String, String>, Status>>,
map: Vec<ConnectionEvent>,
tx: mpsc::Sender<Result<Vec<ConnectionEvent>, Status>>,
) {
let status = Status::new(tonic::Code::Ok, "success");
let event = Ok(map);

let _ = tx.send(event).await;
}
}

// send event function. takes an HashMap and send that using mpsc event_tx
#[async_trait]
impl EventSender for AgentApi {
async fn send_event(&self, event: HashMap<String, String>) {
async fn send_event(&self, event: Vec<ConnectionEvent>) {
self.send_map(event, self.event_tx.clone()).await;
}
}
Expand Down Expand Up @@ -130,17 +131,18 @@ impl Default for AgentApi {
"Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}",
event_id, proto, src, src_port, dst, dst_port
);
info!("creating hashmap for the aggregated data");
let mut evt = HashMap::new();
// insert event in the hashmap
info!("Inserting events into the hashmap");
info!("creating vector for the aggregated data");
let mut evt = Vec::new();
// insert event in the vector
info!("Inserting events into the vector");
//TODO: use a Arc<str> or Box<str> type instead of String type.
//The data doesn't need to implement any .copy() or .clone() trait
// using an Arc<str> type will also waste less resources
evt.insert(
format!("{:?}", event_id.to_string()),
format!("{:?}", src.to_string()),
);
evt.push(ConnectionEvent {
event_id: event_id.to_string(),
src_ip_port: format!("{}:{}", src, src_port),
dst_ip_port: format!("{}:{}", dst, dst_port),
});
info!("sending events to the MPSC channel");
let _ = tx.send(Ok(evt)).await;
}
Expand All @@ -160,8 +162,12 @@ impl Default for AgentApi {
}
} else if events.read == 0 {
info!("[Agent/API] 0 Events found");
let mut evt = HashMap::new();
evt.insert("0".to_string(), "0".to_string());
let mut evt = Vec::new();
evt.push(ConnectionEvent {
event_id: "0".to_string(),
src_ip_port: "0:0".to_string(),
dst_ip_port: "0:0".to_string(),
});
let _ = tx.send(Ok(evt)).await;
}
}
Expand Down Expand Up @@ -192,12 +198,12 @@ impl Agent for AgentApi {
let req = request.into_inner();

//create the hashmap to process events from the mpsc channel queue
let mut aggregated_events: HashMap<String, String> = HashMap::new();
let mut aggregated_events: Vec<ConnectionEvent> = Vec::new();

//aggregate events
while let Ok(evt) = self.event_rx.lock().unwrap().try_recv() {
if let Ok(map) = evt {
aggregated_events.extend(map);
if let Ok(vec) = evt {
aggregated_events.extend(vec);
}
}

Expand Down