From d168be61999130d0c20cc31b3e6105647d928347 Mon Sep 17 00:00:00 2001 From: juancarlosgg Date: Fri, 31 Oct 2025 12:11:41 +0100 Subject: [PATCH] [python] add alternative queued signal implementation --- api/CMakeLists.txt | 1 + api/dsr_api.cpp | 95 ++++++----- api/dsr_rt_api.cpp | 16 +- api/dsr_signal_emitter.cpp | 86 ++++++++++ api/include/dsr/api/dsr_api.h | 50 +++++- api/include/dsr/api/dsr_graph_settings.h | 2 + api/include/dsr/api/dsr_signal_emitter.h | 126 ++++++++++++++ python-wrapper/python_api.cpp | 209 ++--------------------- python-wrapper/test/TestSignals.py | 2 + python-wrapper/test/TestSignalsQueued.py | 32 ++-- 10 files changed, 352 insertions(+), 267 deletions(-) create mode 100644 api/dsr_signal_emitter.cpp create mode 100644 api/include/dsr/api/dsr_signal_emitter.h diff --git a/api/CMakeLists.txt b/api/CMakeLists.txt index 154e90d..31d4e75 100644 --- a/api/CMakeLists.txt +++ b/api/CMakeLists.txt @@ -51,6 +51,7 @@ target_sources(dsr_api dsr_inner_eigen_api.cpp dsr_rt_api.cpp dsr_utils.cpp + dsr_signal_emitter.cpp GHistorySaver.cpp ${GEOM_API_SOURCES} ${headers_to_moc} diff --git a/api/dsr_api.cpp b/api/dsr_api.cpp index 1c06d6e..d7f837f 100644 --- a/api/dsr_api.cpp +++ b/api/dsr_api.cpp @@ -5,6 +5,7 @@ #include "dsr/api/dsr_graph_settings.h" #include "dsr/core/topics/IDLGraph.hpp" #include "dsr/core/types/translator.h" +#include "include/dsr/api/dsr_signal_emitter.h" #include #include #include @@ -44,7 +45,11 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << "Agent name: " << QString::fromStdString(agent_name); utils = std::make_unique(this); - + if (settings.signal_mode == SignalMode::QT) { + set_qt_signals(); + } else { + set_queued_signals(); + } // RTPS Create participant auto[suc, participant_handle] = dsrparticipant.init(agent_id, agent_name, settings.same_host, ParticipantChangeFunctor(this, [&](DSR::DSRGraph *graph, @@ -122,8 +127,8 @@ DSRGraph::DSRGraph(GraphSettings settings) : qDebug() << __FUNCTION__ << "Constructor finished OK"; } -DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id) - : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id}) +DSRGraph::DSRGraph(std::string name, uint32_t id, const std::string &dsr_input_file, bool all_same_host, int8_t domain_id, SignalMode mode) + : DSR::DSRGraph(GraphSettings {id, 5, 1, name, dsr_input_file, "", all_same_host, GraphSettings::LOGLEVEL::INFOL, domain_id, mode}) {} @@ -202,10 +207,10 @@ std::optional DSRGraph::insert_node(No &&node) if (delta.has_value()) { dsrpub_node.write(&delta.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); for (const auto &[k, v]: node.fano()) { - emit update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); + emitter.update_edge_signal(node.id(), k.first, k.second, SignalInfo{agent_id}); } } } @@ -291,13 +296,13 @@ requires (std::is_same_v, DSR::Node>) if (!copy) { if (vec_node_attr.has_value()) { dsrpub_node_attrs.write(&vec_node_attr.value()); - emit update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); + emitter.update_node_signal(node.id(), node.type(), SignalInfo{agent_id}); std::vector atts_names(vec_node_attr->size()); std::transform(std::make_move_iterator(vec_node_attr->begin()), std::make_move_iterator(vec_node_attr->end()), atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); + emitter.update_node_attr_signal(node.id(), atts_names, SignalInfo{agent_id}); } } @@ -383,16 +388,16 @@ bool DSRGraph::delete_node(const std::string &name) if (result) { if (!copy) { - emit del_node_signal(id.value(), SignalInfo{agent_id}); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + emitter.del_node_signal(id.value(), SignalInfo{agent_id}); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -416,16 +421,16 @@ bool DSRGraph::delete_node(uint64_t id) if (result) { if (!copy) { - emit del_node_signal(id, SignalInfo{ agent_id }); - if (node_signal) emit deleted_node_signal(*node_signal, SignalInfo{agent_id}); + emitter.del_node_signal(id, SignalInfo{ agent_id }); + if (node_signal) emitter.deleted_node_signal(*node_signal, SignalInfo{agent_id}); dsrpub_node.write(&deleted_node.value()); for (auto &a : delta_vec) { dsrpub_edge.write(&a); } for (auto &edge : deleted_edges) { - emit del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); - emit deleted_edge_signal(edge); + emitter.del_edge_signal(edge.from(), edge.to(), edge.type(), SignalInfo{ agent_id }); + emitter.deleted_edge_signal(edge, SignalInfo{ agent_id }); } } return true; @@ -632,7 +637,7 @@ requires (std::is_same_v, DSR::Edge>) } if (result) { if (!copy) { - emit update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); + emitter.update_edge_signal(attrs.from(), attrs.to(), attrs.type(), SignalInfo{ agent_id }); if (delta_edge.has_value()) { //Insert dsrpub_edge.write(&delta_edge.value()); @@ -645,7 +650,7 @@ requires (std::is_same_v, DSR::Edge>) atts_names.begin(), [](auto &&x) { return x.attr_name(); }); - emit update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); + emitter.update_edge_attr_signal(attrs.from(), attrs.to(), attrs.type(), atts_names, SignalInfo{ agent_id }); } } @@ -687,9 +692,9 @@ bool DSRGraph::delete_edge(uint64_t from, uint64_t to, const std::string &key) if (delta.has_value()) { if (!copy) { - emit del_edge_signal(from, to, key, SignalInfo{ agent_id }); + emitter.del_edge_signal(from, to, key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -718,9 +723,9 @@ bool DSRGraph::delete_edge(const std::string &from, const std::string &to, const if (delta.has_value()) { if (!copy) { - emit del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); + emitter.del_edge_signal(id_from.value(), id_to.value(), key, SignalInfo{ agent_id }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } dsrpub_edge.write(&delta.value()); } @@ -1069,34 +1074,36 @@ void DSRGraph::join_delta_node(IDL::MvregNode &&mvreg) if (joined) { if (signal) { - emit update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() }); + emitter.update_node_signal(id, nodes.at(id).read_reg().type(), SignalInfo{ mvreg.agent_id() }); for (const auto &[k, v] : nodes.at(id).read_reg().fano()) { //std::cout << "[JOIN NODE] add edge FROM: "<< id << ", " << k.first << ", " << k.second << std::endl; - emit update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ mvreg.agent_id() }); } for (const auto &[k, v]: map_new_to_edges) { //std::cout << "[JOIN NODE] add edge TO: "<< k << ", " << id << ", " << v << std::endl; - emit update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(k, id, v, SignalInfo{ mvreg.agent_id() }); } } else { - emit del_node_signal(id, SignalInfo{ mvreg.agent_id() }); + emitter.del_node_signal(id, SignalInfo{ mvreg.agent_id() }); if (maybe_deleted_node.has_value()) { - emit deleted_node_signal(Node(*maybe_deleted_node)); + Node tmp_node(*maybe_deleted_node); + emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id }); for (const auto &node: maybe_deleted_node->fano()) { //std::cout << "[JOIN NODE] delete edge FROM: "<< node.second.read_reg().from() << ", " << node.second.read_reg().to() << ", " << node.second.read_reg().type() << std::endl; - emit del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), + emitter.del_edge_signal(node.second.read_reg().from(), node.second.read_reg().to(), node.second.read_reg().type(), SignalInfo{ mvreg.agent_id() }); - emit deleted_edge_signal(Edge(node.second.read_reg())); + Edge tmp_edge(node.second.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } //TODO: deleted_edge_signal. update_maps_node_delete was called before so the maps are probably wrong... for (const auto &[from, type] : cache_map_to_edges.value()) { //std::cout << "[JOIN NODE] delete edge TO: "<< from << ", " << id << ", " << type << std::endl; - emit del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); - //emit deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this + emitter.del_edge_signal(from, id, type, SignalInfo{ mvreg.agent_id() }); + //emitter.deleted_edge_signal(Edge(node.second.read_reg())); TODO: fix this } } @@ -1238,12 +1245,12 @@ void DSRGraph::join_delta_edge(IDL::MvregEdge &&mvreg) if (joined) { if (signal) { //std::cout << "[JOIN EDGE] add edge: "<< from << ", " << to << ", " << type << std::endl; - emit update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + emitter.update_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); } else { //std::cout << "[JOIN EDGE] delete edge: "<< from << ", " << to << ", " << type << std::endl; - emit del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); + emitter.del_edge_signal(from, to, type, SignalInfo{ mvreg.agent_id() }); if (deleted_edge.has_value()) { - emit deleted_edge_signal(*deleted_edge); + emitter.deleted_edge_signal(*deleted_edge, SignalInfo{ agent_id }); } } } @@ -1491,26 +1498,28 @@ void DSRGraph::join_full_graph(IDL::OrMap &&full_graph) if (signal) { //check what change is joined if (!nd.has_value() || nd->attrs() != nodes[id].read_reg().attrs()) { - emit update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); + emitter.update_node_signal(id, nodes[id].read_reg().type(), SignalInfo{ agent_id_ch }); } else if (nd.value() != nodes[id].read_reg()) { auto iter = nodes[id].read_reg().fano(); for (const auto &[k, v] : nd->fano()) { if (!iter.contains(k)) { - emit del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.del_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); if (v.dk.ds.size() > 0) { - emit deleted_edge_signal(Edge(v.read_reg())); + Edge tmp_edge(v.read_reg()); + emitter.deleted_edge_signal(tmp_edge, SignalInfo{ agent_id }); } } } for (const auto &[k, v] : iter) { if (auto it = nd->fano().find(k); it == nd->fano().end() or it->second != v) - emit update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); + emitter.update_edge_signal(id, k.first, k.second, SignalInfo{ agent_id_ch }); } } } else { - emit del_node_signal(id, SignalInfo{ agent_id_ch }); + emitter.del_node_signal(id, SignalInfo{ agent_id_ch }); if (nd.has_value()) { - emit deleted_node_signal(Node(*nd)); + Node tmp_node(*nd); + emitter.deleted_node_signal(tmp_node, SignalInfo{ agent_id_ch }); } } @@ -1689,8 +1698,8 @@ void DSRGraph::edge_attrs_subscription_thread() } - emit update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_attr_signal(from, to, type, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_edge_signal(from, to, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } @@ -1759,8 +1768,8 @@ void DSRGraph::node_attrs_subscription_thread() sig.emplace_back(std::move(opt_str.value())); } - emit update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); - emit update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_attr_signal(id, sig, SignalInfo{samples.vec().at(0).agent_id()}); + emitter.update_node_signal(id, type, SignalInfo{samples.vec().at(0).agent_id()}); }); } } diff --git a/api/dsr_rt_api.cpp b/api/dsr_rt_api.cpp index 3998b8a..8fb43e8 100644 --- a/api/dsr_rt_api.cpp +++ b/api/dsr_rt_api.cpp @@ -341,12 +341,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, const std::vectordsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT" ,{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } @@ -528,12 +528,12 @@ void RT_API::insert_or_assign_edge_RT(Node &n, uint64_t to, std::vector & if (!no_send and node2.has_value()) G->dsrpub_node_attrs.write(&node2.value()); - emit G->update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); - emit G->update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); + G->emitter.update_edge_attr_signal(n.id(), to, "RT",{"rt_rotation_euler_xyz", "rt_translation"}, SignalInfo{ G->agent_id }); + G->emitter.update_edge_signal(n.id(), to, "RT", SignalInfo{ G->agent_id }); if (!no_send) { - emit G->update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); - emit G->update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); + G->emitter.update_node_signal(to_n->id(), to_n->type(), SignalInfo{ G->agent_id }); + G->emitter.update_node_attr_signal(to_n->id(), {"level", "parent"}, SignalInfo{ G->agent_id }); } } } diff --git a/api/dsr_signal_emitter.cpp b/api/dsr_signal_emitter.cpp new file mode 100644 index 0000000..bc8e1ee --- /dev/null +++ b/api/dsr_signal_emitter.cpp @@ -0,0 +1,86 @@ +#include +#include +#include + +void DSR::QueuedSignalRunner::run_update_node_signal(std::uint64_t a, + const std::string &b, + SignalInfo c) { + + tp.spawn_task([=, this] { + for (auto fn : uns_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_node_attr_signal( + std::uint64_t a, const std::vector &b, SignalInfo c) { + tp.spawn_task([=, this] { + for (auto fn : unas_fns) { + if (fn) + fn(a, b); + } + }); +} +void DSR::QueuedSignalRunner::run_update_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + tp.spawn_task([=, this] { + for (auto fn : ues_fns) { + if (fn) + fn(a, b, c); + } + }); +} + +void DSR::QueuedSignalRunner::run_update_edge_attr_signal( + std::uint64_t a, std::uint64_t b, const std::string &c, + const std::vector &d, SignalInfo e) { + tp.spawn_task([=, this] { + for (auto fn : ueas_fns) { + if (fn) + fn(a, b, c, d); + } + }); +} + +void DSR::QueuedSignalRunner::run_del_edge_signal(std::uint64_t a, + std::uint64_t b, + const std::string &c, + SignalInfo d) { + tp.spawn_task([=, this] { + for (auto fn : des_fns) { + if (fn) + fn(a, b, c); + } + }); +} +void DSR::QueuedSignalRunner::run_del_node_signal(std::uint64_t a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : den_fns) { + if (fn) + fn(a); + } + }); +} + +void DSR::QueuedSignalRunner::run_deleted_node_signal(const Node &a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : dn_fns) { + if (fn) + fn(a); + } + }); +} +void DSR::QueuedSignalRunner::run_deleted_edge_signal(const Edge &a, + SignalInfo b) { + tp.spawn_task([=, this] { + for (auto fn : de_fns) { + if (fn) + fn(a); + } + }); +} \ No newline at end of file diff --git a/api/include/dsr/api/dsr_api.h b/api/include/dsr/api/dsr_api.h index 1766740..b48dd06 100644 --- a/api/include/dsr/api/dsr_api.h +++ b/api/include/dsr/api/dsr_api.h @@ -30,11 +30,13 @@ #include "dsr/api/dsr_utils.h" #include "dsr/api/dsr_signal_info.h" #include "dsr/api/dsr_graph_settings.h" +#include "dsr/api/dsr_signal_emitter.h" #include "dsr/core/types/type_checking/dsr_edge_type.h" #include "dsr/core/types/type_checking/dsr_node_type.h" #include "dsr/core/types/type_checking/dsr_attr_name.h" #include "dsr/core/utils.h" #include "dsr/core/id_generator.h" +#include "dsr_signal_emitter.h" #include "threadpool/threadpool.h" #include @@ -57,9 +59,9 @@ namespace DSR size_t size() const; DSRGraph(GraphSettings settings); - DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0); - [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0) - : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id) + DSRGraph(std::string name, uint32_t id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode = SignalMode::QT); + [[deprecated("root parameter is not used anymore")]] DSRGraph(uint64_t root, std::string name, int id, const std::string& dsr_input_file = std::string(), bool all_same_host = true, int8_t domain_id=0, SignalMode mode = SignalMode::QT) + : DSRGraph(name, id, dsr_input_file, all_same_host, domain_id, mode) {} ~DSRGraph() override; @@ -539,6 +541,14 @@ namespace DSR return ret_vec; } + + ////////////////////////////////////////////////// + ///// QueuedSignals for python + ///////////////////////////////////////////////// + + QueuedSignalRunner* get_signal_runner() { + return emitter.runner.get(); + } private: DSRGraph(const DSRGraph& G); //Private constructor for DSRCopy @@ -557,6 +567,40 @@ namespace DSR bool same_host; id_generator generator; GraphSettings::LOGLEVEL log_level; + signals_fns emitter; + + ////////////////////////////////////////////////////////////////////////// + // Signal method + /////////////////////////////////////////////////////////////////////////// + + void set_qt_signals (){ + emitter = { + [this](std::uint64_t a, const std::string & b, SignalInfo c = {}) { emit update_node_signal(a, b, c); }, + [this](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { emit update_node_attr_signal(a, b, c); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit update_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { emit update_edge_attr_signal(a, b, c, d, e); }, + [this](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { emit del_edge_signal(a, b, c, d); }, + [this](std::uint64_t a, SignalInfo b = {}) { emit del_node_signal(a, b); }, + [this](const Node& a, SignalInfo b = {}) { emit deleted_node_signal(a, b); }, + [this](const Edge& a, SignalInfo b = {}) { emit deleted_edge_signal(a, b); }, + nullptr + }; + } + + void set_queued_signals (){ + auto runner = new QueuedSignalRunner(); + emitter = { + [runner](std::uint64_t a, const std::string & b, SignalInfo c = {}) { runner->run_update_node_signal(a, b, c); }, + [runner](std::uint64_t a, const std::vector &b, SignalInfo c = {}) { runner->run_update_node_attr_signal(a, b, c); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_update_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, const std::vector &d, SignalInfo e = {}) { runner->run_update_edge_attr_signal(a, b, c, d, e); }, + [runner](std::uint64_t a, std::uint64_t b, const std::string & c, SignalInfo d = {}) { runner->run_del_edge_signal(a, b, c, d); }, + [runner](std::uint64_t a, SignalInfo b = {}) { runner->run_del_node_signal(a, b); }, + [runner](const Node& a, SignalInfo b = {}) { runner->run_deleted_node_signal(a, b); }, + [runner](const Edge& a, SignalInfo b = {}) { runner->run_deleted_edge_signal(a, b); }, + std::unique_ptr(runner) + }; + } ////////////////////////////////////////////////////////////////////////// // Cache maps diff --git a/api/include/dsr/api/dsr_graph_settings.h b/api/include/dsr/api/dsr_graph_settings.h index c9707d9..e26874d 100644 --- a/api/include/dsr/api/dsr_graph_settings.h +++ b/api/include/dsr/api/dsr_graph_settings.h @@ -2,6 +2,7 @@ #include #include +#include namespace DSR { @@ -17,6 +18,7 @@ struct GraphSettings { DEBUGL = 0, INFOL, WARNINGL, ERRORL } log_level {LOGLEVEL::INFOL}; int8_t domain_id = 0; + SignalMode signal_mode = QT; }; } \ No newline at end of file diff --git a/api/include/dsr/api/dsr_signal_emitter.h b/api/include/dsr/api/dsr_signal_emitter.h new file mode 100644 index 0000000..1392e6a --- /dev/null +++ b/api/include/dsr/api/dsr_signal_emitter.h @@ -0,0 +1,126 @@ +#pragma once + +#include +#include +#include +#include +#include + +namespace DSR { + +struct Node; +struct Edge; +struct SignalInfo; + +enum SignalMode { QT, Queue }; + +typedef std::function + update_node_signal_t; +typedef std::function &, + SignalInfo)> + update_node_attr_signal_t; +typedef std::function + update_edge_signal_t; +typedef std::function &, SignalInfo)> + update_edge_attr_signal_t; +typedef std::function + del_edge_signal_t; +typedef std::function del_node_signal_t; +typedef std::function deleted_node_signal_t; +typedef std::function deleted_edge_signal_t; + +typedef std::function + update_node_signal_noinfo_t; +typedef std::function &)> + update_node_attr_signal_noinfo_t; +typedef std::function + update_edge_signal_noinfo_t; +typedef std::function &)> + update_edge_attr_signal_noinfo_t; +typedef update_edge_signal_noinfo_t del_edge_signal_noinfo_t; +typedef std::function del_node_signal_noinfo_t; +typedef std::function deleted_node_signal_noinfo_t; +typedef std::function deleted_edge_signal_noinfo_t; + +typedef std::variant + signal_fn_ptr_t; + +struct QueuedSignalRunner { + ThreadPool tp; + std::vector uns_fns; + std::vector unas_fns; + std::vector ues_fns; + std::vector ueas_fns; + std::vector des_fns; + std::vector den_fns; + std::vector dn_fns; + std::vector de_fns; + explicit QueuedSignalRunner() : tp(2) + {} + + void connect(signal_fn_ptr_t fn, const std::string& type) { + std::visit( + [&, this](auto &&arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) + unas_fns.push_back(arg); + else if constexpr (std::is_same_v) + uns_fns.push_back(arg); + else if constexpr (std::is_same_v) { + if (type == "UPDATE_EDGE") + ues_fns.push_back(arg); + else + des_fns.push_back(arg); + } + else if constexpr (std::is_same_v) + ueas_fns.push_back(arg); + else if constexpr (std::is_same_v) + den_fns.push_back(arg); + else if constexpr (std::is_same_v) + dn_fns.push_back(arg); + else if constexpr (std::is_same_v) + de_fns.push_back(arg); + else + std::cerr << "Python signals don't use SignalInfo parameter\n"; + }, + fn); + } + + void run_update_node_signal(std::uint64_t a, const std::string &b, + SignalInfo c); + void run_update_node_attr_signal(std::uint64_t a, + const std::vector &b, + SignalInfo c); + void run_update_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_update_edge_attr_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, + const std::vector &d, + SignalInfo e); + void run_del_edge_signal(std::uint64_t a, std::uint64_t b, + const std::string &c, SignalInfo d); + void run_del_node_signal(std::uint64_t a, SignalInfo b); + void run_deleted_node_signal(const Node &a, SignalInfo b); + void run_deleted_edge_signal(const Edge &a, SignalInfo b); +}; + +struct signals_fns { + update_node_signal_t update_node_signal; + update_node_attr_signal_t update_node_attr_signal; + update_edge_signal_t update_edge_signal; + update_edge_attr_signal_t update_edge_attr_signal; + del_edge_signal_t del_edge_signal; + del_node_signal_t del_node_signal; + deleted_node_signal_t deleted_node_signal; + deleted_edge_signal_t deleted_edge_signal; + std::unique_ptr runner; +}; +} +; // namespace DSR \ No newline at end of file diff --git a/python-wrapper/python_api.cpp b/python-wrapper/python_api.cpp index 98b8e88..46d4adb 100644 --- a/python-wrapper/python_api.cpp +++ b/python-wrapper/python_api.cpp @@ -3,6 +3,8 @@ // #include "dsr/api/dsr_api.h" +#include +#include #include using namespace DSR; @@ -149,8 +151,6 @@ PYBIND11_MAKE_OPAQUE(std::map, Edge>) PYBIND11_MAKE_OPAQUE(std::map) -static QCoreApplication *app = nullptr; - PYBIND11_MODULE(pydsr, m) { py::bind_map, Edge>>(m, "MapStringEdge"); py::bind_dsr_map>(m, "MapStringAttribute"); @@ -159,30 +159,6 @@ PYBIND11_MODULE(pydsr, m) { uint64_t local_agent_id = -1; - /*std::thread signal_thread ([] { - int argc = 0; - char *argv[] = {nullptr}; - auto *QApp = new QCoreApplication(argc, argv); - app = QApp; - QApp->exec(); - }); - - signal_thread.detach();*/ - - // ugly busy wait until the QApp thread is running. - while (app != nullptr){} - - py::module::import("atexit").attr("register")( - py::cpp_function{ - [&]() -> void { - if (app) { - //QCoreApplication::quit(); - std::exit(0); - } - } - } - ); - //Disable messages from Qt. qInstallMessageHandler([](QtMsgType type, const QMessageLogContext &context, const QString &msg) { if (type == QtCriticalMsg || type == QtFatalMsg) { @@ -244,179 +220,18 @@ PYBIND11_MODULE(pydsr, m) { .export_values(); - sig.def("connect", [](DSRGraph *G, signal_type type, callback_types fn_callback) { - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, - std::get>(fn_callback)); + sig.def("connect", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, - std::get &)>>( - fn_callback)); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, - std::get &)>>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, - std::get>( - fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, - std::get>(fn_callback)); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + auto runner = G->get_signal_runner(); + if (!runner) { + throw std::runtime_error("Signal runner doesn't exist"); } - }); - - sig.def("connect2", [&](DSRGraph *G, signal_type type, callback_types fn_callback) { - - switch (type) { - case UPDATE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Callback must be (int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_NODE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_node_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - - } catch (std::exception &e) { - std::cout << "Update Node Attribute Callback must be (int, [str])\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case UPDATE_EDGE_ATTR: - try { - QObject::connect(G, &DSR::DSRGraph::update_edge_attr_signal, app, - std::get &)>>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Update Edge Attribute Callback must be (int, int, str, [str])\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE: - try { - QObject::connect(G, &DSR::DSRGraph::del_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Edge Callback must be (int, int, str)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE: - try { - QObject::connect(G, &DSR::DSRGraph::del_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (int)\n " << std::endl; - throw e; - } - break; - case DELETE_NODE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_node_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Node)\n " << std::endl; - throw e; - } - break; - case DELETE_EDGE_OBJ: - try { - QObject::connect(G, &DSR::DSRGraph::deleted_edge_signal, app, - std::get>(fn_callback), - Qt::QueuedConnection); - } catch (std::exception &e) { - std::cout << "Delete Node Callback must be (pydsr.Edge)\n " << std::endl; - throw e; - } - break; - default: - throw std::logic_error("Invalid signal type"); + try { + runner->connect(fn_callback, std::to_string(type)); + } catch (std::exception &e) { + std::cout << "Update Node Callback must be (int, str)\n " << std::endl; + throw e; } }); @@ -737,7 +552,7 @@ PYBIND11_MODULE(pydsr, m) { const std::string &dsr_input_file = "", bool all_same_host = true, int8_t domain_id = 0) -> std::unique_ptr { local_agent_id = id; - auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id); + auto g = std::make_unique(root, name, id, dsr_input_file, all_same_host, domain_id, SignalMode::Queue); return g; }), "root"_a, "name"_a, "id"_a, "dsr_input_file"_a = "", "all_same_host"_a = true, "domain_id"_a=0, py::call_guard()) diff --git a/python-wrapper/test/TestSignals.py b/python-wrapper/test/TestSignals.py index 1f83557..4e3464a 100644 --- a/python-wrapper/test/TestSignals.py +++ b/python-wrapper/test/TestSignals.py @@ -96,3 +96,5 @@ def connect(self): result = g.delete_node(2) console.print(result) +import time +time.sleep(1) \ No newline at end of file diff --git a/python-wrapper/test/TestSignalsQueued.py b/python-wrapper/test/TestSignalsQueued.py index e2ece4c..f0800b1 100644 --- a/python-wrapper/test/TestSignalsQueued.py +++ b/python-wrapper/test/TestSignalsQueued.py @@ -57,14 +57,14 @@ def deleted_edge_obj(self, edge: Edge): console.print(f"DELETED EDGE OBJ: {edge}", style='green') def connect(self): - signals.connect2(g, signals.UPDATE_NODE_ATTR, self.update_node_att) - signals.connect2(g, signals.UPDATE_NODE, self.update_node) - signals.connect2(g, signals.DELETE_NODE, self.delete_node) - signals.connect2(g, signals.UPDATE_EDGE, self.update_edge) - signals.connect2(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) - signals.connect2(g, signals.DELETE_EDGE, self.delete_edge) - signals.connect2(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) - signals.connect2(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) + signals.connect(g, signals.UPDATE_NODE_ATTR, self.update_node_att) + signals.connect(g, signals.UPDATE_NODE, self.update_node) + signals.connect(g, signals.DELETE_NODE, self.delete_node) + signals.connect(g, signals.UPDATE_EDGE, self.update_edge) + signals.connect(g, signals.UPDATE_EDGE_ATTR, self.update_edge_att) + signals.connect(g, signals.DELETE_EDGE, self.delete_edge) + signals.connect(g, signals.DELETE_NODE_OBJ, self.deleted_node_obj) + signals.connect(g, signals.DELETE_EDGE_OBJ, self.deleted_edge_obj) app = QtWidgets.QApplication(sys.argv) @@ -101,14 +101,14 @@ def deleted_edge_obj(edge: Edge): agent.connect() -signals.connect2(g, signals.UPDATE_NODE_ATTR, update_node_att) -signals.connect2(g, signals.UPDATE_NODE, update_node) -signals.connect2(g, signals.DELETE_NODE, delete_node) -signals.connect2(g, signals.UPDATE_EDGE, update_edge) -signals.connect2(g, signals.UPDATE_EDGE_ATTR, update_edge_att) -signals.connect2(g, signals.DELETE_EDGE, delete_edge) -signals.connect2(g, signals.DELETE_NODE_OBJ, deleted_node_obj) -signals.connect2(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj) +signals.connect(g, signals.UPDATE_NODE_ATTR, update_node_att) +signals.connect(g, signals.UPDATE_NODE, update_node) +signals.connect(g, signals.DELETE_NODE, delete_node) +signals.connect(g, signals.UPDATE_EDGE, update_edge) +signals.connect(g, signals.UPDATE_EDGE_ATTR, update_edge_att) +signals.connect(g, signals.DELETE_EDGE, delete_edge) +signals.connect(g, signals.DELETE_NODE_OBJ, deleted_node_obj) +signals.connect(g, signals.DELETE_EDGE_OBJ, deleted_edge_obj)