2323//! - The event is a historic event and we need to first download the room
2424//! key from the backup.
2525//! - The event is a historic event in a previously unjoined room, we need
26- //! to receive historic room keys as defined in [MSC3061](https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255) .
26+ //! to receive historic room keys as defined in [MSC3061].
2727//!
2828//! R2D2 listens to the OlmMachine for received room keys and new
2929//! m.room_key.withheld events.
109109//! │ │ │ │
110110//! └──────────────┘ └──────────────┘
111111//! ```
112+ //!
113+ //! [MSC3061]: https://github.com/matrix-org/matrix-spec/pull/1655#issuecomment-2213152255
112114
113115use std:: { collections:: BTreeSet , pin:: Pin , sync:: Weak } ;
114116
@@ -638,6 +640,22 @@ impl EventCache {
638640 }
639641}
640642
643+ #[ inline( always) ]
644+ fn upgrade_event_cache ( cache : & Weak < EventCacheInner > ) -> Option < EventCache > {
645+ cache. upgrade ( ) . map ( |inner| EventCache { inner } )
646+ }
647+
648+ fn report_lag ( cache : & Weak < EventCacheInner > ) -> Result < ( ) , ( ) > {
649+ let Some ( cache) = upgrade_event_cache ( cache) else {
650+ return Err ( ( ) ) ;
651+ } ;
652+
653+ let message = RedecryptorReport :: Lagging ;
654+ let _ = cache. inner . redecryption_channels . utd_reporter . send ( message) ;
655+
656+ Ok ( ( ) )
657+ }
658+
641659/// Struct holding on to the redecryption task.
642660///
643661/// This struct implements the bulk of the redecryption task. It listens to the
@@ -694,11 +712,6 @@ impl Redecryptor {
694712 } )
695713 }
696714
697- #[ inline( always) ]
698- fn upgrade_event_cache ( cache : & Weak < EventCacheInner > ) -> Option < EventCache > {
699- cache. upgrade ( ) . map ( |inner| EventCache { inner } )
700- }
701-
702715 async fn redecryption_loop (
703716 cache : & Weak < EventCacheInner > ,
704717 decryption_request_stream : & mut Pin < & mut impl Stream < Item = DecryptionRetryRequest > > ,
@@ -720,7 +733,7 @@ impl Redecryptor {
720733 // An explicit request, presumably from the timeline, has been received to decrypt
721734 // events that were encrypted with a certain room key.
722735 Some ( request) = decryption_request_stream. next( ) => {
723- let Some ( cache) = Self :: upgrade_event_cache( cache) else {
736+ let Some ( cache) = upgrade_event_cache( cache) else {
724737 break false ;
725738 } ;
726739
@@ -750,7 +763,7 @@ impl Redecryptor {
750763 // Alright, some room keys were received and persisted in our store,
751764 // let's attempt to redecrypt events that were encrypted using these
752765 // room keys.
753- let Some ( cache) = Self :: upgrade_event_cache( cache) else {
766+ let Some ( cache) = upgrade_event_cache( cache) else {
754767 break false ;
755768 } ;
756769
@@ -779,12 +792,9 @@ impl Redecryptor {
779792 // This would most likely be the timeline from the UI crate. The
780793 // timeline might attempt to redecrypt all UTDs it is showing to the
781794 // user.
782- let Some ( cache) = Self :: upgrade_event_cache ( cache ) else {
795+ if report_lag ( cache) . is_err ( ) {
783796 break false ;
784- } ;
785-
786- let message = RedecryptorReport :: Lagging ;
787- let _ = cache. inner. redecryption_channels. utd_reporter. send( message) ;
797+ }
788798 } ,
789799 // The stream got closed, this could mean that our OlmMachine got
790800 // regenerated, let's return true and try to recreate the stream.
@@ -796,7 +806,7 @@ impl Redecryptor {
796806 withheld_info = withheld_stream. next( ) => {
797807 match withheld_info {
798808 Some ( infos) => {
799- let Some ( cache) = Self :: upgrade_event_cache( cache) else {
809+ let Some ( cache) = upgrade_event_cache( cache) else {
800810 break false ;
801811 } ;
802812
@@ -822,7 +832,7 @@ impl Redecryptor {
822832 Some ( event_updates) = events_stream. next( ) => {
823833 match event_updates {
824834 Ok ( updates) => {
825- let Some ( cache) = Self :: upgrade_event_cache( cache) else {
835+ let Some ( cache) = upgrade_event_cache( cache) else {
826836 break false ;
827837 } ;
828838
@@ -836,12 +846,9 @@ impl Redecryptor {
836846 ) ;
837847 }
838848 Err ( _) => {
839- let Some ( cache) = Self :: upgrade_event_cache ( cache ) else {
849+ if report_lag ( cache) . is_err ( ) {
840850 break false ;
841- } ;
842-
843- let message = RedecryptorReport :: Lagging ;
844- let _ = cache. inner. redecryption_channels. utd_reporter. send( message) ;
851+ }
845852 }
846853 }
847854 }
@@ -866,14 +873,11 @@ impl Redecryptor {
866873 {
867874 info ! ( "Regenerating the re-decryption streams" ) ;
868875
869- let Some ( cache) = Self :: upgrade_event_cache ( & cache) else {
870- break ;
871- } ;
872-
873876 // Report that the stream got recreated so listeners can attempt to redecrypt
874877 // any UTDs they might be seeing.
875- let message = RedecryptorReport :: Lagging ;
876- let _ = cache. inner . redecryption_channels . utd_reporter . send ( message) ;
878+ if report_lag ( & cache) . is_err ( ) {
879+ break ;
880+ }
877881 }
878882
879883 info ! ( "Shutting down the event cache redecryptor" ) ;
0 commit comments