WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content

Commit 8ebaedb

Browse files
jsudano-ciscojsudano
authored andcommitted
Adding some client tests, updating doc-comments, fixing bug where I forgot to set handler.is_draining
1 parent 6a65f00 commit 8ebaedb

File tree

3 files changed

+168
-11
lines changed

3 files changed

+168
-11
lines changed

async-nats/src/client.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -617,10 +617,26 @@ impl Client {
617617
}
618618

619619
/// Drains all subscriptions, stops any new messages from being published, and flushes any remaining
620-
/// messages, then closes the connection
620+
/// messages, then closes the connection. Once completed, any associated streams associated with the
621+
/// client will be closed, and further client commands will fail
621622
///
622623
/// # Examples
623-
/// TODO
624+
///
625+
/// ```no_run
626+
/// # #[tokio::main]
627+
/// # async fn main() -> Result<(), async_nats::Error> {
628+
/// let client = async_nats::connect("demo.nats.io").await?;
629+
/// let mut subscription = client.subscribe("events.>").await?;
630+
///
631+
/// client.drain().await?;
632+
///
633+
/// # // existing subscriptions are closed and further commands will fail
634+
/// assert!(subscription.next().await.is_none());
635+
/// client.subscribe().await.expect_err("Expected further commands to fail");
636+
///
637+
/// # Ok(())
638+
/// # }
639+
/// ```
624640
pub async fn drain(&self) -> Result<(), DrainError> {
625641
// Drain all subscriptions
626642
self.sender.send(Command::Drain { sid: None }).await?;

async-nats/src/lib.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -539,15 +539,16 @@ impl ConnectionHandler {
539539
}
540540

541541
// Before handling any commands, drop any subscriptions which are draining
542-
// Note: safe to assume drain has completed, as we would have flushed all outgoing
543-
// UNSUB messages in the previous call to this fn, and we would have processed and delivered
544-
// any remaining messages to the subscription in the loop above.
542+
// Note: safe to assume subscription drain has completed at this point, as we would have flushed
543+
// all outgoing UNSUB messages in the previous call to this fn, and we would have processed and
544+
// delivered any remaining messages to the subscription in the loop above.
545545
self.handler.subscriptions.retain(|_, s| !s.is_draining);
546546

547547
if self.handler.is_draining {
548-
// The entire connection is draining. This means we flushed outgoing messages and all subs
549-
// were drained by the above retain and we should exit instead of processing any further
550-
// messages
548+
// The entire connection is draining. This means we flushed outgoing messages in the previous
549+
// call to this fn, we handled any remaining messages from the server in the loop above, and
550+
// all subs were drained, so drain is complete and we should exit instead of processing any
551+
// further messages
551552
return Poll::Ready(ExitReason::Closed);
552553
}
553554

@@ -806,6 +807,8 @@ impl ConnectionHandler {
806807
drain_sub(&sid, sub);
807808
}
808809
} else {
810+
// sid isn't set, so drain the whole client
811+
self.is_draining = true;
809812
for (sid, sub) in self.subscriptions.iter_mut() {
810813
drain_sub(sid, sub);
811814
}
@@ -1291,9 +1294,9 @@ impl Subscriber {
12911294
Ok(())
12921295
}
12931296

1294-
/// Unsubscribes from subscription immediately leaves the stream open for the configured drain period
1295-
/// to allow any in-flight messages on the subscription to be delivered. The stream will be closed
1296-
/// at the end of the drain period
1297+
/// Unsubscribes immediately but leaves the stream open to allow any in-flight messages on the
1298+
/// subscription to be delivered. The stream will be closed after any remaining messages are
1299+
/// delivered
12971300
///
12981301
/// # Examples
12991302
/// ```

async-nats/tests/client_tests.rs

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,4 +996,142 @@ mod client {
996996
assert!(stats.out_bytes.load(Ordering::Relaxed) != 0);
997997
assert_eq!(stats.connects.load(Ordering::Relaxed), 2);
998998
}
999+
1000+
#[tokio::test]
1001+
async fn drain_subscription_basic() {
1002+
use std::error::Error;
1003+
let server = nats_server::run_basic_server();
1004+
let client = async_nats::connect(server.client_url()).await.unwrap();
1005+
1006+
let mut sub = client.subscribe("test").await.unwrap();
1007+
1008+
// publish some data
1009+
client.publish("test", "data".into()).await.unwrap();
1010+
client.flush().await.unwrap();
1011+
1012+
// confirm we receive that data
1013+
assert!(sub.next().await.is_some());
1014+
1015+
// now drain the subscription
1016+
let result = sub.drain().await;
1017+
match result {
1018+
Ok(()) => println!("ok"),
1019+
Err(err) => {
1020+
println!("error: {}", err);
1021+
println!("source: {:?}", err.source())
1022+
}
1023+
}
1024+
1025+
// assert the stream is closed after draining
1026+
assert!(sub.next().await.is_none());
1027+
1028+
// confirm we can still reconnect and send messages on a new subscription
1029+
let mut sub2 = client.subscribe("test2").await.unwrap();
1030+
client.publish("test2", "data".into()).await.unwrap();
1031+
client.flush().await.unwrap();
1032+
assert!(sub2.next().await.is_some());
1033+
}
1034+
1035+
#[tokio::test]
1036+
async fn drain_subscription_unsub_after() {
1037+
let server = nats_server::run_basic_server();
1038+
let client = async_nats::connect(server.client_url()).await.unwrap();
1039+
1040+
let mut sub = client.subscribe("test").await.unwrap();
1041+
1042+
sub.unsubscribe_after(120)
1043+
.await
1044+
.expect("Expected to send unsub_after");
1045+
1046+
// publish some data
1047+
client.publish("test", "data".into()).await.unwrap();
1048+
client.publish("test", "data".into()).await.unwrap();
1049+
client.flush().await.unwrap();
1050+
1051+
// Send the drain command
1052+
sub.drain().await.expect("Expected to drain the sub");
1053+
1054+
// we should receive all published data then close immediately
1055+
assert!(sub.next().await.is_some());
1056+
assert!(sub.next().await.is_some());
1057+
assert!(sub.next().await.is_none());
1058+
}
1059+
1060+
#[tokio::test]
1061+
async fn drain_subscription_active() {
1062+
let server = nats_server::run_basic_server();
1063+
let client = async_nats::connect(server.client_url()).await.unwrap();
1064+
1065+
// spawn a task to constantly write to the subscription
1066+
let constant_writer = tokio::spawn({
1067+
let client = client.clone();
1068+
async move {
1069+
loop {
1070+
client.publish("test", "data".into()).await.unwrap();
1071+
client.flush().await.unwrap();
1072+
}
1073+
}
1074+
});
1075+
1076+
let mut sub = client.subscribe("test").await.unwrap();
1077+
1078+
// confirm we receive some data
1079+
assert!(sub.next().await.is_some());
1080+
1081+
// now drain the subscription
1082+
sub.drain().await.unwrap();
1083+
1084+
// yield to the runtime to ensure constant_writer gets a chance to publish a message or two to the subject
1085+
tokio::time::sleep(Duration::from_millis(1)).await;
1086+
1087+
// assert the subscription stream is closed after draining
1088+
let sleep_fut = async move { while let Some(_) = sub.next().await {} };
1089+
tokio::time::timeout(Duration::from_secs(10), sleep_fut)
1090+
.await
1091+
.expect("Expected stream to drain within 10s");
1092+
1093+
// assert constant_writer doesn't fail to write after the only sub is drained (i.e. client operations still work fine)
1094+
assert!(!constant_writer.is_finished());
1095+
1096+
// confirm we can still reconnect and receive messages on the same subject on a new subscription
1097+
let mut sub2 = client.subscribe("test").await.unwrap();
1098+
assert!(sub2.next().await.is_some());
1099+
}
1100+
1101+
#[tokio::test]
1102+
async fn drain_client_basic() {
1103+
let server = nats_server::run_basic_server();
1104+
let client = async_nats::connect(server.client_url()).await.unwrap();
1105+
1106+
let mut sub = client.subscribe("test").await.unwrap();
1107+
1108+
// publish some data
1109+
client.publish("test", "data".into()).await.unwrap();
1110+
client.flush().await.unwrap();
1111+
1112+
// confirm we receive that data
1113+
assert!(sub.next().await.is_some());
1114+
1115+
// now drain the client
1116+
client.drain().await.unwrap();
1117+
1118+
// assert the sub's stream is closed after draining
1119+
assert!(sub.next().await.is_none());
1120+
1121+
// we should not be able to perform any more operations on a drained client
1122+
client
1123+
.subscribe("test2")
1124+
.await
1125+
.expect_err("Expected client to be drained");
1126+
1127+
client
1128+
.publish("test", "data".into())
1129+
.await
1130+
.expect_err("Expected client to be drained");
1131+
1132+
// we should be able to connect with a new client
1133+
let _client2 = async_nats::connect(server.client_url())
1134+
.await
1135+
.expect("Expected to be able to create a new client");
1136+
}
9991137
}

0 commit comments

Comments
 (0)