WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 16fde47

Browse files
committed
[python] add alternative queued signal implementation
1 parent fa85555 commit 16fde47

File tree

10 files changed

+352
-267
lines changed

10 files changed

+352
-267
lines changed

api/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ target_sources(dsr_api
5151
dsr_inner_eigen_api.cpp
5252
dsr_rt_api.cpp
5353
dsr_utils.cpp
54+
dsr_signal_emitter.cpp
5455
GHistorySaver.cpp
5556
${GEOM_API_SOURCES}
5657
${headers_to_moc}

api/dsr_api.cpp

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "dsr/api/dsr_graph_settings.h"
66
#include "dsr/core/topics/IDLGraph.hpp"
77
#include "dsr/core/types/translator.h"
8+
#include "include/dsr/api/dsr_signal_emitter.h"
89
#include <chrono>
910
#include <ctime>
1011
#include <dsr/api/dsr_api.h>
@@ -44,7 +45,11 @@ DSRGraph::DSRGraph(GraphSettings settings) :
4445

4546
qDebug() << "Agent name: " << QString::fromStdString(agent_name);
4647
utils = std::make_unique<Utilities>(this);
47-
48+
if (settings.signal_mode == SignalMode::QT) {
49+
set_qt_signals();
50+
} else {
51+
set_queued_signals();
52+
}
4853
// RTPS Create participant
4954
auto[suc, participant_handle] = dsrparticipant.init(agent_id, agent_name, settings.same_host,
5055
ParticipantChangeFunctor(this, [&](DSR::DSRGraph *graph,
@@ -122,8 +127,8 @@ DSRGraph::DSRGraph(GraphSettings settings) :
122127
qDebug() << __FUNCTION__ << "Constructor finished OK";
123128
}
124129

125-
DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id)
126-
: DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id})
130+
DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id, SignalMode mode)
131+
: DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id, mode})
127132
{}
128133

129134

@@ -202,10 +207,10 @@ std::optional<uint64_t> DSRGraph::insert_node(No &&node)
202207
if (delta.has_value())
203208
{
204209
dsrpub_node.write(&delta.value());
205-
emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
210+
emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
206211
for (const auto &[k, v]: node.fano())
207212
{
208-
emit update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id});
213+
emitter.update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id});
209214
}
210215
}
211216
}
@@ -291,13 +296,13 @@ requires (std::is_same_v<std::remove_cvref_t<No>, DSR::Node>)
291296
if (!copy) {
292297
if (vec_node_attr.has_value()) {
293298
dsrpub_node_attrs.write(&vec_node_attr.value());
294-
emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
299+
emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id});
295300
std::vector<std::string> atts_names(vec_node_attr->size());
296301
std::transform(std::make_move_iterator(vec_node_attr->begin()),
297302
std::make_move_iterator(vec_node_attr->end()),
298303
atts_names.begin(),
299304
[](auto &&x) { return x.attr_name(); });
300-
emit update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id});
305+
emitter.update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id});
301306

302307
}
303308
}
@@ -383,16 +388,16 @@ bool DSRGraph::delete_node(const std::string &name)
383388

384389
if (result) {
385390
if (!copy) {
386-
emit del_node_signal(id.value(), SignalInfo{agent_id});
387-
if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id});
391+
emitter.del_node_signal(id.value(), SignalInfo{agent_id});
392+
if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id});
388393
dsrpub_node.write(&deleted_node.value());
389394

390395
for (auto &a : delta_vec) {
391396
dsrpub_edge.write(&a);
392397
}
393398
for (auto &edge : deleted_edges) {
394-
emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
395-
emit deleted_edge_signal(edge);
399+
emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
400+
emitter.deleted_edge_signal(edge, SignalInfo{ agent_id });
396401
}
397402
}
398403
return true;
@@ -416,16 +421,16 @@ bool DSRGraph::delete_node(uint64_t id)
416421

417422
if (result) {
418423
if (!copy) {
419-
emit del_node_signal(id, SignalInfo{ agent_id });
420-
if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id});
424+
emitter.del_node_signal(id, SignalInfo{ agent_id });
425+
if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id});
421426
dsrpub_node.write(&deleted_node.value());
422427

423428
for (auto &a : delta_vec) {
424429
dsrpub_edge.write(&a);
425430
}
426431
for (auto &edge : deleted_edges) {
427-
emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
428-
emit deleted_edge_signal(edge);
432+
emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id });
433+
emitter.deleted_edge_signal(edge, SignalInfo{ agent_id });
429434
}
430435
}
431436
return true;
@@ -632,7 +637,7 @@ requires (std::is_same_v<std::remove_cvref_t<Ed>, DSR::Edge>)
632637
}
633638
if (result) {
634639
if (!copy) {
635-
emit update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id });
640+
emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id });
636641

