diff --git a/Cargo.lock b/Cargo.lock index cc591f0b9a5..0337c777231 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -316,6 +316,12 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "async-rx" version = "0.1.3" @@ -3150,6 +3156,7 @@ dependencies = [ "assert_matches", "assert_matches2", "async-channel", + "async-once-cell", "async-stream", "async-trait", "axum", diff --git a/Cargo.toml b/Cargo.toml index 936e88b41d5..cf4b8b3e8b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,9 @@ as_variant = "1.3.0" assert-json-diff = "2.0.2" assert_matches = "1.5.0" assert_matches2 = "0.1.2" +async_cell = "0.2.3" async-compat = "0.2.5" +async-once-cell = "0.5.4" async-rx = "0.1.3" # Bumping this to 0.3.6 produces a test failure because the semantic between the # versions changed subtly: https://github.com/matrix-org/matrix-rust-sdk/issues/4599 diff --git a/crates/matrix-sdk-base/src/latest_event.rs b/crates/matrix-sdk-base/src/latest_event.rs index 16abda68857..1ecf18375a8 100644 --- a/crates/matrix-sdk-base/src/latest_event.rs +++ b/crates/matrix-sdk-base/src/latest_event.rs @@ -56,6 +56,13 @@ impl LatestEventValue { Self::None | Self::Remote(_) => false, } } + + /// Check whether the [`LatestEventValue`] is not set, i.e. [`None`]. + /// + /// [`None`]: LatestEventValue::None + pub fn is_none(&self) -> bool { + matches!(self, Self::None) + } } /// Represents the value for [`LatestEventValue::Remote`]. diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index a0fd460cfb0..8f2d7495dd5 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -36,7 +36,7 @@ experimental-encrypted-state-events = [ as_variant.workspace = true async-rx.workspace = true async-stream.workspace = true -async_cell = "0.2.3" +async_cell.workspace = true bitflags.workspace = true chrono.workspace = true eyeball.workspace = true diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index ba50b8ecfed..82d0eeb95f1 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -1876,7 +1876,7 @@ async fn test_room_sorting() -> Result<(), Error> { // Now, let's define a filter. dynamic_entries.set_filter(Box::new(new_filter_non_left())); - // Assert rooms are sorted by recency and by name!. + // Assert rooms are sorted by recency and by name! assert_entries_batch! { [stream] reset [ @@ -1925,7 +1925,7 @@ async fn test_room_sorting() -> Result<(), Error> { "timeline": [{ "content": { "body": "foo", - "msgtype": "m.text" + "msgtype": "m.text", }, "event_id": "$ev7", "origin_server_ts": 7, @@ -2015,7 +2015,7 @@ async fn test_room_sorting() -> Result<(), Error> { // | 3 | !r4 | 5 | | // | 4 | !r3 | 4 | | - // The Latest Events are updated. + // Rooms are individually updated. assert_entries_batch! { [stream] set [ 1 ] [ "!r0:bar.org" ]; @@ -2086,19 +2086,7 @@ async fn test_room_sorting() -> Result<(), Error> { assert_entries_batch! { [stream] - insert [ 3 ] [ "!r6:bar.org" ]; - end; - }; - - // The Latest Event is updated. - assert_entries_batch! { - [stream] - set [ 3 ] [ "!r6:bar.org" ]; - end; - }; - assert_entries_batch! { - [stream] - set [ 3 ] [ "!r6:bar.org" ]; + insert [ 1 ] [ "!r6:bar.org" ]; end; }; @@ -2113,6 +2101,18 @@ async fn test_room_sorting() -> Result<(), Error> { // | 4 | !r4 | 5 | | // | 5 | !r3 | 4 | | + // Rooms are individually updated. + assert_entries_batch! { + [stream] + set [ 1 ] [ "!r6:bar.org" ]; + end; + }; + assert_entries_batch! { + [stream] + set [ 1 ] [ "!r6:bar.org" ]; + end; + }; + assert_entries_batch! { [stream] remove [ 5 ]; @@ -2131,10 +2131,10 @@ async fn test_room_sorting() -> Result<(), Error> { // | 4 | !r1 | 6 | Aaa | // | 5 | !r4 | 5 | | - // The Latest Events are updated. + // Rooms are individually updated. assert_entries_batch! { [stream] - set [ 4 ] [ "!r6:bar.org" ]; + set [ 2 ] [ "!r6:bar.org" ]; end; }; assert_entries_batch! { @@ -2142,6 +2142,11 @@ async fn test_room_sorting() -> Result<(), Error> { set [ 0 ] [ "!r3:bar.org" ]; end; }; + assert_entries_batch! { + [stream] + set [ 2 ] [ "!r6:bar.org" ]; + end; + }; assert_pending!(stream); diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index f9f9c17ef4e..25aae4132b1 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -26,6 +26,9 @@ All notable changes to this project will be documented in this file. ### Bugfix +- Latest Event is lazier: a `RoomLatestEvents` can be registered even if its + associated `RoomEventCache` isn't created yet. + ([#5947](https://github.com/matrix-org/matrix-rust-sdk/pull/5947)) - Allow granting of QR login to a new client whose device ID is not a base64 encoded Curve25519 public key. ([#5940](https://github.com/matrix-org/matrix-rust-sdk/pull/5940)) diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 6dd6e0ad9b5..1feec37149e 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -86,6 +86,7 @@ assert_matches2 = { workspace = true, optional = true } async-channel = "2.5.0" async-stream.workspace = true async-trait.workspace = true +async-once-cell.workspace = true axum = { version = "0.8.4", optional = true } bytes = "1.11.0" bytesize = "2.3.0" diff --git a/crates/matrix-sdk/src/latest_events/latest_event.rs b/crates/matrix-sdk/src/latest_events/latest_event.rs index dabb5150013..6cd3156f44e 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event.rs @@ -14,10 +14,10 @@ use std::{ iter::once, - ops::{Deref, Not}, + ops::{Deref, DerefMut, Not}, }; -use eyeball::{AsyncLock, SharedObservable, Subscriber}; +use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber}; pub use matrix_sdk_base::latest_event::{ LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue, }; @@ -62,18 +62,24 @@ pub(super) struct LatestEvent { } impl LatestEvent { - pub(super) async fn new( + pub fn new( weak_room: &WeakRoom, thread_id: Option<&EventId>, - room_event_cache: &RoomEventCache, - ) -> Self { - Self { - weak_room: weak_room.clone(), - _thread_id: thread_id.map(ToOwned::to_owned), - buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(), - current_value: SharedObservable::new_async( - LatestEventValueBuilder::new_remote(room_event_cache, weak_room).await, - ), + ) -> With { + let latest_event_value = match thread_id { + Some(_thread_id) => LatestEventValue::default(), + None => weak_room.get().map(|room| room.latest_event()).unwrap_or_default(), + }; + let is_none = latest_event_value.is_none(); + + With { + result: Self { + weak_room: weak_room.clone(), + _thread_id: thread_id.map(ToOwned::to_owned), + buffer_of_values_for_local_events: LatestEventValuesForLocalEvents::new(), + current_value: SharedObservable::new_async(latest_event_value), + }, + with: is_none, } } @@ -82,6 +88,11 @@ impl LatestEvent { self.current_value.subscribe().await } + #[cfg(test)] + pub async fn get(&self) -> LatestEventValue { + self.current_value.get().await + } + /// Update the inner latest event value, based on the event cache /// (specifically with the [`RoomEventCache`]), if and only if there is no /// local latest event value waiting. @@ -94,7 +105,7 @@ impl LatestEvent { pub async fn update_with_event_cache( &mut self, room_event_cache: &RoomEventCache, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) { if self.buffer_of_values_for_local_events.is_empty().not() { @@ -104,12 +115,8 @@ impl LatestEvent { return; } - let new_value = LatestEventValueBuilder::new_remote_with_power_levels( - room_event_cache, - own_user_id, - power_levels, - ) - .await; + let new_value = + LatestEventValueBuilder::new_remote(room_event_cache, own_user_id, power_levels).await; self.update(new_value).await; } @@ -120,7 +127,7 @@ impl LatestEvent { &mut self, send_queue_update: &RoomSendQueueUpdate, room_event_cache: &RoomEventCache, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) { let new_value = LatestEventValueBuilder::new_local( @@ -143,8 +150,24 @@ impl LatestEvent { /// been found, we want to latest event value to be `None`, so that it is /// erased correctly. async fn update(&mut self, new_value: LatestEventValue) { - self.current_value.set(new_value.clone()).await; - self.store(new_value).await; + // Ideally, we would set `new_value` if and only if it is different from the + // previous value. However, `LatestEventValue` cannot implement `PartialEq` at + // the time of writing (2025-12-12). So we are only updating if + // `LatestEventValue` is not `None` and if the previous value isn't `None`; + // basically, replacing `None` with `None` will not update the value. + { + let mut guard = self.current_value.write().await; + let previous_value = guard.deref(); + + if (previous_value.is_none() && new_value.is_none()).not() { + ObservableWriteGuard::set(&mut guard, new_value.clone()); + + // Release the write guard over the current value before hitting the store. + drop(guard); + + self.store(new_value).await; + } + } } /// Update the `RoomInfo` associated to this room to set the new @@ -180,8 +203,56 @@ impl LatestEvent { } } +/// Semantic type similar to a tuple where the left part is the main result and +/// the right part is an “attached” value. +pub(super) struct With { + /// The main value. + result: T, + + /// The “attached” value. + with: W, +} + +impl With { + /// Map the main result without changing the “attached” value. + pub fn map(this: With, f: F) -> With + where + F: FnOnce(T) -> O, + { + With { result: f(this.result), with: this.with } + } + + /// Get the main result. + pub fn inner(this: With) -> T { + this.result + } + + /// Get a tuple. + pub fn unzip(this: With) -> (T, W) { + (this.result, this.with) + } +} + +impl Deref for With { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.result + } +} + +impl DerefMut for With { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.result + } +} + +pub(super) type IsLatestEventValueNone = bool; + #[cfg(all(not(target_family = "wasm"), test))] mod tests_latest_event { + use std::ops::Not; + use assert_matches::assert_matches; use matrix_sdk_base::{ RoomInfoNotableUpdateReasons, RoomState, @@ -194,8 +265,11 @@ mod tests_latest_event { events::{AnyMessageLikeEventContent, room::message::RoomMessageEventContent}, room_id, user_id, }; + use stream_assert::{assert_next_matches, assert_pending}; - use super::{LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent}; + use super::{ + LatestEvent, LatestEventValue, LocalLatestEventValue, SerializableEventContent, With, + }; use crate::{ client::WeakClient, room::WeakRoom, @@ -232,6 +306,47 @@ mod tests_latest_event { } } + #[async_test] + async fn test_new_loads_from_room_info() { + let room_id = room_id!("!r0"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let weak_client = WeakClient::from_client(&client); + + // Create the room. + let room = client.base_client().get_or_create_room(room_id, RoomState::Joined); + let weak_room = WeakRoom::new(weak_client, room_id.to_owned()); + + // First time `LatestEvent` is created: we get `None`. + { + let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None)); + + // By default, it's `None`. + assert_matches!(latest_event.current_value.get().await, LatestEventValue::None); + assert!(is_none); + } + + // Set the `RoomInfo`. + { + let mut room_info = room.clone_info(); + room_info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo"))); + room.set_room_info(room_info, Default::default()); + } + + // Second time. We get `LocalIsSending` from `RoomInfo`. + { + let (latest_event, is_none) = With::unzip(LatestEvent::new(&weak_room, None)); + + // By default, it's `None`. + assert_matches!( + latest_event.current_value.get().await, + LatestEventValue::LocalIsSending(_) + ); + assert!(is_none.not()); + } + } + #[async_test] async fn test_update_do_not_ignore_none_value() { let room_id = room_id!("!r0"); @@ -248,9 +363,7 @@ mod tests_latest_event { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); - - let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await; + let mut latest_event = LatestEvent::new(&weak_room, None); // First off, check the default value is `None`! assert_matches!(latest_event.current_value.get().await, LatestEventValue::None); @@ -269,6 +382,52 @@ mod tests_latest_event { assert_matches!(latest_event.current_value.get().await, LatestEventValue::None); } + #[async_test] + async fn test_update_ignore_none_if_previous_value_is_none() { + let room_id = room_id!("!r0"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let weak_client = WeakClient::from_client(&client); + + // Create the room. + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let weak_room = WeakRoom::new(weak_client, room_id.to_owned()); + + let mut latest_event = LatestEvent::new(&weak_room, None); + + let mut stream = latest_event.subscribe().await; + + assert_pending!(stream); + + // Set a non-`None` value. + latest_event.update(LatestEventValue::LocalIsSending(local_room_message("foo"))).await; + // We get it. + assert_next_matches!(stream, LatestEventValue::LocalIsSending(_)); + + // Set a `None` value. + latest_event.update(LatestEventValue::None).await; + // We get it. + assert_next_matches!(stream, LatestEventValue::None); + + // Set a `None` value, again! + latest_event.update(LatestEventValue::None).await; + // We get it? No! + assert_pending!(stream); + + // Set a `None` value, again, and again! + latest_event.update(LatestEventValue::None).await; + // No means No! + assert_pending!(stream); + + // Set a non-`None` value. + latest_event.update(LatestEventValue::LocalIsSending(local_room_message("bar"))).await; + // We get it. Oof. + assert_next_matches!(stream, LatestEventValue::LocalIsSending(_)); + + assert_pending!(stream); + } + #[async_test] async fn test_local_has_priority_over_remote() { let room_id = room_id!("!r0").to_owned(); @@ -314,11 +473,11 @@ mod tests_latest_event { let send_queue = client.send_queue(); let room_send_queue = send_queue.for_room(room); - let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await; + let mut latest_event = LatestEvent::new(&weak_room, None); // First, let's create a `LatestEventValue` from the event cache. It must work. { - latest_event.update_with_event_cache(&room_event_cache, None, None).await; + latest_event.update_with_event_cache(&room_event_cache, user_id, None).await; assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_)); } @@ -335,7 +494,7 @@ mod tests_latest_event { content, }); - latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await; + latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await; assert_matches!( latest_event.current_value.get().await, @@ -347,7 +506,7 @@ mod tests_latest_event { // Nothing must happen, it cannot overwrite the current // `LatestEventValue` because the local event isn't sent yet. { - latest_event.update_with_event_cache(&room_event_cache, None, None).await; + latest_event.update_with_event_cache(&room_event_cache, user_id, None).await; assert_matches!( latest_event.current_value.get().await, @@ -363,7 +522,7 @@ mod tests_latest_event { event_id: event_id!("$ev1").to_owned(), }; - latest_event.update_with_send_queue(&update, &room_event_cache, None, None).await; + latest_event.update_with_send_queue(&update, &room_event_cache, user_id, None).await; assert_matches!( latest_event.current_value.get().await, @@ -374,7 +533,7 @@ mod tests_latest_event { // Finally, let's create a `LatestEventValue` from the event cache. _Now_ it's // possible, because there is no more local events. { - latest_event.update_with_event_cache(&room_event_cache, None, None).await; + latest_event.update_with_event_cache(&room_event_cache, user_id, None).await; assert_matches!(latest_event.current_value.get().await, LatestEventValue::Remote(_)); } @@ -442,8 +601,8 @@ mod tests_latest_event { // Generate a new `LatestEventValue`. { - let mut latest_event = LatestEvent::new(&weak_room, None, &room_event_cache).await; - latest_event.update_with_event_cache(&room_event_cache, None, None).await; + let mut latest_event = LatestEvent::new(&weak_room, None); + latest_event.update_with_event_cache(&room_event_cache, user_id, None).await; assert_matches!( latest_event.current_value.get().await, @@ -490,38 +649,12 @@ impl LatestEventValueBuilder { /// Create a new [`LatestEventValue::Remote`]. async fn new_remote( room_event_cache: &RoomEventCache, - weak_room: &WeakRoom, - ) -> LatestEventValue { - // Get the power levels of the user for the current room if the `WeakRoom` is - // still valid. - let room = weak_room.get(); - let (own_user_id, power_levels) = match &room { - Some(room) => { - let power_levels = room.power_levels().await.ok(); - - (Some(room.own_user_id()), power_levels) - } - - None => (None, None), - }; - - Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels.as_ref()) - .await - } - - /// Create a new [`LatestEventValue::Remote`] based on existing power - /// levels. - async fn new_remote_with_power_levels( - room_event_cache: &RoomEventCache, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) -> LatestEventValue { let _timer = timer!( tracing::Level::INFO, - format!( - "`LatestEventValueBuilder::new_remote_with_power_levels` for {:?}", - room_event_cache.room_id() - ) + format!("`LatestEventValueBuilder::new_remote` for {:?}", room_event_cache.room_id()) ); let value = if let Ok(Some(event)) = room_event_cache @@ -547,7 +680,7 @@ impl LatestEventValueBuilder { send_queue_update: &RoomSendQueueUpdate, buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, room_event_cache: &RoomEventCache, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) -> LatestEventValue { use crate::send_queue::{LocalEcho, LocalEchoContent}; @@ -755,13 +888,13 @@ impl LatestEventValueBuilder { async fn new_local_or_remote( buffer_of_values_for_local_events: &mut LatestEventValuesForLocalEvents, room_event_cache: &RoomEventCache, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) -> LatestEventValue { if let Some(value) = buffer_of_values_for_local_events.last() { value.clone() } else { - Self::new_remote_with_power_levels(room_event_cache, own_user_id, power_levels).await + Self::new_remote(room_event_cache, own_user_id, power_levels).await } } } @@ -951,7 +1084,7 @@ impl LatestEventValuesForLocalEvents { fn filter_timeline_event( event: &TimelineEvent, previous_event: Option<&TimelineEvent>, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) -> bool { // Cast the event into an `AnySyncTimelineEvent`. If deserializing fails, we @@ -1039,20 +1172,20 @@ fn filter_any_message_like_event_content( fn filter_any_sync_state_event( event: AnySyncStateEvent, - own_user_id: Option<&UserId>, + own_user_id: &UserId, power_levels: Option<&RoomPowerLevels>, ) -> bool { match event { AnySyncStateEvent::RoomMember(member) => { match member.membership() { MembershipState::Knock => { - let can_accept_or_decline_knocks = match (own_user_id, power_levels) { - (Some(own_user_id), Some(room_power_levels)) => { + let can_accept_or_decline_knocks = match power_levels { + Some(room_power_levels) => { room_power_levels.user_can_invite(own_user_id) || room_power_levels .user_can_kick_user(own_user_id, member.state_key()) } - _ => false, + None => false, }; // The current user can act on the knock changes, so they should be @@ -1071,9 +1204,7 @@ fn filter_any_sync_state_event( match member { // We can only decide whether the user is invited if the event isn't // redacted. - SyncStateEvent::Original(state) => { - Some(state.state_key.deref()) == own_user_id - } + SyncStateEvent::Original(state) => state.state_key.deref() == own_user_id, _ => false, } @@ -1119,7 +1250,7 @@ mod tests_latest_event_content { $event_builder }; - assert_eq!(filter_timeline_event(&event, None, Some(user_id!("@mnt_io:matrix.org")), None), $expect ); + assert_eq!(filter_timeline_event(&event, None, user_id!("@mnt_io:matrix.org"), None), $expect ); }; } @@ -1149,7 +1280,7 @@ mod tests_latest_event_content { { let previous_event_id = None; - assert!(filter_timeline_event(&event, previous_event_id, Some(user_id), None).not()); + assert!(filter_timeline_event(&event, previous_event_id, user_id, None).not()); } // With a previous event, but not the one being replaced. @@ -1157,9 +1288,7 @@ mod tests_latest_event_content { let previous_event = Some(event_factory.text_msg("no!").event_id(event_id!("$ev1")).into_event()); - assert!( - filter_timeline_event(&event, previous_event.as_ref(), Some(user_id), None).not() - ); + assert!(filter_timeline_event(&event, previous_event.as_ref(), user_id, None).not()); } // With a previous event, and that's the one being replaced! @@ -1167,7 +1296,7 @@ mod tests_latest_event_content { let previous_event = Some(event_factory.text_msg("hello").event_id(event_id!("$ev0")).into_event()); - assert!(filter_timeline_event(&event, previous_event.as_ref(), Some(user_id), None)); + assert!(filter_timeline_event(&event, previous_event.as_ref(), user_id, None)); } } @@ -1339,7 +1468,7 @@ mod tests_latest_event_content { room_power_levels.invite = 10.into(); room_power_levels.kick = 10.into(); assert!( - filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(), + filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).not(), "cannot accept, cannot decline", ); } @@ -1349,7 +1478,7 @@ mod tests_latest_event_content { room_power_levels.invite = 0.into(); room_power_levels.kick = 10.into(); assert!( - filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)), + filter_timeline_event(&event, None, user_id, Some(&room_power_levels)), "can accept, cannot decline", ); } @@ -1359,7 +1488,7 @@ mod tests_latest_event_content { room_power_levels.invite = 10.into(); room_power_levels.kick = 0.into(); assert!( - filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)), + filter_timeline_event(&event, None, user_id, Some(&room_power_levels)), "cannot accept, can decline", ); } @@ -1369,7 +1498,7 @@ mod tests_latest_event_content { room_power_levels.invite = 0.into(); room_power_levels.kick = 0.into(); assert!( - filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)), + filter_timeline_event(&event, None, user_id, Some(&room_power_levels)), "can accept, can decline", ); } @@ -1385,7 +1514,7 @@ mod tests_latest_event_content { room_power_levels.kick = 0.into(); assert!( - filter_timeline_event(&event, None, Some(user_id), Some(&room_power_levels)).not(), + filter_timeline_event(&event, None, user_id, Some(&room_power_levels)).not(), "cannot accept, can decline, at least same user levels", ); } @@ -1709,8 +1838,6 @@ mod tests_latest_event_value_builder { }; use crate::{ Client, Error, - client::WeakClient, - room::WeakRoom, send_queue::{AbstractProgress, LocalEcho, LocalEchoContent, RoomSendQueue, SendHandle}, test_utils::mocks::MatrixMockServer, }; @@ -1808,13 +1935,12 @@ mod tests_latest_event_value_builder { event_cache.subscribe().unwrap(); let (room_event_cache, _) = event_cache.for_room(room_id).await.unwrap(); - let weak_room = WeakRoom::new(WeakClient::from_client(&client), room_id.to_owned()); assert_remote_value_matches_room_message_with_body!( // We get `event_id_1` because `event_id_2` isn't a candidate, // and `event_id_0` hasn't been read yet (because events are read // backwards). - LatestEventValueBuilder::new_remote(&room_event_cache, &weak_room).await => with body = "world" + LatestEventValueBuilder::new_remote(&room_event_cache, user_id, None).await => with body = "world" ); } @@ -1858,7 +1984,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_new_local_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); @@ -1871,7 +1998,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "A" ); } @@ -1885,7 +2012,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); } @@ -1895,7 +2022,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_new_local_event_when_previous_local_event_cannot_be_sent() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); @@ -1911,7 +2039,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "A" ); @@ -1930,7 +2058,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but it's marked as “cannot be sent”. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalCannotBeSent => with body = "A" ); @@ -1949,7 +2077,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalCannotBeSent => with body = "B" ); } @@ -1961,7 +2089,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_cancelled_local_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id_0 = OwnedTransactionId::from("txnid0"); @@ -1982,7 +2111,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = body ); } @@ -2000,7 +2129,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` hasn't changed, it still matches the latest local // event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "C" ); @@ -2017,7 +2146,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it matches the previous (so the first) // local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "A" ); @@ -2038,7 +2167,7 @@ mod tests_latest_event_value_builder { &update, &mut buffer, &room_event_cache, - None, + user_id, None ) .await, @@ -2051,7 +2180,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_sent_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id_0 = OwnedTransactionId::from("txnid0"); @@ -2069,7 +2199,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = body ); } @@ -2088,7 +2218,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` hasn't changed, it still matches the latest local // event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); @@ -2105,7 +2235,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` hasn't changed. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); @@ -2115,7 +2245,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_replaced_local_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id_0 = OwnedTransactionId::from("txnid0"); @@ -2133,7 +2264,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = body ); } @@ -2159,7 +2290,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` hasn't changed, it still matches the latest local // event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); @@ -2184,7 +2315,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but with its new content. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B." ); @@ -2194,7 +2325,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_replaced_local_event_by_a_non_suitable_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id = OwnedTransactionId::from("txnid0"); @@ -2210,7 +2342,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "A" ); @@ -2241,7 +2373,7 @@ mod tests_latest_event_value_builder { &update, &mut buffer, &room_event_cache, - None, + user_id, None ) .await, @@ -2254,7 +2386,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_send_error() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id_0 = OwnedTransactionId::from("txnid0"); @@ -2272,7 +2405,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = body ); } @@ -2292,7 +2425,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but it's marked as “cannot be sent”. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalCannotBeSent => with body = "B" ); @@ -2313,7 +2446,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but it's “is sending”. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); @@ -2324,7 +2457,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_retry_event() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id_0 = OwnedTransactionId::from("txnid0"); @@ -2342,7 +2476,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = body ); } @@ -2362,7 +2496,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but it's marked as “cannot be sent”. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalCannotBeSent => with body = "B" ); @@ -2380,7 +2514,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` has changed, it still matches the latest local // event but it's “is sending”. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "B" ); @@ -2392,7 +2526,8 @@ mod tests_latest_event_value_builder { #[async_test] async fn test_local_media_upload() { - let (_client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let (client, _room_id, room_send_queue, room_event_cache) = local_prelude().await; + let user_id = client.user_id().unwrap(); let mut buffer = LatestEventValuesForLocalEvents::new(); let transaction_id = OwnedTransactionId::from("txnid"); @@ -2408,7 +2543,7 @@ mod tests_latest_event_value_builder { // The `LatestEventValue` matches the new local event. assert_local_value_matches_room_message_with_body!( - LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, None, None).await, + LatestEventValueBuilder::new_local(&update, &mut buffer, &room_event_cache, user_id, None).await, LatestEventValue::LocalIsSending => with body = "A" ); @@ -2432,7 +2567,7 @@ mod tests_latest_event_value_builder { &update, &mut buffer, &room_event_cache, - None, + user_id, None ) .await, @@ -2504,7 +2639,7 @@ mod tests_latest_event_value_builder { }, &mut buffer, &room_event_cache, - None, + user_id, None, ) .await diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index e828f86068b..8195e5bc75b 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -51,14 +51,14 @@ mod latest_event; mod room_latest_events; use std::{ - collections::{HashMap, HashSet}, - ops::{ControlFlow, Not}, + collections::HashMap, + ops::{ControlFlow, DerefMut, Not}, sync::Arc, }; pub use error::LatestEventsError; use eyeball::{AsyncLock, Subscriber}; -use latest_event::LatestEvent; +use latest_event::{LatestEvent, With}; pub use latest_event::{LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue}; use matrix_sdk_base::timer; use matrix_sdk_common::executor::{AbortOnDrop, JoinHandleExt as _, spawn}; @@ -104,16 +104,14 @@ impl LatestEvents { event_cache: EventCache, send_queue: SendQueue, ) -> Self { - let (room_registration_sender, room_registration_receiver) = mpsc::unbounded_channel(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); let registered_rooms = - Arc::new(RegisteredRooms::new(room_registration_sender, weak_client, &event_cache)); + Arc::new(RegisteredRooms::new(weak_client, &event_cache, &latest_event_queue_sender)); // The task listening to the event cache and the send queue updates. let listen_task_handle = spawn(listen_to_event_cache_and_send_queue_updates_task( registered_rooms.clone(), - room_registration_receiver, event_cache, send_queue, latest_event_queue_sender, @@ -233,35 +231,30 @@ struct RegisteredRooms { /// All the registered [`RoomLatestEvents`]. rooms: RwLock>, - /// The sender part of the channel about room registration. - /// - /// When a room is registered (with [`LatestEvents::listen_to_room`] or - /// [`LatestEvents::listen_to_thread`]) or unregistered (with - /// [`LatestEvents::forget_room`] or [`LatestEvents::forget_thread`]), a - /// room registration message is passed on this channel. - /// - /// The receiver part of the channel is in the - /// [`listen_to_event_cache_and_send_queue_updates_task`]. - room_registration_sender: mpsc::UnboundedSender, - /// The (weak) client. weak_client: WeakClient, /// The event cache. event_cache: EventCache, + + /// The sender part of the channel used by [`compute_latest_events_task`]. + /// + /// This is used to _trigger_ a computation of a `LatestEventValue` if the + /// restored value is `None`. + latest_event_queue_sender: mpsc::UnboundedSender, } impl RegisteredRooms { fn new( - room_registration_sender: mpsc::UnboundedSender, weak_client: WeakClient, event_cache: &EventCache, + latest_event_queue_sender: &mpsc::UnboundedSender, ) -> Self { Self { rooms: RwLock::new(HashMap::default()), - room_registration_sender, weak_client, event_cache: event_cache.clone(), + latest_event_queue_sender: latest_event_queue_sender.clone(), } } @@ -276,6 +269,33 @@ impl RegisteredRooms { room_id: &RoomId, thread_id: Option<&EventId>, ) -> Result>, LatestEventsError> { + fn create_and_insert_room_latest_events( + room_id: &RoomId, + rooms: &mut HashMap, + weak_client: &WeakClient, + event_cache: &EventCache, + latest_event_queue_sender: &mpsc::UnboundedSender, + ) { + let (room_latest_events, is_latest_event_value_none) = + With::unzip(RoomLatestEvents::new( + WeakRoom::new(weak_client.clone(), room_id.to_owned()), + event_cache, + )); + + // Insert the new `RoomLatestEvents`. + rooms.insert(room_id.to_owned(), room_latest_events); + + // If the `LatestEventValue` restored by `RoomLatestEvents` is of kind `None`, + // let's try to re-compute it without waiting on the Event Cache (so the sync + // usually) or the Send Queue. Maybe the system has migrated to a new version + // and the `LatestEventValue` has been erased, while it is still possible to + // compute a correct value. + if is_latest_event_value_none { + let _ = latest_event_queue_sender + .send(LatestEventQueueUpdate::EventCache { room_id: room_id.to_owned() }); + } + } + Ok(match thread_id { // Get the room latest event with the aim of fetching the latest event for a particular // thread. @@ -287,19 +307,13 @@ impl RegisteredRooms { // The `RoomLatestEvents` doesn't exist. Let's create and insert it. if rooms.contains_key(room_id).not() { - // Insert the room if it's been successfully created. - if let Some(room_latest_event) = RoomLatestEvents::new( - WeakRoom::new(self.weak_client.clone(), room_id.to_owned()), + create_and_insert_room_latest_events( + room_id, + rooms.deref_mut(), + &self.weak_client, &self.event_cache, - ) - .await? - { - rooms.insert(room_id.to_owned(), room_latest_event); - - let _ = self - .room_registration_sender - .send(RoomRegistration::Add(room_id.to_owned())); - } + &self.latest_event_queue_sender, + ); } if let Some(room_latest_event) = rooms.get(room_id) { @@ -308,9 +322,7 @@ impl RegisteredRooms { // In `RoomLatestEvents`, the `LatestEvent` for this thread doesn't exist. Let's // create and insert it. if room_latest_event.has_thread(thread_id).not() { - room_latest_event - .create_and_insert_latest_event_for_thread(thread_id) - .await; + room_latest_event.create_and_insert_latest_event_for_thread(thread_id); } } @@ -333,19 +345,13 @@ impl RegisteredRooms { let mut rooms = self.rooms.write().await; if rooms.contains_key(room_id).not() { - // Insert the room if it's been successfully created. - if let Some(room_latest_event) = RoomLatestEvents::new( - WeakRoom::new(self.weak_client.clone(), room_id.to_owned()), + create_and_insert_room_latest_events( + room_id, + rooms.deref_mut(), + &self.weak_client, &self.event_cache, - ) - .await? - { - rooms.insert(room_id.to_owned(), room_latest_event); - - let _ = self - .room_registration_sender - .send(RoomRegistration::Add(room_id.to_owned())); - } + &self.latest_event_queue_sender, + ); } RwLockWriteGuard::try_downgrade_map(rooms, |rooms| rooms.get(room_id)).ok() @@ -389,8 +395,6 @@ impl RegisteredRooms { // Remove the whole `RoomLatestEvents`. rooms.remove(room_id); } - - let _ = self.room_registration_sender.send(RoomRegistration::Remove(room_id.to_owned())); } /// Forget a thread. @@ -415,22 +419,9 @@ impl RegisteredRooms { } } -/// Represents whether a room has been registered or forgotten. -/// -/// This is used by [`RegisteredRooms::for_room`], -/// [`RegisteredRooms::for_thread`], [`RegisteredRooms::forget_room`] and -/// [`RegisteredRooms::forget_thread`]. -#[derive(Debug)] -enum RoomRegistration { - /// [`LatestEvents`] wants to listen to this room. - Add(OwnedRoomId), - - /// [`LatestEvents`] wants to no longer listen to this room. - Remove(OwnedRoomId), -} - /// Represents the kind of updates the [`compute_latest_events_task`] will have /// to deal with. +#[derive(Debug)] enum LatestEventQueueUpdate { /// An update from the [`EventCache`] happened. EventCache { @@ -452,14 +443,10 @@ enum LatestEventQueueUpdate { /// When an update is received and is considered relevant, a message is sent to /// the [`compute_latest_events_task`] to compute a new [`LatestEvent`]. /// -/// This task also listens to [`RoomRegistration`]. It keeps an internal list of -/// registered rooms, which helps to filter out updates we aren't interested by. -/// /// When an update is considered relevant, a message is sent over the /// `latest_event_queue_sender` channel. See [`compute_latest_events_task`]. async fn listen_to_event_cache_and_send_queue_updates_task( registered_rooms: Arc, - mut room_registration_receiver: mpsc::UnboundedReceiver, event_cache: EventCache, send_queue: SendQueue, latest_event_queue_sender: mpsc::UnboundedSender, @@ -468,20 +455,11 @@ async fn listen_to_event_cache_and_send_queue_updates_task( event_cache.subscribe_to_room_generic_updates(); let mut send_queue_generic_updates_subscriber = send_queue.subscribe(); - // Initialise the list of rooms that are listened. - // - // Technically, we can use `registered_rooms.rooms` every time to get this - // information, but it would involve a read-lock. In order to reduce the - // pressure on this lock, we use this intermediate structure. - let mut listened_rooms = - HashSet::from_iter(registered_rooms.rooms.read().await.keys().cloned()); - loop { if listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms.rooms, &mut event_cache_generic_updates_subscriber, &mut send_queue_generic_updates_subscriber, - &mut listened_rooms, &latest_event_queue_sender, ) .await @@ -499,36 +477,15 @@ async fn listen_to_event_cache_and_send_queue_updates_task( /// Having this function detached from its task is helpful for testing and for /// state isolation. async fn listen_to_event_cache_and_send_queue_updates( - room_registration_receiver: &mut mpsc::UnboundedReceiver, + registered_rooms: &RwLock>, event_cache_generic_updates_subscriber: &mut broadcast::Receiver, send_queue_generic_updates_subscriber: &mut broadcast::Receiver, - listened_rooms: &mut HashSet, latest_event_queue_sender: &mpsc::UnboundedSender, ) -> ControlFlow<()> { - // We need a biased select here: `room_registration_receiver` must have the - // priority over other futures. select! { - biased; - - update = room_registration_receiver.recv() => { - match update { - Some(RoomRegistration::Add(room_id)) => { - listened_rooms.insert(room_id); - } - Some(RoomRegistration::Remove(room_id)) => { - listened_rooms.remove(&room_id); - } - None => { - error!("`room_registration` channel has been closed"); - - return ControlFlow::Break(()); - } - } - } - room_event_cache_generic_update = event_cache_generic_updates_subscriber.recv() => { if let Ok(RoomEventCacheGenericUpdate { room_id }) = room_event_cache_generic_update { - if listened_rooms.contains(&room_id) { + if registered_rooms.read().await.contains_key(&room_id) { let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::EventCache { room_id }); @@ -542,7 +499,7 @@ async fn listen_to_event_cache_and_send_queue_updates( send_queue_generic_update = send_queue_generic_updates_subscriber.recv() => { if let Ok(SendQueueUpdate { room_id, update }) = send_queue_generic_update { - if listened_rooms.contains(&room_id) { + if registered_rooms.read().await.contains_key(&room_id) { let _ = latest_event_queue_sender.send(LatestEventQueueUpdate::SendQueue { room_id, update @@ -627,28 +584,44 @@ async fn compute_latest_events( #[cfg(all(test, not(target_family = "wasm")))] mod tests { - use std::ops::Not; + use std::{collections::HashMap, ops::Not}; use assert_matches::assert_matches; use matrix_sdk_base::{ RoomState, deserialized_responses::TimelineEventKind, linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update}, + store::SerializableEventContent, }; use matrix_sdk_test::{JoinedRoomBuilder, async_test, event_factory::EventFactory}; use ruma::{ - OwnedTransactionId, event_id, - events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent}, + MilliSecondsSinceUnixEpoch, OwnedTransactionId, event_id, + events::{ + AnyMessageLikeEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent, + SyncMessageLikeEvent, room::message::RoomMessageEventContent, + }, owned_room_id, room_id, user_id, }; use stream_assert::assert_pending; + use tokio::task::yield_now; use super::{ - HashSet, LatestEventValue, RemoteLatestEventValue, RoomEventCacheGenericUpdate, - RoomRegistration, RoomSendQueueUpdate, SendQueueUpdate, broadcast, + LatestEventValue, LocalLatestEventValue, RegisteredRooms, RemoteLatestEventValue, + RoomEventCacheGenericUpdate, RoomLatestEvents, RoomSendQueueUpdate, RwLock, + SendQueueUpdate, WeakClient, WeakRoom, With, broadcast, listen_to_event_cache_and_send_queue_updates, mpsc, }; - use crate::test_utils::mocks::MatrixMockServer; + use crate::{latest_events::LatestEventQueueUpdate, test_utils::mocks::MatrixMockServer}; + + fn local_room_message(body: &str) -> LocalLatestEventValue { + LocalLatestEventValue { + timestamp: MilliSecondsSinceUnixEpoch::now(), + content: SerializableEventContent::new(&AnyMessageLikeEventContent::RoomMessage( + RoomMessageEventContent::text_plain(body), + )) + .unwrap(), + } + } #[async_test] async fn test_latest_events_are_lazy() { @@ -806,183 +779,66 @@ mod tests { } #[async_test] - async fn test_inputs_task_can_listen_to_room_registration() { - let room_id_0 = owned_room_id!("!r0"); - let room_id_1 = owned_room_id!("!r1"); + async fn test_inputs_task_can_listen_to_room_event_cache() { + let room_id = owned_room_id!("!r0"); - let (room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); - let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let weak_client = WeakClient::from_client(&client); + let weak_room = WeakRoom::new(weak_client, room_id.clone()); + + let event_cache = client.event_cache(); + + let registered_rooms = RwLock::new(HashMap::new()); + let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = broadcast::channel(1); - let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); - // Send a _room update_ for the first time. - { - // It mimics `LatestEvents::for_room`. - room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).unwrap(); - - // Run the task. - assert!( - listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, - &mut room_event_cache_generic_update_receiver, - &mut send_queue_generic_update_receiver, - &mut listened_rooms, - &latest_event_queue_sender, - ) - .await - .is_continue() - ); - - assert_eq!(listened_rooms.len(), 1); - assert!(listened_rooms.contains(&room_id_0)); - assert!(latest_event_queue_receiver.is_empty()); - } - - // Send a _room update_ for the second time. It's the same room. + // New event cache update, but the `LatestEvents` isn't listening to it. { - room_registration_sender.send(RoomRegistration::Add(room_id_0.clone())).unwrap(); + room_event_cache_generic_update_sender + .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() }) + .unwrap(); // Run the task. assert!( listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms, &mut room_event_cache_generic_update_receiver, &mut send_queue_generic_update_receiver, - &mut listened_rooms, &latest_event_queue_sender, ) .await .is_continue() ); - // This is the second time this room is added. Nothing happens. - assert_eq!(listened_rooms.len(), 1); - assert!(listened_rooms.contains(&room_id_0)); + // No latest event computation has been triggered. assert!(latest_event_queue_receiver.is_empty()); } - // Send another _room update_. It's a different room. + // New event cache update, but this time, the `LatestEvents` is listening to it. { - room_registration_sender.send(RoomRegistration::Add(room_id_1.to_owned())).unwrap(); - - // Run the task. - assert!( - listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, - &mut room_event_cache_generic_update_receiver, - &mut send_queue_generic_update_receiver, - &mut listened_rooms, - &latest_event_queue_sender, - ) - .await - .is_continue() + registered_rooms.write().await.insert( + room_id.clone(), + With::inner(RoomLatestEvents::new(weak_room, event_cache)), ); - - // This is the first time this room is added. It must appear. - assert_eq!(listened_rooms.len(), 2); - assert!(listened_rooms.contains(&room_id_0)); - assert!(listened_rooms.contains(&room_id_1)); - assert!(latest_event_queue_receiver.is_empty()); - } - } - - #[async_test] - async fn test_inputs_task_stops_when_room_registration_channel_is_closed() { - let (_room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); - let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = - broadcast::channel(1); - let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = - broadcast::channel(1); - let mut listened_rooms = HashSet::new(); - let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); - - // Close the receiver to close the channel. - room_registration_receiver.close(); - - // Run the task. - assert!( - listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, - &mut room_event_cache_generic_update_receiver, - &mut send_queue_generic_update_receiver, - &mut listened_rooms, - &latest_event_queue_sender, - ) - .await - // It breaks! - .is_break() - ); - - assert_eq!(listened_rooms.len(), 0); - assert!(latest_event_queue_receiver.is_empty()); - } - - #[async_test] - async fn test_inputs_task_can_listen_to_room_event_cache() { - let room_id = owned_room_id!("!r0"); - - let (room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); - let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = - broadcast::channel(1); - let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = - broadcast::channel(1); - let mut listened_rooms = HashSet::new(); - let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); - - // New event cache update, but the `LatestEvents` isn't listening to it. - { room_event_cache_generic_update_sender .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() }) .unwrap(); - // Run the task. assert!( listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms, &mut room_event_cache_generic_update_receiver, &mut send_queue_generic_update_receiver, - &mut listened_rooms, &latest_event_queue_sender, ) .await .is_continue() ); - assert!(listened_rooms.is_empty()); - - // No latest event computation has been triggered. - assert!(latest_event_queue_receiver.is_empty()); - } - - // New event cache update, but this time, the `LatestEvents` is listening to it. - { - room_registration_sender.send(RoomRegistration::Add(room_id.clone())).unwrap(); - room_event_cache_generic_update_sender - .send(RoomEventCacheGenericUpdate { room_id: room_id.clone() }) - .unwrap(); - - // Run the task to handle the `RoomRegistration` and the - // `RoomEventCacheGenericUpdate`. - for _ in 0..2 { - assert!( - listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, - &mut room_event_cache_generic_update_receiver, - &mut send_queue_generic_update_receiver, - &mut listened_rooms, - &latest_event_queue_sender, - ) - .await - .is_continue() - ); - } - - assert_eq!(listened_rooms.len(), 1); - assert!(listened_rooms.contains(&room_id)); - // A latest event computation has been triggered! assert!(latest_event_queue_receiver.is_empty().not()); } @@ -992,12 +848,19 @@ mod tests { async fn test_inputs_task_can_listen_to_send_queue() { let room_id = owned_room_id!("!r0"); - let (room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let weak_client = WeakClient::from_client(&client); + let weak_room = WeakRoom::new(weak_client, room_id.clone()); + + let event_cache = client.event_cache(); + + let registered_rooms = RwLock::new(HashMap::new()); + let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = broadcast::channel(1); - let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); // New send queue update, but the `LatestEvents` isn't listening to it. @@ -1015,25 +878,25 @@ mod tests { // Run the task. assert!( listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms, &mut room_event_cache_generic_update_receiver, &mut send_queue_generic_update_receiver, - &mut listened_rooms, &latest_event_queue_sender, ) .await .is_continue() ); - assert!(listened_rooms.is_empty()); - // No latest event computation has been triggered. assert!(latest_event_queue_receiver.is_empty()); } // New send queue update, but this time, the `LatestEvents` is listening to it. { - room_registration_sender.send(RoomRegistration::Add(room_id.clone())).unwrap(); + registered_rooms.write().await.insert( + room_id.clone(), + With::inner(RoomLatestEvents::new(weak_room, event_cache)), + ); send_queue_generic_update_sender .send(SendQueueUpdate { room_id: room_id.clone(), @@ -1044,23 +907,16 @@ mod tests { }) .unwrap(); - // Run the task to handle the `RoomRegistration` and the `SendQueueUpdate`. - for _ in 0..2 { - assert!( - listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, - &mut room_event_cache_generic_update_receiver, - &mut send_queue_generic_update_receiver, - &mut listened_rooms, - &latest_event_queue_sender, - ) - .await - .is_continue() - ); - } - - assert_eq!(listened_rooms.len(), 1); - assert!(listened_rooms.contains(&room_id)); + assert!( + listen_to_event_cache_and_send_queue_updates( + ®istered_rooms, + &mut room_event_cache_generic_update_receiver, + &mut send_queue_generic_update_receiver, + &latest_event_queue_sender, + ) + .await + .is_continue() + ); // A latest event computation has been triggered! assert!(latest_event_queue_receiver.is_empty().not()); @@ -1069,12 +925,11 @@ mod tests { #[async_test] async fn test_inputs_task_stops_when_event_cache_channel_is_closed() { - let (_room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); + let registered_rooms = RwLock::new(HashMap::new()); let (room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); let (_send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = broadcast::channel(1); - let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); // Drop the sender to close the channel. @@ -1083,10 +938,9 @@ mod tests { // Run the task. assert!( listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms, &mut room_event_cache_generic_update_receiver, &mut send_queue_generic_update_receiver, - &mut listened_rooms, &latest_event_queue_sender, ) .await @@ -1094,18 +948,16 @@ mod tests { .is_break() ); - assert_eq!(listened_rooms.len(), 0); assert!(latest_event_queue_receiver.is_empty()); } #[async_test] async fn test_inputs_task_stops_when_send_queue_channel_is_closed() { - let (_room_registration_sender, mut room_registration_receiver) = mpsc::unbounded_channel(); + let registered_rooms = RwLock::new(HashMap::new()); let (_room_event_cache_generic_update_sender, mut room_event_cache_generic_update_receiver) = broadcast::channel(1); let (send_queue_generic_update_sender, mut send_queue_generic_update_receiver) = broadcast::channel(1); - let mut listened_rooms = HashSet::new(); let (latest_event_queue_sender, latest_event_queue_receiver) = mpsc::unbounded_channel(); // Drop the sender to close the channel. @@ -1114,10 +966,9 @@ mod tests { // Run the task. assert!( listen_to_event_cache_and_send_queue_updates( - &mut room_registration_receiver, + ®istered_rooms, &mut room_event_cache_generic_update_receiver, &mut send_queue_generic_update_receiver, - &mut listened_rooms, &latest_event_queue_sender, ) .await @@ -1125,7 +976,6 @@ mod tests { .is_break() ); - assert_eq!(listened_rooms.len(), 0); assert!(latest_event_queue_receiver.is_empty()); } @@ -1134,13 +984,67 @@ mod tests { let room_id = owned_room_id!("!r0"); let user_id = user_id!("@mnt_io:matrix.org"); let event_factory = EventFactory::new().sender(user_id).room(&room_id); - let event_id_0 = event_id!("$ev0"); - let event_id_1 = event_id!("$ev1"); let event_id_2 = event_id!("$ev2"); let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; + // Create the room. + client.base_client().get_or_create_room(&room_id, RoomState::Joined); + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let latest_events = client.latest_events().await; + + // Subscribe to the latest event values for this room. + let mut latest_event_stream = + latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap(); + + // The stream is pending: no new latest event for the moment. + assert_pending!(latest_event_stream); + + // Update the event cache with a sync. + server + .sync_room( + &client, + JoinedRoomBuilder::new(&room_id) + .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)), + ) + .await; + + // The event cache has received its update from the sync. It has emitted a + // generic update, which has been received by `LatestEvents` tasks, up to the + // `compute_latest_events` which has updated the latest event value. + assert_matches!( + latest_event_stream.next().await, + Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => { + assert_matches!( + event.deserialize().unwrap(), + AnySyncTimelineEvent::MessageLike( + AnySyncMessageLikeEvent::RoomMessage( + SyncMessageLikeEvent::Original(message_content) + ) + ) => { + assert_eq!(message_content.content.body(), "raclette !"); + } + ); + } + ); + + assert_pending!(latest_event_stream); + } + + #[async_test] + async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily() { + let room_id = owned_room_id!("!r0"); + let user_id = user_id!("@mnt_io:matrix.org"); + let event_factory = EventFactory::new().sender(user_id).room(&room_id); + let event_id_0 = event_id!("$ev0"); + + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + // Prelude. { // Create the room. @@ -1166,7 +1070,6 @@ mod tests { at: Position::new(ChunkIdentifier::new(0), 0), items: vec![ event_factory.text_msg("hello").event_id(event_id_0).into(), - event_factory.text_msg("world").event_id(event_id_1).into(), ], }, ], @@ -1180,57 +1083,82 @@ mod tests { let latest_events = client.latest_events().await; - // Subscribe to the latest event values for this room. let mut latest_event_stream = latest_events.listen_and_subscribe_to_room(&room_id).await.unwrap().unwrap(); - // The initial latest event value is set to `event_id_1` because it's… the… - // latest event! - assert_matches!( - latest_event_stream.get().await, - LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. }) => { - assert_matches!( - event.deserialize().unwrap(), - AnySyncTimelineEvent::MessageLike( - AnySyncMessageLikeEvent::RoomMessage( - SyncMessageLikeEvent::Original(message_content) - ) - ) => { - assert_eq!(message_content.content.body(), "world"); - } - ); - } - ); + // We have a race if the system is busy. Initially, the latest event + // value is `LatestEventValue::None`, then an Event Cache generic update + // is broadcasted manually, computing a new `LatestEventValue`. So let's + // wait on the system to finish this, and assert the final + // `LatestEventValue`. + yield_now().await; + assert_matches!(latest_event_stream.next_now().await, LatestEventValue::Remote(_)); - // The stream is pending: no new latest event for the moment. assert_pending!(latest_event_stream); + } - // Update the event cache with a sync. - server - .sync_room( - &client, - JoinedRoomBuilder::new(&room_id) - .add_timeline_event(event_factory.text_msg("raclette !").event_id(event_id_2)), - ) - .await; + /// This tests a part of + /// [`test_latest_event_value_is_initialized_by_the_event_cache_lazily`]. + /// + /// When `RegisteredRooms::room_latest_events` restores a + /// `LatestEventValue::None` (via `RoomLatestEvents::new`), + /// a `LatestEventQueueUpdate::EventCache` is broadcasted to compute a + /// `LatestEventValue` from the Event Cache lazily. + #[async_test] + async fn test_latest_event_value_is_initialized_by_the_event_cache_lazily_inner() { + let room_id_0 = owned_room_id!("!r0"); + let room_id_1 = owned_room_id!("!r1"); - // The event cache has received its update from the sync. It has emitted a - // generic update, which has been received by `LatestEvents` tasks, up to the - // `compute_latest_events` which has updated the latest event value. - assert_matches!( - latest_event_stream.next().await, - Some(LatestEventValue::Remote(RemoteLatestEventValue { kind: TimelineEventKind::PlainText { event }, .. })) => { - assert_matches!( - event.deserialize().unwrap(), - AnySyncTimelineEvent::MessageLike( - AnySyncMessageLikeEvent::RoomMessage( - SyncMessageLikeEvent::Original(message_content) - ) - ) => { - assert_eq!(message_content.content.body(), "raclette !"); - } - ); - } - ); + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + // Create the rooms. + let room_0 = client.base_client().get_or_create_room(&room_id_0, RoomState::Joined); + let room_1 = client.base_client().get_or_create_room(&room_id_1, RoomState::Joined); + + // Set up the rooms. + // `room_0` always has a `LatestEventValue::None` as its the default value. + let mut room_info_1 = room_0.clone_info(); + room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo"))); + room_1.set_room_info(room_info_1, Default::default()); + + let weak_client = WeakClient::from_client(&client); + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let (latest_event_queue_sender, mut latest_event_queue_receiver) = + mpsc::unbounded_channel(); + + let registered_rooms = + RegisteredRooms::new(weak_client, event_cache, &latest_event_queue_sender); + + // Room 0 has a `LatestEventValue::None`, a + // `LatestEventQueueUpdate::EventCache` will be broadcasted. + { + let room_latest_events = registered_rooms.for_room(&room_id_0).await.unwrap().unwrap(); + assert_matches!( + room_latest_events.read().await.for_room().get().await, + LatestEventValue::None + ); + assert_matches!( + latest_event_queue_receiver.recv().await, + Some(LatestEventQueueUpdate::EventCache { room_id }) => { + assert_eq!(room_id, room_id_0); + } + ); + assert!(latest_event_queue_receiver.is_empty()); + } + + // Room 1 has a `LatestEventValue::Local*`, a + // `LatestEventQueueUpdate::EventCache` will NOT be broadcasted. + { + let room_latest_events = registered_rooms.for_room(&room_id_1).await.unwrap().unwrap(); + assert_matches!( + room_latest_events.read().await.for_room().get().await, + LatestEventValue::LocalIsSending(_) + ); + assert!(latest_event_queue_receiver.is_empty()); + } } } diff --git a/crates/matrix-sdk/src/latest_events/room_latest_events.rs b/crates/matrix-sdk/src/latest_events/room_latest_events.rs index 242edb84daa..ac03013738d 100644 --- a/crates/matrix-sdk/src/latest_events/room_latest_events.rs +++ b/crates/matrix-sdk/src/latest_events/room_latest_events.rs @@ -14,10 +14,15 @@ use std::{collections::HashMap, sync::Arc}; +use async_once_cell::OnceCell; use ruma::{EventId, OwnedEventId}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; +use tracing::error; -use super::{LatestEvent, LatestEventsError}; +use super::{ + LatestEvent, + latest_event::{IsLatestEventValueNone, With}, +}; use crate::{ event_cache::{EventCache, EventCacheError, RoomEventCache}, room::WeakRoom, @@ -33,40 +38,28 @@ pub(super) struct RoomLatestEvents { impl RoomLatestEvents { /// Create a new [`RoomLatestEvents`]. - pub async fn new( + pub fn new( weak_room: WeakRoom, event_cache: &EventCache, - ) -> Result, LatestEventsError> { - let room_id = weak_room.room_id(); - let room_event_cache = match event_cache.for_room(room_id).await { - // It's fine to drop the `EventCacheDropHandles` here as the caller - // (`LatestEventState`) owns a clone of the `EventCache`. - Ok((room_event_cache, _drop_handles)) => room_event_cache, - Err(EventCacheError::RoomNotFound { .. }) => return Ok(None), - Err(err) => return Err(LatestEventsError::EventCache(err)), - }; + ) -> With { + let latest_event_with = Self::create_latest_event(&weak_room, None); - Ok(Some(Self { + With::map(latest_event_with, |for_the_room| Self { state: Arc::new(RwLock::new(RoomLatestEventsState { - for_the_room: Self::create_latest_event_for_inner( - &weak_room, - None, - &room_event_cache, - ) - .await, + for_the_room, per_thread: HashMap::new(), weak_room, - room_event_cache, + event_cache: event_cache.clone(), + room_event_cache: OnceCell::new(), })), - })) + }) } - async fn create_latest_event_for_inner( + fn create_latest_event( weak_room: &WeakRoom, thread_id: Option<&EventId>, - room_event_cache: &RoomEventCache, - ) -> LatestEvent { - LatestEvent::new(weak_room, thread_id, room_event_cache).await + ) -> With { + LatestEvent::new(weak_room, thread_id) } /// Lock this type with shared read access, and return an owned lock guard. @@ -90,8 +83,11 @@ struct RoomLatestEventsState { /// The latest events for each thread. per_thread: HashMap, - /// The room event cache associated to this room. - room_event_cache: RoomEventCache, + /// The event cache. + event_cache: EventCache, + + /// The room event cache (lazily-loaded). + room_event_cache: OnceCell, /// The (weak) room. /// @@ -128,15 +124,6 @@ pub(super) struct RoomLatestEventsWriteGuard { } impl RoomLatestEventsWriteGuard { - async fn create_latest_event_for(&self, thread_id: Option<&EventId>) -> LatestEvent { - RoomLatestEvents::create_latest_event_for_inner( - &self.inner.weak_room, - thread_id, - &self.inner.room_event_cache, - ) - .await - } - /// Check whether this [`RoomLatestEvents`] has a latest event for a /// particular thread. pub fn has_thread(&self, thread_id: &EventId) -> bool { @@ -145,10 +132,11 @@ impl RoomLatestEventsWriteGuard { /// Create the [`LatestEvent`] for thread `thread_id` and insert it in this /// [`RoomLatestEvents`]. - pub async fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) { - let latest_event = self.create_latest_event_for(Some(thread_id)).await; + pub fn create_and_insert_latest_event_for_thread(&mut self, thread_id: &EventId) { + let latest_event_with = + RoomLatestEvents::create_latest_event(&self.inner.weak_room, Some(thread_id)); - self.inner.per_thread.insert(thread_id.to_owned(), latest_event); + self.inner.per_thread.insert(thread_id.to_owned(), With::inner(latest_event_with)); } /// Forget the thread `thread_id`. @@ -164,21 +152,38 @@ impl RoomLatestEventsWriteGuard { // // Get it once for all the updates of all the latest events for this room (be // the room and its threads). - let room = self.inner.weak_room.get(); - let (own_user_id, power_levels) = match &room { - Some(room) => { - let power_levels = room.power_levels().await.ok(); - - (Some(room.own_user_id()), power_levels) - } + let Some(room) = self.inner.weak_room.get() else { + // No room? Let's stop the update. + error!(room = ?self.inner.weak_room, "Room is unknown"); - None => (None, None), + return; }; + let own_user_id = room.own_user_id(); + let power_levels = room.power_levels().await.ok(); let inner = &mut *self.inner; let for_the_room = &mut inner.for_the_room; let per_thread = &mut inner.per_thread; - let room_event_cache = &inner.room_event_cache; + + // Lazy-load the `RoomEventCache`. + let room_event_cache = match inner + .room_event_cache + .get_or_try_init(async { + // It's fine to drop the `EventCacheDropHandles` here as the caller + // (`LatestEventState`) owns a clone of the `EventCache`. + let (room_event_cache, _drop_handles) = + inner.event_cache.for_room(room.room_id()).await?; + + Ok::(room_event_cache) + }) + .await + { + Ok(room_event_cache) => room_event_cache, + Err(err) => { + error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`"); + return; + } + }; for_the_room .update_with_event_cache(room_event_cache, own_user_id, power_levels.as_ref()) @@ -199,21 +204,36 @@ impl RoomLatestEventsWriteGuard { // // Get it once for all the updates of all the latest events for this room (be // the room and its threads). - let room = self.inner.weak_room.get(); - let (own_user_id, power_levels) = match &room { - Some(room) => { - let power_levels = room.power_levels().await.ok(); - - (Some(room.own_user_id()), power_levels) - } - - None => (None, None), + let Some(room) = self.inner.weak_room.get() else { + // No room? Let's stop the update. + return; }; + let own_user_id = room.own_user_id(); + let power_levels = room.power_levels().await.ok(); let inner = &mut *self.inner; let for_the_room = &mut inner.for_the_room; let per_thread = &mut inner.per_thread; - let room_event_cache = &inner.room_event_cache; + + // Lazy-load the `RoomEventCache`. + let room_event_cache = match inner + .room_event_cache + .get_or_try_init(async { + // It's fine to drop the `EventCacheDropHandles` here as the caller + // (`LatestEventState`) owns a clone of the `EventCache`. + let (room_event_cache, _drop_handles) = + inner.event_cache.for_room(room.room_id()).await?; + + Ok::(room_event_cache) + }) + .await + { + Ok(room_event_cache) => room_event_cache, + Err(err) => { + error!(room_id = ?room.room_id(), ?err, "Failed to fetch the `RoomEventCache`"); + return; + } + }; for_the_room .update_with_send_queue(