
Commit d20c072

FlowGrouper - Introduce new module
1 parent c3f5e5c commit d20c072

11 files changed: +704 -1 lines changed
modules/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -3,3 +3,4 @@ add_subdirectory(sampler)
 add_subdirectory(telemetry)
 add_subdirectory(deduplicator)
 add_subdirectory(clickhouse)
+add_subdirectory(flowGrouper)

modules/deduplicator/src/timeoutHashMap.hpp

Lines changed: 1 addition & 1 deletion
@@ -302,7 +302,7 @@ class TimeoutHashMap {
     }
 
 private:
-    std::function<size_t(const FlowKey&)> m_hasher;
+    std::function<size_t(const Key&)> m_hasher;
     typename HashMapTimeoutBucket::TimeoutBucketCallables m_timeoutBucketCallables;
     std::vector<HashMapTimeoutBucket> m_buckets;
     const uint64_t M_BUCKET_MASK;
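
The hunk above changes the stored hasher's signature from the hard-coded `FlowKey` to the class's own `Key` template parameter, so the member matches whatever key type the map is instantiated with. A minimal sketch of the idea follows; `TinyMap`, `bucketForInt`, and `bucketForString` are hypothetical names used only for illustration and are not part of the repository.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for a templated hash map; only the hasher member matters here.
template <typename Key>
class TinyMap {
public:
    explicit TinyMap(std::function<std::size_t(const Key&)> hasher)
        : m_hasher(std::move(hasher))
    {
    }

    // Maps a key to one of `bucketCount` buckets.
    std::size_t bucketOf(const Key& key, std::size_t bucketCount) const
    {
        return m_hasher(key) % bucketCount;
    }

private:
    // Typed on Key, so the class works for any key type, not just one struct.
    std::function<std::size_t(const Key&)> m_hasher;
};

// Two unrelated key types sharing the same container template.
inline std::size_t bucketForInt(int value)
{
    TinyMap<int> map([](const int& key) { return static_cast<std::size_t>(key); });
    return map.bucketOf(value, 8);
}

inline std::size_t bucketForString(const std::string& value)
{
    TinyMap<std::string> map([](const std::string& key) { return key.size(); });
    return map.bucketOf(value, 8);
}
```

With the member typed on `Key`, the same template can serve both the deduplicator's key and FlowGrouper's `FlowKey`.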

modules/flowGrouper/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
add_subdirectory(src)

modules/flowGrouper/README.md

Lines changed: 70 additions & 0 deletions
@@ -0,0 +1,70 @@
# FlowGrouper module - README

## Description
FlowGrouper groups Unirec flow records that share the same 5-tuple (source IP,
destination IP, source port, destination port and protocol) within a configurable
time window and assigns a stable `FLOW_GROUP_KEY` to all records that belong to
the same group.

This module is useful when aggregating flow records that may be
received multiple times (e.g., from multiple exporters).


## Interfaces
- Input: 1
- Output: 1

## Required Unirec Fields
The module expects the input Unirec template to contain the following fields:
- `SRC_IP` (ipaddr)
- `DST_IP` (ipaddr)
- `SRC_PORT` (uint16)
- `DST_PORT` (uint16)
- `PROTOCOL` (uint8)

FlowGrouper extends the template by adding `uint64 FLOW_GROUP_KEY` to the output records.

## Parameters
Command-line parameters follow the TRAP / Unirec conventions. The main module
parameters are:

- `-s, --size <int>`  Exponent N for the hash map size (2^N entries). Default value is 15
- `-t, --timeout <int>`  Time window, in milliseconds, within which records with the same 5-tuple are placed in the same group. Default value is 5000 (5 s)
- `-m, --appfs-mountpoint <path>`  Path where the appFs directory will be mounted

### Common TRAP / Unirec parameters
- `-h` : print help and module-specific parameters
- `-v`, `-vv`, `-vvv` : verbosity levels

## How Flow Grouping Works
- Records are grouped when they arrive within the configured `--timeout`
  interval and share the same `SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT` and
  `PROTOCOL` values.
- When a record arrives and no existing group matches, a new `FLOW_GROUP_KEY`
  is created and stored in an internal timeout hash map keyed by the 5-tuple.
- Subsequent records that match the tuple within the timeout receive the same `FLOW_GROUP_KEY`.

Note: `FLOW_GROUP_KEY` is not a globally unique identifier. It identifies records that belong to the same group only in combination with the 5-tuple (`SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT`, `PROTOCOL`).
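
The grouping rules above can be sketched with an ordinary map. This is a minimal illustration, not the module's implementation: `GroupTable`, `Entry`, and `lookupOrInsert` are hypothetical names, `std::map` stands in for the bucketed timeout hash map, IP addresses are reduced to 32-bit integers, and the sketch assumes the timeout is measured from the moment the group was created (the commit does not show whether a match refreshes it).

```cpp
#include <chrono>
#include <cstdint>
#include <map>
#include <tuple>

using Clock = std::chrono::steady_clock;
// 5-tuple reduced to integers for brevity: srcIp, dstIp, srcPort, dstPort, protocol.
using Tuple = std::tuple<uint32_t, uint32_t, uint16_t, uint16_t, uint8_t>;

struct Entry {
    uint64_t groupKey;            // key shared by all records of the group
    Clock::time_point expiresAt;  // creation time plus timeout (assumption)
};

class GroupTable {
public:
    explicit GroupTable(std::chrono::milliseconds timeout)
        : m_timeout(timeout)
    {
    }

    // Returns the stored key if the tuple was seen within the timeout;
    // otherwise stores `candidateKey` for this tuple and returns it.
    uint64_t lookupOrInsert(const Tuple& tuple, uint64_t candidateKey, Clock::time_point now)
    {
        auto it = m_entries.find(tuple);
        if (it != m_entries.end() && now < it->second.expiresAt) {
            return it->second.groupKey;
        }
        m_entries[tuple] = Entry{candidateKey, now + m_timeout};
        return candidateKey;
    }

private:
    std::chrono::milliseconds m_timeout;
    std::map<Tuple, Entry> m_entries;
};
```

With a 1000 ms timeout, a second record of the same tuple arriving 500 ms later receives the first record's key, while one arriving 1500 ms later starts a new group.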

## Telemetry data format

```
├─ input/
│  └─ stats
└─ flowGrouper/
   └─ statistics
```

Telemetry counters include:
- **Inserted groups** (`newInsertedCount`): number of newly created flow groups
- **Replaced groups** (`replacedCount`): number of times an existing bucket entry was replaced with a new group
- **Found groups** (`foundCount`): number of times a matching group was found for an input record


## Usage Examples
Process Unirec records from a TRAP input and forward them with an added
`FLOW_GROUP_KEY`. The example sets the hash map exponent to `15` (2^15 entries)
and timeout to `1000` ms:

```
$ flowGrouper -i "u:in,u:out" -s 15 -t 1000
```
Lines changed: 18 additions & 0 deletions
@@ -0,0 +1,18 @@
add_executable(flowGrouper
    main.cpp
    flowGrouper.cpp
)

target_link_libraries(flowGrouper PRIVATE
    telemetry::telemetry
    telemetry::appFs
    common
    rapidcsv
    unirec::unirec++
    unirec::unirec
    trap::trap
    argparse
    xxhash
)

install(TARGETS flowGrouper DESTINATION ${INSTALL_DIR_BIN})
Lines changed: 100 additions & 0 deletions
@@ -0,0 +1,100 @@
#include "flowGrouper.hpp"

#include <chrono>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <xxhash.h>

using namespace Nemea;

namespace FlowGrouper {

// Hashes the raw bytes of the key object with XXH3 (64-bit).
template <typename Key>
static uint64_t xxHasher(const Key& key)
{
    return XXH3_64bits(reinterpret_cast<const void*>(&key), sizeof(key));
}
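
`xxHasher` hashes `sizeof(key)` raw bytes, which includes any padding the compiler places inside or after the struct. Whether `FlowKey` contains padding depends on the layout of `Nemea::IpAddress`, which this commit does not show; if it does, the padding bytes of a default-initialized key are indeterminate and equal 5-tuples could hash differently. The following is a sketch of one way to avoid that, under stated assumptions: `DemoKey` is a hypothetical stand-in for `FlowKey` with addresses as 16 raw bytes, and FNV-1a is used only as a dependency-free replacement for `XXH3_64bits`.

```cpp
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Hypothetical key mirroring the field list of FlowKey (not the real type).
struct DemoKey {
    std::array<uint8_t, 16> srcIp;
    std::array<uint8_t, 16> dstIp;
    uint16_t srcPort;
    uint16_t dstPort;
    uint8_t proto;
};

// FNV-1a (64-bit), a stand-in for XXH3_64bits in this sketch.
inline uint64_t fnv1a(const uint8_t* data, std::size_t size)
{
    uint64_t hash = 14695981039346656037ULL;
    for (std::size_t i = 0; i < size; ++i) {
        hash ^= data[i];
        hash *= 1099511628211ULL;
    }
    return hash;
}

// Copies each field into a tightly packed buffer so padding never reaches the hash.
inline uint64_t hashFields(const DemoKey& key)
{
    constexpr std::size_t packedSize = 16 + 16 + 2 + 2 + 1;
    std::array<uint8_t, packedSize> buffer {};
    std::size_t offset = 0;

    std::memcpy(buffer.data() + offset, key.srcIp.data(), key.srcIp.size());
    offset += key.srcIp.size();
    std::memcpy(buffer.data() + offset, key.dstIp.data(), key.dstIp.size());
    offset += key.dstIp.size();
    std::memcpy(buffer.data() + offset, &key.srcPort, sizeof(key.srcPort));
    offset += sizeof(key.srcPort);
    std::memcpy(buffer.data() + offset, &key.dstPort, sizeof(key.dstPort));
    offset += sizeof(key.dstPort);
    std::memcpy(buffer.data() + offset, &key.proto, sizeof(key.proto));

    return fnv1a(buffer.data(), buffer.size());
}
```

An alternative with the same effect is to zero the whole key (for example with `std::memset`) before filling in its fields, so that hashing the raw object stays deterministic.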

// Adds the timeout (in milliseconds) to a timestamp.
static FlowGrouper::Timestamp timeSum(const FlowGrouper::Timestamp& value, uint64_t timeout)
{
    return value + std::chrono::milliseconds(timeout);
}

// Resolves a Unirec field name to its ID; throws if the name is not defined.
static ur_field_id_t getUnirecIdByName(const char* str)
{
    auto unirecId = ur_get_id_by_name(str);
    if (unirecId == UR_E_INVALID_NAME) {
        throw std::runtime_error(std::string("Invalid Unirec name: ") + str);
    }
    return static_cast<ur_field_id_t>(unirecId);
}

FlowGrouper::FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters)
    : m_hashMap(parameters, xxHasher<FlowKey>, std::less<>(), timeSum)
{
    constexpr size_t timeoutBucketSize = 256;
    static_assert(
        sizeof(FlowGrouperHashMap::HashMapTimeoutBucket) == timeoutBucketSize,
        "TimeoutBucket size is not 256 bytes");
}

void FlowGrouper::updateUnirecIds()
{
    m_ids.srcIpId = getUnirecIdByName("SRC_IP");
    m_ids.dstIpId = getUnirecIdByName("DST_IP");
    m_ids.srcPortId = getUnirecIdByName("SRC_PORT");
    m_ids.dstPortId = getUnirecIdByName("DST_PORT");
    m_ids.protocolId = getUnirecIdByName("PROTOCOL");
    m_ids.flowGroupKeyId = getUnirecIdByName(getOutputFieldName().c_str());
}

FlowGrouper::FlowGroupKey FlowGrouper::getFlowKey(Nemea::UnirecRecordView& view)
{
    FlowKey flowKey;
    flowKey.srcIp = view.getFieldAsType<IpAddress>(m_ids.srcIpId);
    flowKey.dstIp = view.getFieldAsType<IpAddress>(m_ids.dstIpId);
    flowKey.srcPort = view.getFieldAsType<uint16_t>(m_ids.srcPortId);
    flowKey.dstPort = view.getFieldAsType<uint16_t>(m_ids.dstPortId);
    flowKey.proto = view.getFieldAsType<uint8_t>(m_ids.protocolId);

    // Candidate key for a new group: wall-clock time in whole seconds since the epoch.
    const FlowGrouper::FlowGroupKey newFlowKey = static_cast<FlowGrouper::FlowGroupKey>(
        std::chrono::duration_cast<std::chrono::seconds>(
            std::chrono::system_clock::now().time_since_epoch())
            .count());
    const auto [it, insertResult]
        = m_hashMap.insert({flowKey, newFlowKey}, std::chrono::steady_clock::now());

    if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::INSERTED) {
        m_newInserted++;
        return newFlowKey;
    }
    if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::REPLACED) {
        m_replaced++;
        return newFlowKey;
    }
    // The 5-tuple is already tracked: reuse the key stored for its group.
    m_found++;
    return *it;
}

void FlowGrouper::addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord)
{
    FlowGrouper::FlowGroupKey flowKey = getFlowKey(inputRecord);
    outputRecord.setFieldFromType<uint64_t>(flowKey, m_ids.flowGroupKeyId);
}
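
`getFlowKey()` takes the candidate key for a new group from the wall clock, truncated to whole seconds. The sketch below reproduces that expression with the time passed in as a parameter so it can be checked in isolation; the helper name `candidateKeyAt` is hypothetical. It illustrates why groups created within the same second receive the same key, which is consistent with the README's note that `FLOW_GROUP_KEY` is only meaningful together with the 5-tuple.

```cpp
#include <chrono>
#include <cstdint>

using SystemTime = std::chrono::system_clock::time_point;

// Same conversion as in getFlowKey(), but with an explicit time argument.
inline uint64_t candidateKeyAt(SystemTime now)
{
    const auto seconds
        = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch());
    return static_cast<uint64_t>(seconds.count());
}
```

Two different 5-tuples first seen 999 ms apart within one second therefore get equal keys, while a group created one full second later gets the next integer.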

void FlowGrouper::setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory)
{
    m_holder.add(directory);

    // Read callback reporting the three counters; the second FileOps member is left unset.
    const telemetry::FileOps fileOps
        = {[this]() {
               telemetry::Dict dict;
               dict["replacedCount"] = telemetry::Scalar(static_cast<unsigned long>(m_replaced));
               dict["newInsertedCount"] = telemetry::Scalar(static_cast<unsigned long>(m_newInserted));
               dict["foundCount"] = telemetry::Scalar(static_cast<unsigned long>(m_found));
               return dict;
           },
           nullptr};

    m_holder.add(directory->addFile("statistics", fileOps));
}

} // namespace FlowGrouper
Lines changed: 133 additions & 0 deletions
@@ -0,0 +1,133 @@
#pragma once

#include "../../deduplicator/src/timeoutHashMap.hpp"

#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <telemetry.hpp>
#include <thread>
#include <unirec++/unirecRecordView.hpp>
#include <unirec++/unirecRecord.hpp>
#include <unirec++/urTime.hpp>
#include <vector>

namespace FlowGrouper {

/**
 * @brief Assigns the same FLOW_GROUP_KEY to records that belong to the same flow group.
 */
class FlowGrouper {
public:
    /**
     * @brief Timestamp type used by FlowGrouper.
     */
    using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;

    /**
     * @brief Field type representing the flow group key (FLOW_GROUP_KEY).
     */
    using FlowGroupKey = uint64_t;

    /**
     * @brief Key fields (5-tuple) shared by flows that belong to the same group.
     */
    struct FlowKey {
        Nemea::IpAddress srcIp; ///< Source IP address.
        Nemea::IpAddress dstIp; ///< Destination IP address.
        uint16_t srcPort; ///< Source port.
        uint16_t dstPort; ///< Destination port.
        uint8_t proto; ///< Protocol ID.
    };

    /**
     * @brief Timeout hash map type used by FlowGrouper.
     */
    using FlowGrouperHashMap = Deduplicator::TimeoutHashMap<
        FlowKey,
        FlowGroupKey,
        Timestamp,
        std::function<size_t(const FlowKey&)>,
        std::function<bool(const Timestamp&, const Timestamp&)>,
        std::function<Timestamp(const Timestamp&, uint64_t)>>;

    static inline const uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s
    static inline const uint32_t DEFAULT_HASHMAP_EXPONENT = 20; ///< Default size exponent - 2^20 entries

    /**
     * @brief FlowGrouper constructor
     *
     * @param parameters Parameters used to build the hash table of FlowGrouper
     */
    explicit FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters);

    /**
     * @brief Looks up the group of the given record in the hash map and creates a new group if none exists.
     * @param view The Unirec record to check.
     * @return FlowGroupKey of the flow.
     */
    FlowGroupKey getFlowKey(Nemea::UnirecRecordView& view);

    /**
     * @brief Adds FLOW_GROUP_KEY field to the output Unirec record.
     * @param inputRecord The input Unirec record view to get field values from.
     * @param outputRecord The output Unirec record where FLOW_GROUP_KEY will be added.
     */
    void addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord);

    /**
     * @brief Sets the telemetry directory for the flowGrouper.
     * @param directory directory for flowGrouper telemetry.
     */
    void setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory);

    /**
     * @brief Updates the Unirec IDs of the required fields after a template format change.
     */
    void updateUnirecIds();

    /**
     * @brief Gets the name of the output field added by FlowGrouper.
     * @return Name of the output field.
     */
    static std::string getOutputFieldName() {
        return "FLOW_GROUP_KEY";
    }

    /**
     * @brief Gets the output template string after adding FLOW_GROUP_KEY field.
     * @param inputTemplate The input Unirec template string.
     * @return The output Unirec template string with FLOW_GROUP_KEY field added.
     */
    static std::string getOutputTemplate(std::string inputTemplate) {
        // Check if the input template already contains the output field.
        if (inputTemplate.find(" " + getOutputFieldName()) != std::string::npos) {
            return inputTemplate;
        }
        return inputTemplate + ", uint64 " + getOutputFieldName();
    }
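
To make the behaviour of `getOutputTemplate()` concrete, here is a standalone copy of its logic as free functions. The names `outputFieldName` and `outputTemplate` are hypothetical, and the template strings used with it are made-up examples in the usual Unirec `type NAME` form.

```cpp
#include <string>

inline std::string outputFieldName()
{
    return "FLOW_GROUP_KEY";
}

// Appends "uint64 FLOW_GROUP_KEY" unless the field name already appears in the template.
inline std::string outputTemplate(const std::string& inputTemplate)
{
    if (inputTemplate.find(" " + outputFieldName()) != std::string::npos) {
        return inputTemplate;
    }
    return inputTemplate + ", uint64 " + outputFieldName();
}
```

Because the check is a plain substring search, a template that contains a longer field name starting with `FLOW_GROUP_KEY` (for example a hypothetical `FLOW_GROUP_KEY_EXT`) is also treated as already containing the field.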

private:
    FlowGrouperHashMap m_hashMap; ///< Hash map to keep flows

    uint32_t m_newInserted {0}; ///< Count of newly created groups
    uint32_t m_replaced {0}; ///< Count of replaced groups
    uint32_t m_found {0}; ///< Count of lookups that found an existing group key

    telemetry::Holder m_holder; ///< Telemetry holder

    struct UnirecIdStorage {
        ur_field_id_t srcIpId; ///< Unirec ID of source IP.
        ur_field_id_t dstIpId; ///< Unirec ID of destination IP.
        ur_field_id_t srcPortId; ///< Unirec ID of source port.
        ur_field_id_t dstPortId; ///< Unirec ID of destination port.
        ur_field_id_t protocolId; ///< Unirec ID of protocol field.

        ur_field_id_t flowGroupKeyId; ///< Unirec ID of FLOW_GROUP_KEY field.
    };

    UnirecIdStorage m_ids; ///< IDs of Unirec fields used by the flowGrouper module
};

} // namespace FlowGrouper