637642
if (delta_edge.has_value()) { //Insert
638643
dsrpub_edge.write(&delta_edge.value());
@@ -645,7 +650,7 @@ requires (std::is_same_v<std::remove_cvref_t<Ed>, DSR::Edge>)
645650
atts_names.begin(),
646651
[](auto &&x) { return x.attr_name(); });
647652

648-
emit update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id });
653+
emitter.update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id });
649654

650655
}
651656
}
@@ -687,9 +692,9 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key)
687692
if (delta.has_value())
688693
{
689694
if (!copy) {
690-
emit del_edge_signal(from, to, key, SignalInfo{ agent_id });
695+
emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id });
691696
if (deleted_edge.has_value()) {
692-
emit deleted_edge_signal(*deleted_edge);
697+
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
693698
}
694699
dsrpub_edge.write(&delta.value());
695700
}
@@ -718,9 +723,9 @@ bool DSRGraph::delete_edge(const std::string &from, const std::string &to, const
718723
if (delta.has_value())
719724
{
720725
if (!copy) {
721-
emit del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id });
726+
emitter.del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id });
722727
if (deleted_edge.has_value()) {
723-
emit deleted_edge_signal(*deleted_edge);
728+
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
724729
}
725730
dsrpub_edge.write(&delta.value());
726731
}
@@ -1069,34 +1074,36 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg)
10691074

10701075
if (joined) {
10711076
if (signal) {
1072-
emit update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() });
1077+
emitter.update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() });
10731078
for (const auto &[k, v] : nodes.at(id).read_reg().fano()) {
10741079
//std::cout << "[JOIN NODE] add edge FROM: "<< id << ", " << k.first << ", " << k.second << std::endl;
1075-
emit update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() });
1080+
emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() });
10761081
}
10771082

10781083
for (const auto &[k, v]: map_new_to_edges)
10791084
{
10801085
//std::cout << "[JOIN NODE] add edge TO: "<< k << ", " << id << ", " << v << std::endl;
1081-
emit update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() });
1086+
emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() });
10821087
}
10831088
} else {
1084-
emit del_node_signal(id, SignalInfo{ mvreg.agent_id() });
1089+
emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() });
10851090
if (maybe_deleted_node.has_value()) {
1086-
emit deleted_node_signal(Node(*maybe_deleted_node));
1091+
Node tmp_node(*maybe_deleted_node);
1092+
emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id });
10871093
for (const auto &node: maybe_deleted_node->fano()) {
10881094
//std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl;
1089-
emit del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(),
1095+
emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(),
10901096
node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() });
1091-
emit deleted_edge_signal(Edge(node.second.read_reg()));
1097+
Edge tmp_edge(node.second.read_reg());
1098+
emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id });
10921099
}
10931100
}
10941101

10951102
//TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong...
10961103
for (const auto &[from, type] : cache_map_to_edges.value()) {
10971104
//std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl;
1098-
emit del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() });
1099-
//emit deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this
1105+
emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() });
1106+
//emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this
11001107
}
11011108

11021109
}
@@ -1238,12 +1245,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg)
12381245
if (joined) {
12391246
if (signal) {
12401247
//std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl;
1241-
emit update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
1248+
emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
12421249
} else {
12431250
//std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl;
1244-
emit del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
1251+
emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() });
12451252
if (deleted_edge.has_value()) {
1246-
emit deleted_edge_signal(*deleted_edge);
1253+
emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id });
12471254
}
12481255
}
12491256
}
@@ -1491,26 +1498,28 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph)
14911498
if (signal) {
14921499
//check what change is joined
14931500
if (!nd.has_value() || nd->attrs() != nodes[id].read_reg().attrs()) {
1494-
emit update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch });
1501+
emitter.update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch });
14951502
} else if (nd.value() != nodes[id].read_reg()) {
14961503
auto iter = nodes[id].read_reg().fano();
14971504
for (const auto &[k, v] : nd->fano()) {
14981505
if (!iter.contains(k)) {
1499-
emit del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
1506+
emitter.del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
15001507
if (v.dk.ds.size() > 0) {
1501-
emit deleted_edge_signal(Edge(v.read_reg()));
1508+
Edge tmp_edge(v.read_reg());
1509+
emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id });
15021510
}
15031511
}
15041512
}
15051513
for (const auto &[k, v] : iter) {
15061514
if (auto it = nd->fano().find(k); it == nd->fano().end() or it->second != v)
1507-
emit update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
1515+
emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch });
15081516
}
15091517
}
15101518
} else {
1511-
emit del_node_signal(id, SignalInfo{ agent_id_ch });
1519+
emitter.del_node_signal(id, SignalInfo{ agent_id_ch });
15121520
if (nd.has_value()) {
1513-
emit deleted_node_signal(Node(*nd));
1521+
Node tmp_node(*nd);
1522+
emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id_ch });
15141523
}
15151524
}
15161525

