@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
1616use tokio:: task;
1717
1818// * contains agent api configuration
19- use crate :: agent:: { agent_server:: Agent , ActiveConnectionResponse , RequestActiveConnections } ;
19+ use crate :: agent:: { agent_server:: Agent , ActiveConnectionResponse , RequestActiveConnections , ConnectionEvent } ;
2020use aya:: maps:: Map ;
2121use bytemuck_derive:: Zeroable ;
2222use cortexflow_identity:: enums:: IpProtocols ;
@@ -38,30 +38,31 @@ unsafe impl aya::Pod for PacketLog {}
3838pub struct AgentApi {
3939 //* event_rx is an istance of a mpsc receiver.
4040 //* is used to receive the data from the transmitter (tx)
41- event_rx : Mutex < mpsc:: Receiver < Result < HashMap < String , String > , Status > > > ,
42- event_tx : mpsc:: Sender < Result < HashMap < String , String > , Status > > ,
41+ event_rx : Mutex < mpsc:: Receiver < Result < Vec < ConnectionEvent > , Status > > > ,
42+ event_tx : mpsc:: Sender < Result < Vec < ConnectionEvent > , Status > > ,
4343}
4444
4545//* Event sender trait. Takes an event from a map and send that to the mpsc channel
4646//* using the send_map function
4747#[ async_trait]
4848pub trait EventSender : Send + Sync + ' static {
49- async fn send_event ( & self , event : HashMap < String , String > ) ;
49+ async fn send_event ( & self , event : Vec < ConnectionEvent > ) ;
5050 async fn send_map (
5151 & self ,
52- map : HashMap < String , String > ,
53- tx : mpsc:: Sender < Result < HashMap < String , String > , Status > > ,
52+ map : Vec < ConnectionEvent > ,
53+ tx : mpsc:: Sender < Result < Vec < ConnectionEvent > , Status > > ,
5454 ) {
5555 let status = Status :: new ( tonic:: Code :: Ok , "success" ) ;
5656 let event = Ok ( map) ;
5757
5858 let _ = tx. send ( event) . await ;
5959 }
6060}
61+
6162// send event function. takes an HashMap and send that using mpsc event_tx
6263#[ async_trait]
6364impl EventSender for AgentApi {
64- async fn send_event ( & self , event : HashMap < String , String > ) {
65+ async fn send_event ( & self , event : Vec < ConnectionEvent > ) {
6566 self . send_map ( event, self . event_tx . clone ( ) ) . await ;
6667 }
6768}
@@ -130,17 +131,18 @@ impl Default for AgentApi {
130131 "Event Id: {} Protocol: {:?} SRC: {}:{} -> DST: {}:{}" ,
131132 event_id, proto, src, src_port, dst, dst_port
132133 ) ;
133- info ! ( "creating hashmap for the aggregated data" ) ;
134- let mut evt = HashMap :: new ( ) ;
135- // insert event in the hashmap
136- info ! ( "Inserting events into the hashmap " ) ;
134+ info ! ( "creating vector for the aggregated data" ) ;
135+ let mut evt = Vec :: new ( ) ;
136+ // insert event in the vector
137+ info ! ( "Inserting events into the vector " ) ;
137138 //TODO: use a Arc<str> or Box<str> type instead of String type.
138139 //The data doesn't need to implement any .copy() or .clone() trait
139140 // using an Arc<str> type will also waste less resources
140- evt. insert (
141- format ! ( "{:?}" , event_id. to_string( ) ) ,
142- format ! ( "{:?}" , src. to_string( ) ) ,
143- ) ;
141+ evt. push ( ConnectionEvent {
142+ event_id : event_id. to_string ( ) ,
143+ src_ip_port : format ! ( "{}:{}" , src, src_port) ,
144+ dst_ip_port : format ! ( "{}:{}" , dst, dst_port) ,
145+ } ) ;
144146 info ! ( "sending events to the MPSC channel" ) ;
145147 let _ = tx. send ( Ok ( evt) ) . await ;
146148 }
@@ -160,8 +162,12 @@ impl Default for AgentApi {
160162 }
161163 } else if events. read == 0 {
162164 info ! ( "[Agent/API] 0 Events found" ) ;
163- let mut evt = HashMap :: new ( ) ;
164- evt. insert ( "0" . to_string ( ) , "0" . to_string ( ) ) ;
165+ let mut evt = Vec :: new ( ) ;
166+ evt. push ( ConnectionEvent {
167+ event_id : "0" . to_string ( ) ,
168+ src_ip_port : "0:0" . to_string ( ) ,
169+ dst_ip_port : "0:0" . to_string ( ) ,
170+ } ) ;
165171 let _ = tx. send ( Ok ( evt) ) . await ;
166172 }
167173 }
@@ -192,12 +198,12 @@ impl Agent for AgentApi {
192198 let req = request. into_inner ( ) ;
193199
194200 //create the hashmap to process events from the mpsc channel queue
195- let mut aggregated_events: HashMap < String , String > = HashMap :: new ( ) ;
201+ let mut aggregated_events: Vec < ConnectionEvent > = Vec :: new ( ) ;
196202
197203 //aggregate events
198204 while let Ok ( evt) = self . event_rx . lock ( ) . unwrap ( ) . try_recv ( ) {
199- if let Ok ( map ) = evt {
200- aggregated_events. extend ( map ) ;
205+ if let Ok ( vec ) = evt {
206+ aggregated_events. extend ( vec ) ;
201207 }
202208 }
203209
0 commit comments