diff --git a/crates/camera-directshow/examples/cli.rs b/crates/camera-directshow/examples/cli.rs index d712c1a655..f154aa99a6 100644 --- a/crates/camera-directshow/examples/cli.rs +++ b/crates/camera-directshow/examples/cli.rs @@ -114,10 +114,11 @@ mod windows { .start_capturing( &selected_format.media_type, Box::new(|frame| { - unsafe { dbg!(frame.sample.GetActualDataLength()) }; - // dbg!(frame.media_type.subtype_str()); - // dbg!(frame.reference_time); - dbg!(frame.timestamp); + let data_length = unsafe { frame.sample.GetActualDataLength() }; + println!( + "Frame: data_length={data_length:?}, timestamp={:?}", + frame.timestamp + ); }), ) .unwrap(); diff --git a/crates/camera-mediafoundation/examples/cli.rs b/crates/camera-mediafoundation/examples/cli.rs index 317e15f65c..86e4881e37 100644 --- a/crates/camera-mediafoundation/examples/cli.rs +++ b/crates/camera-mediafoundation/examples/cli.rs @@ -21,7 +21,7 @@ mod windows { let device_sources = DeviceSourcesIterator::new().unwrap(); - if device_sources.len() == 0 { + if device_sources.is_empty() { warn!("No devices found"); return; } diff --git a/crates/camera-windows/examples/cli.rs b/crates/camera-windows/examples/cli.rs index 76b241bb7e..fbf898bc06 100644 --- a/crates/camera-windows/examples/cli.rs +++ b/crates/camera-windows/examples/cli.rs @@ -32,7 +32,8 @@ mod windows { let Ok(bytes) = frame.bytes() else { return; }; - dbg!( + println!( + "Frame: len={}, pixel_format={:?}, timestamp={:?}, perf_counter={:?}", bytes.len(), frame.pixel_format, frame.timestamp, diff --git a/crates/enc-avfoundation/src/segmented.rs b/crates/enc-avfoundation/src/segmented.rs index 8cae09c910..c1053a960b 100644 --- a/crates/enc-avfoundation/src/segmented.rs +++ b/crates/enc-avfoundation/src/segmented.rs @@ -3,7 +3,37 @@ use cap_media_info::{AudioInfo, VideoInfo}; use cidre::arc; use ffmpeg::frame; use serde::Serialize; -use std::{path::PathBuf, time::Duration}; +use std::{ + io::Write, + path::{Path, PathBuf}, + time::Duration, +}; + +fn atomic_write_json(path: &Path, data: &T) -> std::io::Result<()> { + let temp_path = path.with_extension("json.tmp"); + let json = serde_json::to_string_pretty(data) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + let mut file = std::fs::File::create(&temp_path)?; + file.write_all(json.as_bytes())?; + file.sync_all()?; + + std::fs::rename(&temp_path, path)?; + + if let Some(parent) = path.parent() + && let Ok(dir) = std::fs::File::open(parent) + { + let _ = dir.sync_all(); + } + + Ok(()) +} + +fn sync_file(path: &Path) { + if let Ok(file) = std::fs::File::open(path) { + let _ = file.sync_all(); + } +} pub struct SegmentedMP4Encoder { base_path: PathBuf, @@ -24,6 +54,7 @@ pub struct SegmentInfo { pub path: PathBuf, pub index: u32, pub duration: Duration, + pub file_size: Option, } #[derive(Serialize)] @@ -32,10 +63,15 @@ struct FragmentEntry { index: u32, duration: f64, is_complete: bool, + #[serde(skip_serializing_if = "Option::is_none")] + file_size: Option, } +const MANIFEST_VERSION: u32 = 2; + #[derive(Serialize)] struct Manifest { + version: u32, fragments: Vec, #[serde(skip_serializing_if = "Option::is_none")] total_duration: Option, @@ -55,7 +91,7 @@ impl SegmentedMP4Encoder { let segment_path = base_path.join("fragment_000.mp4"); let encoder = MP4Encoder::init(segment_path, video_config, audio_config, output_height)?; - Ok(Self { + let instance = Self { base_path, video_config, audio_config, @@ -65,7 +101,11 @@ impl SegmentedMP4Encoder { segment_duration, segment_start_time: None, completed_segments: Vec::new(), - }) + }; + + instance.write_in_progress_manifest(); + + Ok(instance) } pub fn queue_video_frame( @@ -106,15 +146,27 @@ impl SegmentedMP4Encoder { fn rotate_segment(&mut self, timestamp: Duration) -> Result<(), QueueFrameError> { let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); let segment_duration = timestamp.saturating_sub(segment_start); + let completed_segment_path = self.current_segment_path(); if let Some(mut encoder) = self.current_encoder.take() { - let _ = encoder.finish(Some(timestamp)); + if let Err(e) = encoder.finish(Some(timestamp)) { + tracing::warn!("Failed to finish encoder during rotation: {e}"); + } + + sync_file(&completed_segment_path); + + let file_size = std::fs::metadata(&completed_segment_path) + .ok() + .map(|m| m.len()); self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), + path: completed_segment_path, index: self.current_index, duration: segment_duration, + file_size, }); + + self.write_manifest(); } self.current_index += 1; @@ -131,7 +183,7 @@ impl SegmentedMP4Encoder { .map_err(|_| QueueFrameError::Failed)?, ); - self.write_manifest(); + self.write_in_progress_manifest(); Ok(()) } @@ -143,6 +195,7 @@ impl SegmentedMP4Encoder { fn write_manifest(&self) { let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -156,6 +209,7 @@ impl SegmentedMP4Encoder { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: None, @@ -163,10 +217,59 @@ impl SegmentedMP4Encoder { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write manifest to {}: {e}", + manifest_path.display() + ); + } + } + + fn write_in_progress_manifest(&self) { + let mut fragments: Vec = self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(); + + fragments.push(FragmentEntry { + path: self + .current_segment_path() + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: self.current_index, + duration: 0.0, + is_complete: false, + file_size: None, + }); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments, + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write in-progress manifest to {}: {e}", + manifest_path.display() + ); + } } pub fn pause(&mut self) { @@ -182,20 +285,25 @@ impl SegmentedMP4Encoder { } pub fn finish(&mut self, timestamp: Option) -> Result<(), FinishError> { - if let Some(segment_start) = self.segment_start_time { - let final_duration = timestamp - .unwrap_or(segment_start) - .saturating_sub(segment_start); - - self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), - index: self.current_index, - duration: final_duration, - }); - } + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; if let Some(mut encoder) = self.current_encoder.take() { encoder.finish(timestamp)?; + + sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = timestamp.unwrap_or(start).saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: self.current_index, + duration: final_duration, + file_size, + }); + } } self.finalize_manifest(); @@ -207,6 +315,7 @@ impl SegmentedMP4Encoder { let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -220,6 +329,7 @@ impl SegmentedMP4Encoder { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: Some(total_duration.as_secs_f64()), @@ -227,10 +337,12 @@ impl SegmentedMP4Encoder { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write final manifest to {}: {e}", + manifest_path.display() + ); + } } pub fn completed_segments(&self) -> &[SegmentInfo] { diff --git a/crates/enc-ffmpeg/src/mux/segmented_audio.rs b/crates/enc-ffmpeg/src/mux/segmented_audio.rs index b00f7c4a33..1f54d3e743 100644 --- a/crates/enc-ffmpeg/src/mux/segmented_audio.rs +++ b/crates/enc-ffmpeg/src/mux/segmented_audio.rs @@ -2,7 +2,43 @@ use crate::audio::aac::{AACEncoder, AACEncoderError}; use cap_media_info::AudioInfo; use ffmpeg::{format, frame}; use serde::Serialize; -use std::{path::PathBuf, time::Duration}; +use std::{ + io::Write, + path::{Path, PathBuf}, + time::Duration, +}; + +fn atomic_write_json(path: &Path, data: &T) -> std::io::Result<()> { + let temp_path = path.with_extension("json.tmp"); + let json = serde_json::to_string_pretty(data) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + let mut file = std::fs::File::create(&temp_path)?; + file.write_all(json.as_bytes())?; + file.sync_all()?; + + std::fs::rename(&temp_path, path)?; + + if let Some(parent) = path.parent() + && let Ok(dir) = std::fs::File::open(parent) + && let Err(e) = dir.sync_all() + { + tracing::warn!( + "Directory fsync failed after rename for {}: {e}", + parent.display() + ); + } + + Ok(()) +} + +fn sync_file(path: &Path) { + if let Ok(file) = std::fs::File::open(path) + && let Err(e) = file.sync_all() + { + tracing::warn!("File fsync failed for {}: {e}", path.display()); + } +} pub struct SegmentedAudioEncoder { base_path: PathBuf, @@ -28,6 +64,7 @@ pub struct SegmentInfo { pub path: PathBuf, pub index: u32, pub duration: Duration, + pub file_size: Option, } #[derive(Serialize)] @@ -36,10 +73,15 @@ struct FragmentEntry { index: u32, duration: f64, is_complete: bool, + #[serde(skip_serializing_if = "Option::is_none")] + file_size: Option, } +const MANIFEST_VERSION: u32 = 2; + #[derive(Serialize)] struct Manifest { + version: u32, fragments: Vec, #[serde(skip_serializing_if = "Option::is_none")] total_duration: Option, @@ -81,7 +123,7 @@ impl SegmentedAudioEncoder { let segment_path = base_path.join("fragment_000.m4a"); let encoder = Self::create_segment_encoder(segment_path, audio_config)?; - Ok(Self { + let instance = Self { base_path, audio_config, current_encoder: Some(encoder), @@ -90,7 +132,11 @@ impl SegmentedAudioEncoder { segment_start_time: None, last_frame_timestamp: None, completed_segments: Vec::new(), - }) + }; + + instance.write_in_progress_manifest(); + + Ok(instance) } fn create_segment_encoder( @@ -149,16 +195,30 @@ impl SegmentedAudioEncoder { fn rotate_segment(&mut self, timestamp: Duration) -> Result<(), QueueFrameError> { let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); let segment_duration = timestamp.saturating_sub(segment_start); + let completed_segment_path = self.current_segment_path(); if let Some(mut encoder) = self.current_encoder.take() { - let _ = encoder.encoder.flush(&mut encoder.output); - let _ = encoder.output.write_trailer(); + if let Err(e) = encoder.encoder.flush(&mut encoder.output) { + tracing::warn!("Audio encoder flush warning during rotation: {e}"); + } + if let Err(e) = encoder.output.write_trailer() { + tracing::warn!("Audio write_trailer warning during rotation: {e}"); + } + + sync_file(&completed_segment_path); + + let file_size = std::fs::metadata(&completed_segment_path) + .ok() + .map(|m| m.len()); self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), + path: completed_segment_path, index: self.current_index, duration: segment_duration, + file_size, }); + + self.write_manifest(); } self.current_index += 1; @@ -167,7 +227,7 @@ impl SegmentedAudioEncoder { let new_path = self.current_segment_path(); self.current_encoder = Some(Self::create_segment_encoder(new_path, self.audio_config)?); - self.write_manifest(); + self.write_in_progress_manifest(); Ok(()) } @@ -179,6 +239,7 @@ impl SegmentedAudioEncoder { fn write_manifest(&self) { let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -192,6 +253,7 @@ impl SegmentedAudioEncoder { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: None, @@ -199,28 +261,68 @@ impl SegmentedAudioEncoder { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write manifest to {}: {e}", + manifest_path.display() + ); + } + } + + fn write_in_progress_manifest(&self) { + let mut fragments: Vec = self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(); + + fragments.push(FragmentEntry { + path: self + .current_segment_path() + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: self.current_index, + duration: 0.0, + is_complete: false, + file_size: None, + }); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments, + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write in-progress manifest to {}: {e}", + manifest_path.display() + ); + } } pub fn finish(&mut self) -> Result<(), FinishError> { + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; + let last_timestamp = self.last_frame_timestamp; + if let Some(mut encoder) = self.current_encoder.take() { if encoder.has_frames { - if let Some(segment_start) = self.segment_start_time { - let final_duration = self - .last_frame_timestamp - .unwrap_or(segment_start) - .saturating_sub(segment_start); - - self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), - index: self.current_index, - duration: final_duration, - }); - } - let flush_result = encoder.encoder.flush(&mut encoder.output); let trailer_result = encoder.output.write_trailer(); @@ -230,9 +332,30 @@ impl SegmentedAudioEncoder { if let Err(e) = &trailer_result { tracing::warn!("Audio write_trailer warning: {e}"); } + + sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = last_timestamp.unwrap_or(start).saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: self.current_index, + duration: final_duration, + file_size, + }); + } } else { - let _ = encoder.output.write_trailer(); - let _ = std::fs::remove_file(self.current_segment_path()); + if let Err(e) = encoder.output.write_trailer() { + tracing::trace!("Audio write_trailer on empty segment: {e}"); + } + if let Err(e) = std::fs::remove_file(&segment_path) { + tracing::trace!( + "Failed to remove empty audio segment {}: {e}", + segment_path.display() + ); + } } } @@ -242,18 +365,11 @@ impl SegmentedAudioEncoder { } pub fn finish_with_timestamp(&mut self, timestamp: Duration) -> Result<(), FinishError> { + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; + if let Some(mut encoder) = self.current_encoder.take() { if encoder.has_frames { - if let Some(segment_start) = self.segment_start_time { - let final_duration = timestamp.saturating_sub(segment_start); - - self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), - index: self.current_index, - duration: final_duration, - }); - } - let flush_result = encoder.encoder.flush(&mut encoder.output); let trailer_result = encoder.output.write_trailer(); @@ -263,9 +379,30 @@ impl SegmentedAudioEncoder { if let Err(e) = &trailer_result { tracing::warn!("Audio write_trailer warning: {e}"); } + + sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = timestamp.saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: self.current_index, + duration: final_duration, + file_size, + }); + } } else { - let _ = encoder.output.write_trailer(); - let _ = std::fs::remove_file(self.current_segment_path()); + if let Err(e) = encoder.output.write_trailer() { + tracing::trace!("Audio write_trailer on empty segment: {e}"); + } + if let Err(e) = std::fs::remove_file(&segment_path) { + tracing::trace!( + "Failed to remove empty audio segment {}: {e}", + segment_path.display() + ); + } } } @@ -278,6 +415,7 @@ impl SegmentedAudioEncoder { let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -291,6 +429,7 @@ impl SegmentedAudioEncoder { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: Some(total_duration.as_secs_f64()), @@ -298,10 +437,12 @@ impl SegmentedAudioEncoder { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = atomic_write_json(&manifest_path, &manifest) { + tracing::warn!( + "Failed to write final manifest to {}: {e}", + manifest_path.display() + ); + } } pub fn completed_segments(&self) -> &[SegmentInfo] { diff --git a/crates/enc-mediafoundation/examples/cli.rs b/crates/enc-mediafoundation/examples/cli.rs index 72f20574e2..f24f0a4fab 100644 --- a/crates/enc-mediafoundation/examples/cli.rs +++ b/crates/enc-mediafoundation/examples/cli.rs @@ -143,8 +143,7 @@ mod win { .run( should_stop_encoder, || Ok(frame_rx.recv().ok().flatten()), - |sample| { - dbg!(sample); + |_sample| { Ok(()) // sample_writer.write(stream_index, &output_sample).unwrap() }, diff --git a/crates/mediafoundation-ffmpeg/examples/usage.rs b/crates/mediafoundation-ffmpeg/examples/usage.rs index f37ad81572..f000fe7dee 100644 --- a/crates/mediafoundation-ffmpeg/examples/usage.rs +++ b/crates/mediafoundation-ffmpeg/examples/usage.rs @@ -9,63 +9,6 @@ mod win { use ffmpeg::format; use std::path::PathBuf; - /// Example of using H264StreamMuxer with an existing FFmpeg output context - /// This demonstrates how to integrate with MP4File or similar structures - fn example_with_shared_output() -> Result<(), Box> { - // Initialize FFmpeg - ffmpeg::init()?; - - // Create an output context (this would normally be owned by MP4File) - let output_path = PathBuf::from("output.mp4"); - let mut output = format::output(&output_path)?; - - // Configure the H264 muxer - let config = MuxerConfig { - width: 1920, - height: 1080, - fps: 30, - bitrate: 5_000_000, // 5 Mbps - }; - - // Add the H264 stream and create the muxer - // Note: We need to add the stream before writing the header - let mut h264_muxer = H264StreamMuxer::new(&mut output, config)?; - - // You might also have other streams (like audio) added to the same output - // ... add audio stream here if needed ... - - // Write the header after all streams are added - output.write_header()?; - - // Now you can write H264 samples from MediaFoundation - #[cfg(windows)] - { - // Example: Write samples from MediaFoundation - // let sample: IMFSample = get_sample_from_media_foundation(); - // h264_muxer.write_sample(&sample)?; - } - - // Or write raw H264 data - let example_h264_data = vec![0, 0, 0, 1, 0x65]; // Example keyframe NAL - h264_muxer.write_h264_data( - &example_h264_data, - 0, // pts in microseconds - 0, // dts in microseconds - 33333, // duration in microseconds (1/30 fps) - true, // is_keyframe - &mut output, - )?; - - // Finish the muxer (doesn't write trailer) - h264_muxer.finish()?; - - // Write the trailer (this would be done by MP4File::finish()) - output.write_trailer()?; - - Ok(()) - } - - /// Example showing how this would integrate with an MP4File-like structure struct MP4FileExample { output: format::context::Output, h264_muxer: Option, @@ -76,13 +19,8 @@ mod win { fn new(output_path: PathBuf, video_config: MuxerConfig) -> Result { let mut output = format::output(&output_path)?; - // Add H264 stream and create muxer let h264_muxer = H264StreamMuxer::new(&mut output, video_config)?; - // You could add audio streams here too - // ... - - // Write header after all streams are added output.write_header()?; Ok(Self { @@ -92,7 +30,7 @@ mod win { }) } - #[cfg(windows)] + #[allow(dead_code)] fn write_sample( &mut self, sample: &windows::Win32::Media::MediaFoundation::IMFSample, @@ -103,20 +41,6 @@ mod win { Ok(()) } - fn write_h264_data( - &mut self, - data: &[u8], - pts: i64, - dts: i64, - duration: i64, - is_keyframe: bool, - ) -> Result<(), ffmpeg::Error> { - if let Some(muxer) = &mut self.h264_muxer { - muxer.write_h264_data(data, pts, dts, duration, is_keyframe, &mut self.output)?; - } - Ok(()) - } - fn finish(&mut self) -> Result<(), ffmpeg::Error> { if self.is_finished { return Ok(()); @@ -127,71 +51,28 @@ mod win { muxer.finish()?; } - // Write trailer self.output.write_trailer()?; Ok(()) } } - /// Alternative approach using the owned version for standalone use - fn example_with_owned_muxer() -> Result<(), Box> { - use mediafoundation_ffmpeg::H264SampleMuxerOwned; - - // Initialize FFmpeg - ffmpeg::init()?; - - let config = MuxerConfig { - width: 1920, - height: 1080, - fps: 30, - bitrate: 5_000_000, - }; - - // Create a standalone muxer that owns its output - let mut muxer = - H264SampleMuxerOwned::new_mp4(PathBuf::from("standalone_output.mp4"), config)?; - - // Write some H264 data - let example_h264_data = vec![0, 0, 0, 1, 0x65]; // Example keyframe NAL - muxer.write_h264_data( - &example_h264_data, - 0, // pts - 0, // dts - 33333, // duration - true, // is_keyframe - )?; + pub fn main() { + ffmpeg::init().unwrap(); - // The muxer automatically finishes and writes trailer when dropped - muxer.finish()?; - - Ok(()) - } - - fn main() -> Result<(), Box> { - println!("Example 1: Using H264StreamMuxer with shared output"); - example_with_shared_output()?; - - println!("\nExample 2: Using H264SampleMuxerOwned for standalone use"); - example_with_owned_muxer()?; - - println!("\nExample 3: Using MP4FileExample with integrated muxer"); + println!("Creating MP4FileExample..."); let mut mp4_file = MP4FileExample::new( - PathBuf::from("integrated_output.mp4"), + PathBuf::from("example_output.mp4"), MuxerConfig { width: 1920, height: 1080, fps: 30, bitrate: 5_000_000, + ..Default::default() }, - )?; - - // Write some test data - let example_h264_data = vec![0, 0, 0, 1, 0x65]; - mp4_file.write_h264_data(&example_h264_data, 0, 0, 33333, true)?; - - // Finish writing - mp4_file.finish()?; + ) + .unwrap(); - Ok(()) + mp4_file.finish().unwrap(); + println!("Done!"); } } diff --git a/crates/recording/src/fragmentation/manifest.rs b/crates/recording/src/fragmentation/manifest.rs index 1a9cbedafc..70971d9eac 100644 --- a/crates/recording/src/fragmentation/manifest.rs +++ b/crates/recording/src/fragmentation/manifest.rs @@ -1,8 +1,12 @@ use serde::{Deserialize, Serialize}; use std::{path::PathBuf, time::Duration}; +pub const CURRENT_MANIFEST_VERSION: u32 = 2; + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FragmentManifest { + #[serde(default = "default_manifest_version")] + pub version: u32, pub fragments: Vec, #[serde( with = "duration_serde", @@ -13,6 +17,10 @@ pub struct FragmentManifest { pub is_complete: bool, } +fn default_manifest_version() -> u32 { + 1 +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct FragmentInfo { #[serde(with = "path_serde")] @@ -25,6 +33,8 @@ pub struct FragmentInfo { )] pub duration: Option, pub is_complete: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub file_size: Option, } impl FragmentManifest { diff --git a/crates/recording/src/fragmentation/mod.rs b/crates/recording/src/fragmentation/mod.rs index 0f40f0ecb6..b2cf2e51a7 100644 --- a/crates/recording/src/fragmentation/mod.rs +++ b/crates/recording/src/fragmentation/mod.rs @@ -1,7 +1,38 @@ mod manifest; pub use manifest::*; -use std::{path::PathBuf, time::Duration}; +use serde::Serialize; +use std::{ + io::Write, + path::{Path, PathBuf}, + time::Duration, +}; + +pub fn atomic_write_json(path: &Path, data: &T) -> std::io::Result<()> { + let temp_path = path.with_extension("json.tmp"); + let json = serde_json::to_string_pretty(data) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + + let mut file = std::fs::File::create(&temp_path)?; + file.write_all(json.as_bytes())?; + file.sync_all()?; + + std::fs::rename(&temp_path, path)?; + + if let Some(parent) = path.parent() + && let Ok(dir) = std::fs::File::open(parent) + { + let _ = dir.sync_all(); + } + + Ok(()) +} + +pub fn sync_file(path: &Path) { + if let Ok(file) = std::fs::File::open(path) { + let _ = file.sync_all(); + } +} pub struct FragmentManager { base_path: PathBuf, @@ -34,12 +65,18 @@ impl FragmentManager { .join(format!("fragment_{:03}.m4a", self.current_index)) } - pub fn rotate(&mut self, duration: Option, is_complete: bool) -> PathBuf { + pub fn rotate( + &mut self, + duration: Option, + is_complete: bool, + file_size: Option, + ) -> PathBuf { self.fragments.push(FragmentInfo { path: self.current_fragment_path(), index: self.current_index, duration, is_complete, + file_size, }); self.current_index += 1; @@ -58,39 +95,38 @@ impl FragmentManager { &self.fragments } - pub fn mark_current_complete(&mut self, duration: Option) { + pub fn mark_current_complete(&mut self, duration: Option, file_size: Option) { self.fragments.push(FragmentInfo { path: self.current_fragment_path(), index: self.current_index, duration, is_complete: true, + file_size, }); } pub fn write_manifest(&self) -> std::io::Result<()> { let manifest = FragmentManifest { + version: CURRENT_MANIFEST_VERSION, fragments: self.fragments.clone(), total_duration: self.total_duration(), is_complete: false, }; let manifest_path = self.base_path.join("manifest.json"); - let json = serde_json::to_string_pretty(&manifest)?; - std::fs::write(manifest_path, json)?; - Ok(()) + atomic_write_json(&manifest_path, &manifest) } pub fn finalize_manifest(&self) -> std::io::Result<()> { let manifest = FragmentManifest { + version: CURRENT_MANIFEST_VERSION, fragments: self.fragments.clone(), total_duration: self.total_duration(), is_complete: true, }; let manifest_path = self.base_path.join("manifest.json"); - let json = serde_json::to_string_pretty(&manifest)?; - std::fs::write(manifest_path, json)?; - Ok(()) + atomic_write_json(&manifest_path, &manifest) } fn total_duration(&self) -> Option { diff --git a/crates/recording/src/output_pipeline/win_segmented.rs b/crates/recording/src/output_pipeline/win_segmented.rs index 7a134ebfea..bdb8393041 100644 --- a/crates/recording/src/output_pipeline/win_segmented.rs +++ b/crates/recording/src/output_pipeline/win_segmented.rs @@ -1,4 +1,4 @@ -use crate::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer, screen_capture}; +use crate::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer, fragmentation, screen_capture}; use anyhow::{Context, anyhow}; use cap_media_info::{AudioInfo, VideoInfo}; use serde::Serialize; @@ -24,6 +24,7 @@ pub struct SegmentInfo { pub path: PathBuf, pub index: u32, pub duration: Duration, + pub file_size: Option, } #[derive(Serialize)] @@ -32,10 +33,15 @@ struct FragmentEntry { index: u32, duration: f64, is_complete: bool, + #[serde(skip_serializing_if = "Option::is_none")] + file_size: Option, } +const MANIFEST_VERSION: u32 = 2; + #[derive(Serialize)] struct Manifest { + version: u32, fragments: Vec, #[serde(skip_serializing_if = "Option::is_none")] total_duration: Option, @@ -54,6 +60,39 @@ struct PauseTracker { offset: Duration, } +struct FrameDropTracker { + count: u32, + last_warning: std::time::Instant, +} + +impl FrameDropTracker { + fn new() -> Self { + Self { + count: 0, + last_warning: std::time::Instant::now(), + } + } + + fn record_drop(&mut self) { + self.count += 1; + if self.count >= 30 && self.last_warning.elapsed() > Duration::from_secs(5) { + warn!( + "Dropped {} screen frames due to encoder backpressure", + self.count + ); + self.count = 0; + self.last_warning = std::time::Instant::now(); + } + } + + fn reset(&mut self) { + if self.count > 0 { + trace!("Frame drop count at segment boundary: {}", self.count); + } + self.count = 0; + } +} + impl PauseTracker { fn new(flag: Arc) -> Self { Self { @@ -115,6 +154,7 @@ pub struct WindowsSegmentedMuxer { encoder_preferences: crate::capture_pipeline::EncoderPreferences, pause: PauseTracker, + frame_drops: FrameDropTracker, } pub struct WindowsSegmentedMuxerConfig { @@ -162,35 +202,38 @@ impl Muxer for WindowsSegmentedMuxer { output_size: config.output_size, encoder_preferences: config.encoder_preferences, pause: PauseTracker::new(pause_flag), + frame_drops: FrameDropTracker::new(), }) } fn stop(&mut self) { - if let Some(state) = &self.current_state { - let _ = state.video_tx.send(None); + if let Some(state) = &self.current_state + && let Err(e) = state.video_tx.send(None) + { + trace!("Screen encoder channel already closed during stop: {e}"); } } fn finish(&mut self, timestamp: Duration) -> anyhow::Result> { - if let Some(segment_start) = self.segment_start_time { - let final_duration = timestamp.saturating_sub(segment_start); - - self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), - index: self.current_index, - duration: final_duration, - }); - } + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; if let Some(mut state) = self.current_state.take() { - let _ = state.video_tx.send(None); + if let Err(e) = state.video_tx.send(None) { + trace!("Screen encoder channel already closed during finish: {e}"); + } if let Some(handle) = state.encoder_handle.take() { let timeout = Duration::from_secs(5); let start = std::time::Instant::now(); loop { if handle.is_finished() { - let _ = handle.join(); + if let Err(panic_payload) = handle.join() { + warn!( + "Screen encoder thread panicked during finish: {:?}", + panic_payload + ); + } break; } if start.elapsed() > timeout { @@ -209,6 +252,20 @@ impl Muxer for WindowsSegmentedMuxer { .lock() .map_err(|_| anyhow!("Failed to lock output"))?; output.write_trailer()?; + + fragmentation::sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = timestamp.saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: self.current_index, + duration: final_duration, + file_size, + }); + } } self.finalize_manifest(); @@ -225,6 +282,7 @@ impl WindowsSegmentedMuxer { fn write_manifest(&self) { let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -238,6 +296,7 @@ impl WindowsSegmentedMuxer { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: None, @@ -245,16 +304,19 @@ impl WindowsSegmentedMuxer { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write manifest to {}: {e}", + manifest_path.display() + ); + } } fn finalize_manifest(&self) { let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -268,6 +330,7 @@ impl WindowsSegmentedMuxer { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: Some(total_duration.as_secs_f64()), @@ -275,10 +338,12 @@ impl WindowsSegmentedMuxer { }; let manifest_path = self.base_path.join("manifest.json"); - let _ = std::fs::write( - manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write final manifest to {}: {e}", + manifest_path.display() + ); + } } fn create_segment(&mut self) -> anyhow::Result<()> { @@ -500,16 +565,24 @@ impl WindowsSegmentedMuxer { fn rotate_segment(&mut self, timestamp: Duration) -> anyhow::Result<()> { let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); let segment_duration = timestamp.saturating_sub(segment_start); + let completed_segment_path = self.current_segment_path(); if let Some(mut state) = self.current_state.take() { - let _ = state.video_tx.send(None); + if let Err(e) = state.video_tx.send(None) { + trace!("Screen encoder channel already closed during rotation: {e}"); + } if let Some(handle) = state.encoder_handle.take() { let timeout = Duration::from_secs(5); let start = std::time::Instant::now(); loop { if handle.is_finished() { - let _ = handle.join(); + if let Err(panic_payload) = handle.join() { + warn!( + "Screen encoder thread panicked during rotation: {:?}", + panic_payload + ); + } break; } if start.elapsed() > timeout { @@ -529,18 +602,28 @@ impl WindowsSegmentedMuxer { .map_err(|_| anyhow!("Failed to lock output"))?; output.write_trailer()?; + fragmentation::sync_file(&completed_segment_path); + + let file_size = std::fs::metadata(&completed_segment_path) + .ok() + .map(|m| m.len()); + self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), + path: completed_segment_path, index: self.current_index, duration: segment_duration, + file_size, }); + + self.write_manifest(); } + self.frame_drops.reset(); self.current_index += 1; self.segment_start_time = Some(timestamp); self.create_segment()?; - self.write_manifest(); + self.write_in_progress_manifest(); info!( "Rotated to segment {} at {:?}", @@ -549,6 +632,53 @@ impl WindowsSegmentedMuxer { Ok(()) } + + fn write_in_progress_manifest(&self) { + let mut fragments: Vec = self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(); + + fragments.push(FragmentEntry { + path: self + .current_segment_path() + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: self.current_index, + duration: 0.0, + is_complete: false, + file_size: None, + }); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments, + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write in-progress manifest to {}: {e}", + manifest_path.display() + ); + } + } } impl VideoMuxer for WindowsSegmentedMuxer { @@ -566,6 +696,7 @@ impl VideoMuxer for WindowsSegmentedMuxer { if self.current_state.is_none() { self.segment_start_time = Some(adjusted_timestamp); self.create_segment()?; + self.write_in_progress_manifest(); } if self.segment_start_time.is_none() { @@ -586,7 +717,7 @@ impl VideoMuxer for WindowsSegmentedMuxer { { match e { std::sync::mpsc::TrySendError::Full(_) => { - trace!("Screen encoder channel full, dropping frame"); + self.frame_drops.record_drop(); } std::sync::mpsc::TrySendError::Disconnected(_) => { trace!("Screen encoder channel disconnected"); diff --git a/crates/recording/src/output_pipeline/win_segmented_camera.rs b/crates/recording/src/output_pipeline/win_segmented_camera.rs index 97a4e5b16c..67878cf3a2 100644 --- a/crates/recording/src/output_pipeline/win_segmented_camera.rs +++ b/crates/recording/src/output_pipeline/win_segmented_camera.rs @@ -1,5 +1,5 @@ use crate::output_pipeline::win::{NativeCameraFrame, upload_mf_buffer_to_texture}; -use crate::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer}; +use crate::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer, fragmentation}; use anyhow::{Context, anyhow}; use cap_media_info::{AudioInfo, VideoInfo}; use serde::Serialize; @@ -21,6 +21,7 @@ struct SegmentInfo { path: PathBuf, index: u32, duration: Duration, + file_size: Option, } #[derive(Serialize)] @@ -29,10 +30,15 @@ struct FragmentEntry { index: u32, duration: f64, is_complete: bool, + #[serde(skip_serializing_if = "Option::is_none")] + file_size: Option, } +const MANIFEST_VERSION: u32 = 2; + #[derive(Serialize)] struct Manifest { + version: u32, fragments: Vec, #[serde(skip_serializing_if = "Option::is_none")] total_duration: Option, @@ -51,6 +57,42 @@ struct PauseTracker { offset: Duration, } +struct FrameDropTracker { + count: u32, + last_warning: std::time::Instant, +} + +impl FrameDropTracker { + fn new() -> Self { + Self { + count: 0, + last_warning: std::time::Instant::now(), + } + } + + fn record_drop(&mut self) { + self.count += 1; + if self.count >= 30 && self.last_warning.elapsed() > Duration::from_secs(5) { + warn!( + "Dropped {} camera frames due to encoder backpressure", + self.count + ); + self.count = 0; + self.last_warning = std::time::Instant::now(); + } + } + + fn reset(&mut self) { + if self.count > 0 { + trace!( + "Camera frame drop count at segment boundary: {}", + self.count + ); + } + self.count = 0; + } +} + impl PauseTracker { fn new(flag: Arc) -> Self { Self { @@ -107,6 +149,7 @@ pub struct WindowsSegmentedCameraMuxer { output_height: Option, pause: PauseTracker, + frame_drops: FrameDropTracker, } pub struct WindowsSegmentedCameraMuxerConfig { @@ -153,35 +196,38 @@ impl Muxer for WindowsSegmentedCameraMuxer { video_config, output_height: config.output_height, pause: PauseTracker::new(pause_flag), + frame_drops: FrameDropTracker::new(), }) } fn stop(&mut self) { - if let Some(state) = &self.current_state { - let _ = state.video_tx.send(None); + if let Some(state) = &self.current_state + && let Err(e) = state.video_tx.send(None) + { + trace!("Camera encoder channel already closed during stop: {e}"); } } fn finish(&mut self, timestamp: Duration) -> anyhow::Result> { - if let Some(segment_start) = self.segment_start_time { - let final_duration = timestamp.saturating_sub(segment_start); - - self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), - index: self.current_index, - duration: final_duration, - }); - } + let segment_path = self.current_segment_path(); + let segment_start = self.segment_start_time; if let Some(mut state) = self.current_state.take() { - let _ = state.video_tx.send(None); + if let Err(e) = state.video_tx.send(None) { + trace!("Camera encoder channel already closed during finish: {e}"); + } if let Some(handle) = state.encoder_handle.take() { let timeout = Duration::from_secs(5); let start = std::time::Instant::now(); loop { if handle.is_finished() { - let _ = handle.join(); + if let Err(panic_payload) = handle.join() { + warn!( + "Camera encoder thread panicked during finish: {:?}", + panic_payload + ); + } break; } if start.elapsed() > timeout { @@ -200,6 +246,20 @@ impl Muxer for WindowsSegmentedCameraMuxer { .lock() .map_err(|_| anyhow!("Failed to lock output"))?; output.write_trailer()?; + + fragmentation::sync_file(&segment_path); + + if let Some(start) = segment_start { + let final_duration = timestamp.saturating_sub(start); + let file_size = std::fs::metadata(&segment_path).ok().map(|m| m.len()); + + self.completed_segments.push(SegmentInfo { + path: segment_path, + index: self.current_index, + duration: final_duration, + file_size, + }); + } } self.finalize_manifest(); @@ -216,6 +276,7 @@ impl WindowsSegmentedCameraMuxer { fn write_manifest(&self) { let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -229,6 +290,7 @@ impl WindowsSegmentedCameraMuxer { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: None, @@ -236,10 +298,7 @@ impl WindowsSegmentedCameraMuxer { }; let manifest_path = self.base_path.join("manifest.json"); - if let Err(e) = std::fs::write( - &manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ) { + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { warn!( "Failed to write manifest to {}: {e}", manifest_path.display() @@ -251,6 +310,7 @@ impl WindowsSegmentedCameraMuxer { let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); let manifest = Manifest { + version: MANIFEST_VERSION, fragments: self .completed_segments .iter() @@ -264,6 +324,7 @@ impl WindowsSegmentedCameraMuxer { index: s.index, duration: s.duration.as_secs_f64(), is_complete: true, + file_size: s.file_size, }) .collect(), total_duration: Some(total_duration.as_secs_f64()), @@ -271,10 +332,7 @@ impl WindowsSegmentedCameraMuxer { }; let manifest_path = self.base_path.join("manifest.json"); - if let Err(e) = std::fs::write( - &manifest_path, - serde_json::to_string_pretty(&manifest).unwrap_or_default(), - ) { + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { warn!( "Failed to write final manifest to {}: {e}", manifest_path.display() @@ -449,9 +507,12 @@ impl WindowsSegmentedCameraMuxer { ) -> anyhow::Result<()> { let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); let segment_duration = timestamp.saturating_sub(segment_start); + let completed_segment_path = self.current_segment_path(); if let Some(mut state) = self.current_state.take() { - let _ = state.video_tx.send(None); + if let Err(e) = state.video_tx.send(None) { + trace!("Camera encoder channel already closed during rotation: {e}"); + } if let Some(handle) = state.encoder_handle.take() { let timeout = Duration::from_secs(5); @@ -483,18 +544,28 @@ impl WindowsSegmentedCameraMuxer { .map_err(|_| anyhow!("Failed to lock output"))?; output.write_trailer()?; + fragmentation::sync_file(&completed_segment_path); + + let file_size = std::fs::metadata(&completed_segment_path) + .ok() + .map(|m| m.len()); + self.completed_segments.push(SegmentInfo { - path: self.current_segment_path(), + path: completed_segment_path, index: self.current_index, duration: segment_duration, + file_size, }); + + self.write_manifest(); } + self.frame_drops.reset(); self.current_index += 1; self.segment_start_time = Some(timestamp); self.create_segment(next_frame)?; - self.write_manifest(); + self.write_in_progress_manifest(); info!( "Camera rotated to segment {} at {:?}", @@ -503,6 +574,53 @@ impl WindowsSegmentedCameraMuxer { Ok(()) } + + fn write_in_progress_manifest(&self) { + let mut fragments: Vec = self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + file_size: s.file_size, + }) + .collect(); + + fragments.push(FragmentEntry { + path: self + .current_segment_path() + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: self.current_index, + duration: 0.0, + is_complete: false, + file_size: None, + }); + + let manifest = Manifest { + version: MANIFEST_VERSION, + fragments, + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + if let Err(e) = fragmentation::atomic_write_json(&manifest_path, &manifest) { + warn!( + "Failed to write in-progress manifest to {}: {e}", + manifest_path.display() + ); + } + } } impl VideoMuxer for WindowsSegmentedCameraMuxer { @@ -520,6 +638,7 @@ impl VideoMuxer for WindowsSegmentedCameraMuxer { if self.current_state.is_none() { self.segment_start_time = Some(adjusted_timestamp); self.create_segment(&frame)?; + self.write_in_progress_manifest(); } if self.segment_start_time.is_none() { @@ -538,7 +657,7 @@ impl VideoMuxer for WindowsSegmentedCameraMuxer { { match e { std::sync::mpsc::TrySendError::Full(_) => { - trace!("Camera encoder channel full, dropping frame"); + self.frame_drops.record_drop(); } std::sync::mpsc::TrySendError::Disconnected(_) => { trace!("Camera encoder channel disconnected"); diff --git a/crates/recording/src/recovery.rs b/crates/recording/src/recovery.rs index da5560e4c5..4490ac1c53 100644 --- a/crates/recording/src/recovery.rs +++ b/crates/recording/src/recovery.rs @@ -201,6 +201,8 @@ impl RecoveryManager { } fn find_complete_fragments(dir: &Path) -> Vec { + use crate::fragmentation::CURRENT_MANIFEST_VERSION; + let manifest_path = dir.join("manifest.json"); if manifest_path.exists() @@ -208,6 +210,21 @@ impl RecoveryManager { && let Ok(manifest) = serde_json::from_str::(&content) && let Some(fragments) = manifest.get("fragments").and_then(|f| f.as_array()) { + let manifest_version = manifest + .get("version") + .and_then(|v| v.as_u64()) + .unwrap_or(1) as u32; + if manifest_version > CURRENT_MANIFEST_VERSION { + warn!( + "Manifest version {} is newer than supported {}", + manifest_version, CURRENT_MANIFEST_VERSION + ); + } + + let expected_file_size = |f: &serde_json::Value| -> Option { + f.get("file_size").and_then(|s| s.as_u64()) + }; + let result: Vec = fragments .iter() .filter(|f| { @@ -215,9 +232,45 @@ impl RecoveryManager { .and_then(|c| c.as_bool()) .unwrap_or(false) }) - .filter_map(|f| f.get("path").and_then(|p| p.as_str())) - .map(|p| dir.join(p)) - .filter(|p| p.exists()) + .filter_map(|f| { + let path_str = f.get("path").and_then(|p| p.as_str())?; + let path = dir.join(path_str); + if !path.exists() { + return None; + } + + if let Some(expected_size) = expected_file_size(f) + && let Ok(metadata) = std::fs::metadata(&path) + && metadata.len() != expected_size + { + warn!( + "Fragment {} size mismatch: expected {}, got {}", + path.display(), + expected_size, + metadata.len() + ); + return None; + } + + if Self::is_video_file(&path) { + match probe_video_can_decode(&path) { + Ok(true) => Some(path), + Ok(false) => { + warn!("Fragment {} has no decodable frames", path.display()); + None + } + Err(e) => { + warn!("Fragment {} validation failed: {}", path.display(), e); + None + } + } + } else if probe_media_valid(&path) { + Some(path) + } else { + warn!("Fragment {} is not valid media", path.display()); + None + } + }) .collect(); if !result.is_empty() { @@ -228,6 +281,12 @@ impl RecoveryManager { Self::probe_fragments_in_dir(dir) } + fn is_video_file(path: &Path) -> bool { + path.extension() + .map(|e| e.eq_ignore_ascii_case("mp4")) + .unwrap_or(false) + } + fn probe_fragments_in_dir(dir: &Path) -> Vec { let Ok(entries) = std::fs::read_dir(dir) else { return Vec::new(); @@ -237,11 +296,26 @@ impl RecoveryManager { .filter_map(|e| e.ok()) .map(|e| e.path()) .filter(|p| { - p.extension() - .map(|e| e == "mp4" || e == "m4a" || e == "ogg") - .unwrap_or(false) + let ext = p + .extension() + .and_then(|e| e.to_str()) + .map(|e| e.to_lowercase()); + match ext.as_deref() { + Some("mp4") => match probe_video_can_decode(p) { + Ok(true) => true, + Ok(false) => { + debug!("Skipping {} - no decodable frames", p.display()); + false + } + Err(e) => { + debug!("Skipping {} - validation failed: {}", p.display(), e); + false + } + }, + Some("m4a") | Some("ogg") => probe_media_valid(p), + _ => false, + } }) - .filter(|p| probe_media_valid(p)) .collect(); fragments.sort(); @@ -249,7 +323,23 @@ impl RecoveryManager { } fn probe_single_file(path: &Path) -> Option { - if path.exists() && probe_media_valid(path) { + if !path.exists() { + return None; + } + + if Self::is_video_file(path) { + match probe_video_can_decode(path) { + Ok(true) => Some(path.to_path_buf()), + Ok(false) => { + debug!("Single file {} has no decodable frames", path.display()); + None + } + Err(e) => { + debug!("Single file {} validation failed: {}", path.display(), e); + None + } + } + } else if probe_media_valid(path) { Some(path.to_path_buf()) } else { None @@ -314,8 +404,10 @@ impl RecoveryManager { info!("Moving single display fragment to {:?}", display_output); std::fs::rename(source, &display_output)?; let display_dir = segment_dir.join("display"); - if display_dir.exists() { - let _ = std::fs::remove_dir_all(&display_dir); + if display_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&display_dir) + { + debug!("Failed to clean up display dir {:?}: {e}", display_dir); } } } else if segment.display_fragments.len() > 1 { @@ -328,11 +420,15 @@ impl RecoveryManager { .map_err(RecoveryError::VideoConcat)?; for fragment in &segment.display_fragments { - let _ = std::fs::remove_file(fragment); + if let Err(e) = std::fs::remove_file(fragment) { + debug!("Failed to remove display fragment {:?}: {e}", fragment); + } } let display_dir = segment_dir.join("display"); - if display_dir.exists() { - let _ = std::fs::remove_dir_all(&display_dir); + if display_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&display_dir) + { + debug!("Failed to clean up display dir {:?}: {e}", display_dir); } } @@ -344,8 +440,10 @@ impl RecoveryManager { info!("Moving single camera fragment to {:?}", camera_output); std::fs::rename(source, &camera_output)?; let camera_dir = segment_dir.join("camera"); - if camera_dir.exists() { - let _ = std::fs::remove_dir_all(&camera_dir); + if camera_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&camera_dir) + { + debug!("Failed to clean up camera dir {:?}: {e}", camera_dir); } } } else if camera_frags.len() > 1 { @@ -358,11 +456,15 @@ impl RecoveryManager { .map_err(RecoveryError::VideoConcat)?; for fragment in camera_frags { - let _ = std::fs::remove_file(fragment); + if let Err(e) = std::fs::remove_file(fragment) { + debug!("Failed to remove camera fragment {:?}: {e}", fragment); + } } let camera_dir = segment_dir.join("camera"); - if camera_dir.exists() { - let _ = std::fs::remove_dir_all(&camera_dir); + if camera_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&camera_dir) + { + debug!("Failed to clean up camera dir {:?}: {e}", camera_dir); } } } @@ -380,11 +482,15 @@ impl RecoveryManager { info!("Transcoding single mic fragment to {:?}", mic_output); concatenate_audio_to_ogg(mic_frags, &mic_output) .map_err(RecoveryError::AudioConcat)?; - let _ = std::fs::remove_file(source); + if let Err(e) = std::fs::remove_file(source) { + debug!("Failed to remove mic source {:?}: {e}", source); + } } let mic_dir = segment_dir.join("audio-input"); - if mic_dir.exists() { - let _ = std::fs::remove_dir_all(&mic_dir); + if mic_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&mic_dir) + { + debug!("Failed to clean up mic dir {:?}: {e}", mic_dir); } } } else if mic_frags.len() > 1 { @@ -397,11 +503,15 @@ impl RecoveryManager { .map_err(RecoveryError::AudioConcat)?; for fragment in mic_frags { - let _ = std::fs::remove_file(fragment); + if let Err(e) = std::fs::remove_file(fragment) { + debug!("Failed to remove mic fragment {:?}: {e}", fragment); + } } let mic_dir = segment_dir.join("audio-input"); - if mic_dir.exists() { - let _ = std::fs::remove_dir_all(&mic_dir); + if mic_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&mic_dir) + { + debug!("Failed to clean up mic dir {:?}: {e}", mic_dir); } } } @@ -422,11 +532,15 @@ impl RecoveryManager { ); concatenate_audio_to_ogg(system_frags, &system_output) .map_err(RecoveryError::AudioConcat)?; - let _ = std::fs::remove_file(source); + if let Err(e) = std::fs::remove_file(source) { + debug!("Failed to remove system audio source {:?}: {e}", source); + } } let system_dir = segment_dir.join("system_audio"); - if system_dir.exists() { - let _ = std::fs::remove_dir_all(&system_dir); + if system_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&system_dir) + { + debug!("Failed to clean up system audio dir {:?}: {e}", system_dir); } } } else if system_frags.len() > 1 { @@ -439,11 +553,15 @@ impl RecoveryManager { .map_err(RecoveryError::AudioConcat)?; for fragment in system_frags { - let _ = std::fs::remove_file(fragment); + if let Err(e) = std::fs::remove_file(fragment) { + debug!("Failed to remove system audio fragment {:?}: {e}", fragment); + } } let system_dir = segment_dir.join("system_audio"); - if system_dir.exists() { - let _ = std::fs::remove_dir_all(&system_dir); + if system_dir.exists() + && let Err(e) = std::fs::remove_dir_all(&system_dir) + { + debug!("Failed to clean up system audio dir {:?}: {e}", system_dir); } } } @@ -487,14 +605,24 @@ impl RecoveryManager { "Camera video has no decodable frames, removing: {:?}", camera_output ); - let _ = std::fs::remove_file(&camera_output); + if let Err(e) = std::fs::remove_file(&camera_output) { + debug!( + "Failed to remove invalid camera video {:?}: {e}", + camera_output + ); + } } Err(e) => { warn!( "Camera video validation failed for {:?}: {}, removing", camera_output, e ); - let _ = std::fs::remove_file(&camera_output); + if let Err(remove_err) = std::fs::remove_file(&camera_output) { + debug!( + "Failed to remove invalid camera video {:?}: {remove_err}", + camera_output + ); + } } } } diff --git a/crates/recording/src/studio_recording.rs b/crates/recording/src/studio_recording.rs index 443da9cd29..0a5a437ab4 100644 --- a/crates/recording/src/studio_recording.rs +++ b/crates/recording/src/studio_recording.rs @@ -228,7 +228,9 @@ impl Message for Actor { async fn handle(&mut self, _: Cancel, _: &mut Context) -> Self::Reply { if let Some(ActorState::Recording { pipeline, .. }) = self.state.take() { - let _ = pipeline.stop().await; + if let Err(e) = pipeline.stop().await { + warn!("Pipeline stop error during cancel: {e:#}"); + } self.notify_completion_ok(); } diff --git a/crates/scap-ffmpeg/examples/cli.rs b/crates/scap-ffmpeg/examples/cli.rs index 6b329ee638..053bccc136 100644 --- a/crates/scap-ffmpeg/examples/cli.rs +++ b/crates/scap-ffmpeg/examples/cli.rs @@ -3,33 +3,43 @@ pub async fn main() { #[cfg(windows)] { use scap_direct3d::*; + use scap_targets::Display; + use std::time::Duration; - let display = Display::primary().unwrap(); + let display = Display::primary(); - let capturer = Capturer::new( - display.try_as_capture_item().unwrap(), + let mut capturer = Capturer::new( + display.raw_handle().try_as_capture_item().unwrap(), Settings { is_border_required: Some(true), is_cursor_capture_enabled: Some(true), pixel_format: PixelFormat::R8G8B8A8Unorm, + ..Default::default() }, - ); - - let capture_handle = capturer - .start(|frame| { + |frame| { use scap_ffmpeg::AsFFmpeg; let ff_frame = frame.as_ffmpeg()?; - dbg!(ff_frame.width(), ff_frame.height(), ff_frame.format()); + println!( + "Frame: {}x{} format={:?}", + ff_frame.width(), + ff_frame.height(), + ff_frame.format() + ); Ok(()) - }) - .unwrap(); + }, + || Ok(()), + None, + ) + .unwrap(); + + capturer.start().unwrap(); std::thread::sleep(Duration::from_secs(3)); - capture_handle.stop().unwrap(); + capturer.stop().unwrap(); std::thread::sleep(Duration::from_secs(3)); }