feat: implement echo connection check with ping/pong mechanism #1687
base: main
Conversation
I’m not particularly proud of this PR, mainly because it builds on top of an already messy P2P implementation and adds more complexity. To do ping/pong properly, this layer would really need a complete redesign (and if it had been designed well in the first place, the ping-pong mechanism would probably be redundant anyway). As for the comments I added: they helped me a lot in understanding the networking layer, but I can imagine they might feel excessive to someone already familiar with the code. If you think they should be removed, I’m totally fine with that. That said, feel free to review.
Pull request overview
This PR implements a ping/pong heartbeat mechanism to actively monitor P2P connection health, addressing issue #1647. The implementation adds bidirectional health checks where each node periodically sends ping packets to its peers and expects pong responses, automatically closing connections that fail to respond within a timeout period.
Key changes:
- Added sequence-numbered Ping/Pong packets for connection health monitoring
- Implemented a watchdog task that monitors pong timeouts and closes dead connections
- Added PongState shared between incoming/outgoing streams to track connection health with unidirectional I/O
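The packet-level side of these changes can be sketched roughly as follows. `Packet::Pong(seq)` appears in the diff; the `Ping` variant name and the `respond` helper are assumptions for illustration, not the PR's actual definitions:

```rust
// Sketch of the sequence-numbered heartbeat packets described above.
// `Packet::Pong(seq)` is taken from the diff; `Ping` and `respond` are assumed.
#[derive(Debug, Clone, PartialEq)]
enum Packet {
    Ping(u64),
    Pong(u64),
}

// On receipt of a Ping, a node echoes the sequence number back in a Pong.
// Pongs themselves are consumed by the keepalive task, not answered.
fn respond(incoming: &Packet) -> Option<Packet> {
    match incoming {
        Packet::Ping(seq) => Some(Packet::Pong(*seq)),
        Packet::Pong(_) => None,
    }
}

fn main() {
    assert_eq!(respond(&Packet::Ping(7)), Some(Packet::Pong(7)));
    assert_eq!(respond(&Packet::Pong(7)), None);
    println!("ok");
}
```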
@gilcu3 any idea whether this test is flaky, or if there’s something wrong with my implementation? BTW, I’ve refined the implementation a bit since I wrote this, so feel free to review it now (cc @gilcu3 @DSharifi).
That test has something weird about it, as explained in #1691. But indeed here I am seeing: which means that after some time two nodes still did not have any presignatures, so something must not be working as expected.
@gilcu3 I ran this test locally and it passes for me, so it seems CI-specific unless I’m missing something:
Interesting, so all tests are passing locally for you now? Did you rebuild the mpc-node (or rebase on main, which now happens automatically)? If yes to all, can you try disabling just that one test and see if CI passes?
This reverts commit d047c9a.
Unfortunately, commenting out that one test didn’t help. There seems to be something going on during resharing when some nodes disconnect and then reconnect. I’ll get back to this early next year (🎄), as unfortunately I wasn’t able even to quickly monkey-patch it.
Ready for review now — I fixed the failing test (context):

```
pytest --non-reproducible "tests/robust_ecdsa/test_parallel_sign_calls.py::test_parallel_sign_calls[3]"
```

The fix itself was straightforward: just bumping

Root cause: the initial ping/pong implementation could cause connection churn during resharing due to a timing issue. When Node A receives a

By the time the outgoing connection is established, Node B has already hit the 5-second

The test that used to fail now takes
Pull request overview
Copilot reviewed 1 out of 1 changed files in this pull request and generated 8 comments.
```diff
-tracing::info!(
-    "Could not connect to {}, retrying: {}, me {}",
+tracing::warn!(
+    "Could not connect to {}: {:#}, retrying (me: {})",
```
Copilot AI · Jan 6, 2026
The error message format on line 449 uses {:#} for the error which provides the alternate debug format, but mixing it with the parenthetical "(me: {})" creates inconsistent formatting. Consider using a consistent format like "Could not connect to {}: {} (me: {})" with regular formatting, or explain why alternate formatting is needed here.
```diff
-"Could not connect to {}: {:#}, retrying (me: {})",
+"Could not connect to {}: {}, retrying (me: {})",
```
```rust
&format!("Ping sender for {}", target_participant_id),
async move {
    let mut seq: u64 = 0;
    let mut last_received_pong_seq: u64 = 0;
```
Copilot AI · Jan 6, 2026
The last_received_pong_seq is initialized to 0 on line 305, but the watch channel is also initialized with seq: 0 on line 263. This means if a spurious changed() event fires before the first ping is sent, the keepalive task will read seq: 0 and compare it to last_received_pong_seq: 0, treating it as a stale pong. While this is likely harmless, consider initializing last_received_pong_seq to a sentinel value like u64::MAX to make the initial state more explicit.
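Copilot's sentinel suggestion boils down to making "no pong seen yet" unrepresentable as a real sequence number. A minimal std-only sketch; the constant and helper names (`NO_PONG_YET`, `is_fresh_pong`) are made up for illustration:

```rust
// Sentinel meaning "no pong received yet"; real sequence numbers start at 1.
const NO_PONG_YET: u64 = u64::MAX;

// Returns true only if `seq` advances past the last pong we have seen.
fn is_fresh_pong(last_received: u64, seq: u64) -> bool {
    if last_received == NO_PONG_YET {
        // The watch channel's initial value carries seq 0, which is
        // not a real pong and must not count as one.
        seq != 0
    } else {
        seq > last_received
    }
}

fn main() {
    // A spurious initial read of seq 0 is now ignored unambiguously.
    assert!(!is_fresh_pong(NO_PONG_YET, 0));
    assert!(is_fresh_pong(NO_PONG_YET, 1));
    assert!(!is_fresh_pong(3, 3));
    assert!(is_fresh_pong(3, 4));
    println!("ok");
}
```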
```rust
conn.connectivity.any_outgoing_connection()
{
    // Send the new pong info via watch channel
    let _ = outgoing_conn.pong_tx.send(PongInfo { seq });
```
Copilot AI · Jan 6, 2026
The error from pong_tx.send() on line 623 is silently ignored with let _ =. If the watch channel receiver is closed (which happens when the keepalive task exits), this could indicate the connection is in an inconsistent state. Consider logging a warning when this send fails to help diagnose connection lifecycle issues.
```diff
-let _ = outgoing_conn.pong_tx.send(PongInfo { seq });
+if let Err(err) = outgoing_conn.pong_tx.send(PongInfo { seq }) {
+    tracing::warn!(
+        "Failed to forward Pong({}) to keepalive for {}: {}",
+        seq,
+        peer_id,
+        err
+    );
+}
```
```rust
// Register connection and drain any buffered Pongs
connectivity.set_outgoing_connection(&new_conn);
while let Ok(seq) = pong_buffer_rx.try_recv() {
    let _ = new_conn.sender.send(Packet::Pong(seq));
}
```
Copilot AI · Jan 6, 2026
There's a race condition in the pong buffer draining logic. After registering the connection on line 440, pongs could arrive and be sent via pong_tx.send() (line 623) while the buffer is being drained on lines 441-443. This could result in pongs being sent in the wrong order: buffered pong N+1 sent after live pong N. Consider draining the buffer before registering the connection, or using a mutex to ensure atomic drain-and-register.
```diff
-// Register connection and drain any buffered Pongs
-connectivity.set_outgoing_connection(&new_conn);
-while let Ok(seq) = pong_buffer_rx.try_recv() {
-    let _ = new_conn.sender.send(Packet::Pong(seq));
-}
+// Drain any buffered Pongs, then register connection
+while let Ok(seq) = pong_buffer_rx.try_recv() {
+    let _ = new_conn.sender.send(Packet::Pong(seq));
+}
+connectivity.set_outgoing_connection(&new_conn);
```
```rust
tracing::info!(
    "Outgoing connection to {} is dead, closing incoming connection for clean reconnect",
    peer_id
);
break;
```
Copilot AI · Jan 6, 2026
The incoming handler breaks the connection loop on line 598 when the outgoing sender channel is closed, but this creates an asymmetric failure mode. If the outgoing connection fails, it will automatically reconnect via PersistentConnection, but breaking the incoming connection here forces the peer to also reconnect their outgoing connection. Consider logging this as info rather than breaking, or add a comment explaining why both connections must be reset together.
```diff
-tracing::info!(
-    "Outgoing connection to {} is dead, closing incoming connection for clean reconnect",
-    peer_id
-);
-break;
+// The outgoing connection (and its sender) may be temporarily down.
+// Do not tear down the incoming connection here: the outgoing side
+// will be re-established via PersistentConnection, and keeping the
+// incoming connection alive avoids forcing the peer to reconnect
+// its own outgoing connection.
+tracing::info!(
+    "Outgoing connection to {} is dead; keeping incoming connection open and relying on reconnect",
+    peer_id
+);
```
```diff
 ) -> anyhow::Result<PersistentConnection> {
-    let connectivity_clone = connectivity.clone();
+    let connectivity_clone = Arc::clone(&connectivity);
     let (pong_buffer_tx, mut pong_buffer_rx) = mpsc::unbounded_channel::<u64>();
```
Copilot AI · Jan 6, 2026
The pong buffer uses an unbounded channel which could accumulate pongs indefinitely if the outgoing connection never establishes. During extended network partitions, this could lead to unbounded memory growth. Consider using a bounded channel with a small capacity, or adding periodic cleanup of old buffered pongs to prevent memory leaks.
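One way to cap the buffer is a small bounded channel that sheds pongs once full. The real code would presumably use tokio's bounded `mpsc::channel` with `try_send`; this sketch uses std's `sync_channel` so it runs standalone, and the `buffer_with_shedding` helper is made up for illustration:

```rust
use std::sync::mpsc::{sync_channel, Receiver, TrySendError};

// Buffer `seqs` into a bounded channel of capacity `cap`, dropping
// whatever does not fit instead of growing without bound. Returns the
// receiver and the number of shed pongs.
fn buffer_with_shedding(cap: usize, seqs: &[u64]) -> (Receiver<u64>, usize) {
    let (tx, rx) = sync_channel::<u64>(cap);
    let mut shed = 0;
    for &seq in seqs {
        if let Err(TrySendError::Full(_)) = tx.try_send(seq) {
            shed += 1; // buffer full: shed rather than accumulate
        }
    }
    (rx, shed)
}

fn main() {
    let seqs: Vec<u64> = (1..=10).collect();
    let (rx, shed) = buffer_with_shedding(4, &seqs);
    assert_eq!(shed, 6);
    assert_eq!(rx.try_recv().unwrap(), 1); // oldest buffered pong survives
    println!("shed {shed} pongs");
}
```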
```rust
/// Interval between consecutive pings. A new ping is sent 1 second after the previous
/// ping was sent, but only after receiving its pong response.
```
Copilot AI · Jan 6, 2026
The comment states "A new ping is sent 1 second after the previous ping was sent, but only after receiving its pong response." This is misleading - the code on lines 341-344 shows that the wait is until 1 second has elapsed since the ping was sent, not 1 second after receiving the pong. If the pong arrives after 0.5 seconds, the next ping is sent 0.5 seconds later (total 1 second from previous ping). Consider clarifying the comment to say "Pings are sent at 1-second intervals from when the previous ping was sent, waiting for pong response before sending the next."
```diff
-/// Interval between consecutive pings. A new ping is sent 1 second after the previous
-/// ping was sent, but only after receiving its pong response.
+/// Interval between consecutive pings. Pings are sent at 1-second intervals from when
+/// the previous ping was sent, waiting for its pong response before sending the next.
```
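The clarified semantics, that the next ping is due one second after the previous ping was *sent* regardless of when the pong arrives, reduce to a deadline computation. A sketch with std's `Instant`; the `PING_INTERVAL` constant and `next_ping_deadline` helper names are assumptions:

```rust
use std::time::{Duration, Instant};

const PING_INTERVAL: Duration = Duration::from_secs(1); // name assumed

// The next ping is due PING_INTERVAL after the previous ping was sent,
// independent of when its pong arrived.
fn next_ping_deadline(last_ping_sent: Instant) -> Instant {
    last_ping_sent + PING_INTERVAL
}

fn main() {
    let sent = Instant::now();
    let pong_arrived = sent + Duration::from_millis(500);
    let deadline = next_ping_deadline(sent);
    // Even though the pong came after 0.5 s, the next ping still goes
    // out 1 s after the previous send, i.e. 0.5 s after the pong.
    assert_eq!(deadline - sent, Duration::from_secs(1));
    assert_eq!(deadline - pong_arrived, Duration::from_millis(500));
    println!("ok");
}
```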
```rust
let expected_seq = last_received_pong_seq + 1;
if pong_info.seq != expected_seq {
    tracing::warn!(
        "Received pong {} from {}, expected {}, lost {} pong(s)",
```
Copilot AI · Jan 6, 2026
The warning message on lines 323-326 logs when pongs are received out of sequence, but the message format could be clearer. The phrase "lost N pong(s)" is ambiguous - it's not clear if the pongs were actually lost or just received in the wrong order. Consider rephrasing to "received pong {} from {}, expected {} (gap of {} pong(s))" to make it clear this is a sequence gap detection rather than a confirmed loss.
```diff
-"Received pong {} from {}, expected {}, lost {} pong(s)",
+"Received pong {} from {}, expected {} (gap of {} pong(s))",
```
DSharifi left a comment
Thanks for the work put into this.
Let's discuss the implementation over a call when you are back tomorrow. I think we can simplify things by not buffering pong messages, and only track the latest sequence number that has been sent with a ping.
The (un)structured logging is also a blocker.
```rust
/// Channel for buffering Pongs when outgoing connection is temporarily unavailable.
pong_buffer: UnboundedSender<u64>,
```
Don't use an UnboundedSender, if we can't consume incoming pong messages fast enough that we start buffering beyond a reasonable limit we should just shed them.
Also, why do we need to buffer multiple pongs to the same peer? Isn't the last sequence number the only one we care about?
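If only the most recent sequence number matters, the buffer collapses into a single slot that each new pong overwrites; tokio's `watch` channel has exactly these last-value-wins semantics. A std-only sketch of the idea, with the `record_pong` helper made up for illustration:

```rust
use std::sync::{Arc, Mutex};

type LatestPong = Arc<Mutex<Option<u64>>>;

// Overwrite the single slot: older sequence numbers are discarded,
// which is all the keepalive task needs to judge liveness.
fn record_pong(slot: &LatestPong, seq: u64) {
    *slot.lock().unwrap() = Some(seq);
}

fn main() {
    let slot: LatestPong = Arc::new(Mutex::new(None));
    for seq in 1..=5 {
        record_pong(&slot, seq);
    }
    // The reader only ever sees the newest sequence number.
    assert_eq!(*slot.lock().unwrap(), Some(5));
    println!("latest pong seq: {:?}", slot.lock().unwrap());
}
```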
```rust
let expected_seq = last_received_pong_seq + 1;
if pong_info.seq != expected_seq {
    tracing::warn!(
        "Received pong {} from {}, expected {}, lost {} pong(s)",
```
Please use structured logging for these values
```rust
// Wait for either a pong response or timeout
tokio::select! {
```
You don't need a `select!` here if all you want is a timeout.
Check out `tokio::time::timeout` instead.
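The suggestion is to let the receive call itself carry the deadline instead of racing a sleep in `select!`. The same shape exists in std as `recv_timeout`, sketched here since the async `tokio::time::timeout` version would need a runtime; `wait_for_pong` is a made-up helper name:

```rust
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::time::Duration;

// Wait for a pong with a deadline baked into the receive call itself,
// so no select! over a separate sleep is needed. (std analogue of
// wrapping an async recv in tokio::time::timeout.)
fn wait_for_pong(rx: &Receiver<u64>, timeout: Duration) -> Result<u64, RecvTimeoutError> {
    rx.recv_timeout(timeout)
}

fn main() {
    let (tx, rx) = channel::<u64>();

    // Pong arrives in time: the receive returns it.
    tx.send(42).unwrap();
    assert_eq!(wait_for_pong(&rx, Duration::from_millis(50)), Ok(42));

    // No pong queued: the timeout is reported as an error.
    assert!(wait_for_pong(&rx, Duration::from_millis(10)).is_err());
    println!("ok");
}
```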
```rust
conn.connectivity.any_outgoing_connection()
{
    // Send the new pong info via watch channel
    let _ = outgoing_conn.pong_tx.send(PongInfo { seq });
```
Why are we silently dropping the error?
```rust
);
new_conn

// Register connection and drain any buffered Pongs
```
This comment is confusing. "drain" implies we'd discard the buffered pongs, but in the code below you are actually consuming the pongs to send them?
```diff
-tracing::info!(
-    "Could not connect to {}, retrying: {}, me {}",
+tracing::warn!(
+    "Could not connect to {}: {:#}, retrying (me: {})",
```
Same comment as above regarding structured logging
```rust
{
    if outgoing_conn.sender.send(Packet::Pong(seq)).is_err() {
        tracing::info!(
            "Outgoing connection to {} is dead, closing incoming connection for clean reconnect",
```
Where is this clean reconnect triggered?
```rust
/// This struct manages **outgoing connections only** - one persistent TLS connection to each
/// peer in the network. When the application wants to send a message to a peer, it queues the
/// message through the corresponding [`PersistentConnection`], which handles automatic
/// reconnection if the connection drops. Each connection runs two background tasks: one for
/// sending data, and one for sending 1-second interval ping heartbeats and monitoring pong responses.
///
/// Implements the [`MeshNetworkTransportSender`] trait to provide a high-level API for sending
/// messages (`.send()`, `.send_indexer_height()`) and checking connectivity status
/// (`.connectivity()`, `.wait_for_ready()`), while handling low-level connection management.
pub struct TlsMeshSender {
    /// The participant ID of this node.
    my_id: ParticipantId,
    /// List of all participant IDs in the network (including this node).
    participants: Vec<ParticipantId>,
    /// Outgoing connections to all peers (excludes this node). Each connection automatically
    /// retries on failure. This is where actual message sending happens - when you call
    /// `.send()`, it looks up the connection here and queues the message.
    connections: HashMap<ParticipantId, Arc<PersistentConnection>>,
    /// Tracks connection state (incoming and outgoing) for all peers. This is separate from
    /// `connections` because it monitors *both directions* - while `connections` only manages
    /// our outgoing connections, `connectivities` tracks whether both our outgoing connection
    /// to a peer AND their incoming connection to us are alive. Used by `.wait_for_ready()`
    /// and `.connectivity()` to check bidirectional connectivity status.
    connectivities: Arc<AllNodeConnectivities<TlsConnection, ()>>,
}
```
```rust
/// Implements MeshNetworkTransportReceiver.
/// This struct manages **incoming connections only** - it accepts TLS connections from all
/// peers and multiplexes their messages into a single channel. The application calls
/// `.receive()` to get the next message from any peer. Each incoming connection runs its own
/// background task that reads from the TLS stream, handles Ping/Pong packets, and forwards
/// MPC/IndexerHeight messages to the unified receiver channel.
///
/// Ping/Pong handling uses cross-stream communication to maintain unidirectional I/O: when
/// receiving a Ping, this handler sends Pong via the outgoing connection to that peer; when
/// receiving a Pong, it notifies the outgoing connection's keepalive task via a watch channel.
///
/// Implements [`MeshNetworkTransportReceiver`] to receive messages from all peers in the
/// mesh network.
pub struct TlsMeshReceiver {
    /// Unified message queue receiving messages from all peers' incoming connections.
    /// When any peer sends us a message, it gets queued here. The application calls
    /// `.receive()` to dequeue the next message (which includes the sender's ID).
    receiver: UnboundedReceiver<PeerMessage>,
    /// Background task running the TCP acceptor loop on our listening port. It continuously
    /// accepts incoming TCP connections and spawns a new task for each one that:
    /// 1) Performs TLS handshake and authenticates the peer's identity
    /// 2) Registers the connection with `connectivities` for bidirectional tracking
    /// 3) Reads messages from the peer in a loop (read-only stream usage)
    /// 4) On Ping: Sends Pong via our outgoing connection to maintain unidirectional I/O
    /// 5) On Pong: Notifies the outgoing connection's keepalive task via watch channel
    /// 6) Forwards MpcMessage and IndexerHeight to the unified `receiver` channel
    ///
    /// The [`AutoAbortTask`] wrapper ensures automatic cleanup on drop.
    _incoming_connections_task: AutoAbortTask<()>,
}
```
```diff
-/// Maps public keys to participant IDs. Used to identify incoming connections.
+/// Maps public keys to [`ParticipantId`]s for authenticating incoming connections.
+///
+/// This struct is populated at startup with the known public keys of all participants in the
+/// network. When a peer establishes an incoming TLS connection, we extract their public key
+/// from their TLS certificate and look it up in this map to determine their [`ParticipantId`].
+/// This ensures that only known participants can connect, and we can correctly attribute
+/// incoming messages to the right peer. If a connection presents an unknown public key, it is
+/// rejected during the authentication phase.
```
Appreciate you taking time to document all this :)
Fixes #1647