@@ -1689,8 +1698,8 @@ void DSRGraph::edge_attrs_subscription_thread()
16891698
}
16901699

16911700

1692-
emit update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()});
1693-
emit update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()});
1701+
emitter.update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()});
1702+
emitter.update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()});
16941703

16951704
});
16961705
}
@@ -1759,8 +1768,8 @@ void DSRGraph::node_attrs_subscription_thread()
17591768
sig.emplace_back(std::move(opt_str.value()));
17601769
}
17611770

1762-
emit update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()});
1763-
emit update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()});
1771+
emitter.update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()});
1772+
emitter.update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()});
17641773
});
17651774
}
17661775
}

api/dsr_rt_api.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,12 +341,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, const std::vector<fl
341341

342342
if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value());
343343

344-
emit G->update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
345-
emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
344+
G->emitter.update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
345+
G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
346346
if (!no_send)
347347
{
348-
emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
349-
emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
348+
G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
349+
G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
350350
}
351351
}
352352
}
@@ -528,12 +528,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, std::vector<float> &
528528

529529
if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value());
530530

531-
emit G->update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
532-
emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
531+
G->emitter.update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id });
532+
G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id });
533533
if (!no_send)
534534
{
535-
emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
536-
emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
535+
G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id });
536+
G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id });
537537
}
538538
}
539539
}

api/dsr_signal_emitter.cpp

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#include <dsr/api/dsr_api.h>
2+
#include <dsr/api/dsr_signal_emitter.h>
3+
#include <dsr/api/dsr_signal_info.h>
4+
5+
void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a,
6+
const std::string &b,
7+
SignalInfo c) {
8+
9+
tp.spawn_task([=, this] {
10+
for (auto fn : uns_fns) {
11+
if (fn)
12+
fn(a, b);
13+
}
14+
});
15+
}
16+
void DSR::QueuedSignalRunner::run_update_node_attr_signal(
17+
std::uint64_t a, const std::vector<std::string> &b, SignalInfo c) {
18+
tp.spawn_task([=, this] {
19+
for (auto fn : unas_fns) {
20+
if (fn)
21+
fn(a, b);
22+
}
23+
});
24+
}
25+
void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a,
26+
std::uint64_t b,
27+
const std::string &c,
28+
SignalInfo d) {
29+
tp.spawn_task([=, this] {
30+
for (auto fn : ues_fns) {
31+
if (fn)
32+
fn(a, b, c);
33+
}
34+
});
35+
}
36+
37+
void DSR::QueuedSignalRunner::run_update_edge_attr_signal(
38+
std::uint64_t a, std::uint64_t b, const std::string &c,
39+
const std::vector<std::string> &d, SignalInfo e) {
40+
tp.spawn_task([=, this] {
41+
for (auto fn : ueas_fns) {
42+
if (fn)
43+
fn(a, b, c, d);
44+
}
45+
});
46+
}
47+
48+
void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a,
49+
std::uint64_t b,
50+
const std::string &c,
51+
SignalInfo d) {
52+
tp.spawn_task([=, this] {
53+
for (auto fn : des_fns) {
54+
if (fn)
55+
fn(a, b, c);
56+
}
57+
});
58+
}
59+
void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a,
60+
SignalInfo b) {
61+
tp.spawn_task([=, this] {
62+
for (auto fn : den_fns) {
63+
if (fn)
64+
fn(a);
65+
}
66+
});
67+
}
68+
69+
void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a,
70+
SignalInfo b) {
71+
tp.spawn_task([=, this] {
72+
for (auto fn : dn_fns) {
73+
if (fn)
74+
fn(a);
75+
}
76+
});
77+
}
78+
void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a,
79+
SignalInfo b) {
80+
tp.spawn_task([=, this] {
81+
for (auto fn : de_fns) {
82+
if (fn)
83+
fn(a);
84+
}
85+
});
86+
}

0 commit comments

Comments
 (0)