diff --git a/.claude/settings.local.json b/.claude/settings.local.json index b698e146d4..7f820b11da 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -7,7 +7,30 @@ "Bash(cargo check:*)", "Bash(cargo fmt:*)", "Bash(pnpm format:*)", - "Bash(pnpm exec biome check:*)" + "Bash(pnpm exec biome check:*)", + "Bash(grep:*)", + "Bash(cargo metadata:*)", + "Bash(ffprobe:*)", + "Bash(ls:*)", + "Bash(find:*)", + "Bash(cat:*)", + "WebFetch(domain:raw.githubusercontent.com)", + "WebFetch(domain:api.github.com)", + "Bash(cargo doc:*)", + "Bash(cargo clippy:*)", + "Bash(python3:*)", + "Bash(cargo run:*)", + "WebSearch", + "Bash(xargs ls:*)", + "WebFetch(domain:ffmpeg.org)", + "Bash(git log:*)", + "Bash(tree:*)", + "Bash(tail:*)", + "Bash(pnpm typecheck:desktop:*)", + "Bash(pnpm exec tsc:*)", + "Bash(pnpm biome check:*)", + "Bash(pnpm --dir apps/desktop exec tsc:*)", + "Bash(xxd:*)" ], "deny": [], "ask": [] diff --git a/Cargo.lock b/Cargo.lock index 41f63e0cf7..8deccdc263 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1307,6 +1307,8 @@ dependencies = [ "cap-media-info", "cidre", "ffmpeg-next", + "serde", + "serde_json", "thiserror 1.0.69", "tracing", "workspace-hack", @@ -1318,6 +1320,8 @@ version = "0.1.0" dependencies = [ "cap-media-info", "ffmpeg-next", + "serde", + "serde_json", "thiserror 1.0.69", "tracing", "workspace-hack", diff --git a/Cargo.toml b/Cargo.toml index 7eae47f5d6..e0350317cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ specta = { version = "=2.0.0-rc.20", features = [ "chrono" ] } serde = { version = "1", features = ["derive"] } +serde_json = "1" nokhwa = { git = "https://github.com/CapSoftware/nokhwa", rev = "b9c8079e82e2", features = [ "input-native", diff --git a/apps/desktop/src-tauri/src/captions.rs b/apps/desktop/src-tauri/src/captions.rs index 80a135e5b8..0dde354181 100644 --- a/apps/desktop/src-tauri/src/captions.rs +++ b/apps/desktop/src-tauri/src/captions.rs @@ -1010,7 +1010,7 @@ fn start_whisperx_server( 
std::thread::spawn(move || { use std::io::BufRead; let reader = std::io::BufReader::new(stderr); - for line in reader.lines().flatten() { + for line in reader.lines().map_while(Result::ok) { if let Some(stripped) = line.strip_prefix("STDERR:") { log::info!("[WhisperX] {}", stripped); } else { diff --git a/apps/desktop/src-tauri/src/general_settings.rs b/apps/desktop/src-tauri/src/general_settings.rs index 1d3fc7fa61..9b215c7b58 100644 --- a/apps/desktop/src-tauri/src/general_settings.rs +++ b/apps/desktop/src-tauri/src/general_settings.rs @@ -124,6 +124,8 @@ pub struct GeneralSettingsStore { pub instant_mode_max_resolution: u32, #[serde(default)] pub default_project_name_template: Option, + #[serde(default)] + pub crash_recovery_recording: bool, } fn default_enable_native_camera_preview() -> bool { @@ -190,6 +192,7 @@ impl Default for GeneralSettingsStore { delete_instant_recordings_after_upload: false, instant_mode_max_resolution: 1920, default_project_name_template: None, + crash_recovery_recording: false, } } } diff --git a/apps/desktop/src-tauri/src/lib.rs b/apps/desktop/src-tauri/src/lib.rs index 49f0253ed5..3bab3baefa 100644 --- a/apps/desktop/src-tauri/src/lib.rs +++ b/apps/desktop/src-tauri/src/lib.rs @@ -24,6 +24,7 @@ mod posthog; mod presets; mod recording; mod recording_settings; +mod recovery; mod screenshot_editor; mod target_select_overlay; mod thumbnails; @@ -2164,7 +2165,7 @@ async fn editor_delete_project( #[specta::specta] #[instrument(skip(app))] async fn show_window(app: AppHandle, window: ShowCapWindow) -> Result<(), String> { - let _ = window.show(&app).await; + window.show(&app).await.map_err(|e| e.to_string())?; Ok(()) } @@ -2403,6 +2404,9 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { target_select_overlay::focus_window, editor_delete_project, format_project_name, + recovery::find_incomplete_recordings, + recovery::recover_recording, + recovery::discard_incomplete_recording, ]) 
.events(tauri_specta::collect_events![ RecordingOptionsChanged, @@ -2803,8 +2807,8 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { tokio::spawn(EditorInstances::remove(window.clone())); #[cfg(target_os = "windows")] - if CapWindowId::Settings.get(&app).is_none() { - reopen_main_window(&app); + if CapWindowId::Settings.get(app).is_none() { + reopen_main_window(app); } } CapWindowId::ScreenshotEditor { id } => { @@ -2815,8 +2819,8 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { tokio::spawn(ScreenshotEditorInstances::remove(window.clone())); #[cfg(target_os = "windows")] - if CapWindowId::Settings.get(&app).is_none() { - reopen_main_window(&app); + if CapWindowId::Settings.get(app).is_none() { + reopen_main_window(app); } } CapWindowId::Settings => { @@ -2834,8 +2838,8 @@ pub async fn run(recording_logging_handle: LoggingHandle, logs_dir: PathBuf) { } #[cfg(target_os = "windows")] - if !has_open_editor_window(&app) { - reopen_main_window(&app); + if !has_open_editor_window(app) { + reopen_main_window(app); } return; @@ -3122,6 +3126,11 @@ async fn create_editor_instance_impl( RenderFrameEvent::listen_any(&app, { let preview_tx = instance.preview_tx.clone(); move |e| { + tracing::debug!( + frame = e.payload.frame_number, + fps = e.payload.fps, + "RenderFrameEvent received" + ); preview_tx.send_modify(|v| { *v = Some(( e.payload.frame_number, diff --git a/apps/desktop/src-tauri/src/platform/macos/delegates.rs b/apps/desktop/src-tauri/src/platform/macos/delegates.rs index 7c9c97f020..d8933c3b84 100644 --- a/apps/desktop/src-tauri/src/platform/macos/delegates.rs +++ b/apps/desktop/src-tauri/src/platform/macos/delegates.rs @@ -75,7 +75,10 @@ pub fn setup(window: Window, controls_inset: LogicalPosition use objc::runtime::{Object, Sel}; use std::ffi::c_void; - let ns_win = window.ns_window().expect("Failed to create window handle"); + let Ok(ns_win) = window.ns_window() else { + tracing::warn!("Failed to 
get window handle for delegate setup"); + return; + }; // Do the initial positioning position_window_controls(UnsafeWindowHandle(ns_win), &controls_inset); diff --git a/apps/desktop/src-tauri/src/platform/macos/mod.rs b/apps/desktop/src-tauri/src/platform/macos/mod.rs index f6d5d47010..14a3158583 100644 --- a/apps/desktop/src-tauri/src/platform/macos/mod.rs +++ b/apps/desktop/src-tauri/src/platform/macos/mod.rs @@ -18,10 +18,10 @@ pub use sc_shareable_content::*; pub fn set_window_level(window: tauri::Window, level: objc2_app_kit::NSWindowLevel) { let c_window = window.clone(); _ = window.run_on_main_thread(move || unsafe { - let ns_win = c_window - .ns_window() - .expect("Failed to get native window handle") - as *const objc2_app_kit::NSWindow; + let Ok(ns_win) = c_window.ns_window() else { + return; + }; + let ns_win = ns_win as *const objc2_app_kit::NSWindow; (*ns_win).setLevel(level); }); } diff --git a/apps/desktop/src-tauri/src/recording.rs b/apps/desktop/src-tauri/src/recording.rs index f0378595bc..a964cf3e4d 100644 --- a/apps/desktop/src-tauri/src/recording.rs +++ b/apps/desktop/src-tauri/src/recording.rs @@ -15,6 +15,7 @@ use cap_recording::{ RecordingMode, feeds::{camera, microphone}, instant_recording, + recovery::RecoveryManager, sources::MicrophoneSourceError, sources::{ screen_capture, @@ -30,10 +31,10 @@ use regex::Regex; use serde::{Deserialize, Serialize}; use specta::Type; use std::borrow::Cow; +use std::error::Error as StdError; use std::{ any::Any, collections::{HashMap, VecDeque}, - error::Error as StdError, panic::AssertUnwindSafe, path::{Path, PathBuf}, str::FromStr, @@ -721,6 +722,12 @@ pub async fn start_recording( .as_ref() .map(|s| s.custom_cursor_capture) .unwrap_or_default(), + ) + .with_fragmented( + general_settings + .as_ref() + .map(|s| s.crash_recovery_recording) + .unwrap_or_default(), ); #[cfg(target_os = "macos")] @@ -1358,41 +1365,68 @@ async fn handle_recording_finish( let screenshots_dir = recording_dir.join("screenshots"); 
std::fs::create_dir_all(&screenshots_dir).ok(); - let display_output_path = match &completed_recording { - CompletedRecording::Studio { recording, .. } => match &recording.meta { - StudioRecordingMeta::SingleSegment { segment } => { - segment.display.path.to_path(&recording_dir) - } - StudioRecordingMeta::MultipleSegments { inner, .. } => { - inner.segments[0].display.path.to_path(&recording_dir) + let (meta_inner, sharing) = match completed_recording { + CompletedRecording::Studio { recording, .. } => { + let meta_inner = RecordingMetaInner::Studio(recording.meta.clone()); + + if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + error!("Failed to load recording meta while saving finished recording: {err}") + }) { + meta.inner = meta_inner.clone(); + meta.sharing = None; + meta.save_for_project() + .map_err(|e| format!("Failed to save recording meta: {e}"))?; } - }, - CompletedRecording::Instant { recording, .. } => { - recording.project_path.join("./content/output.mp4") - } - }; - let display_screenshot = screenshots_dir.join("display.jpg"); - let screenshot_task = tokio::spawn(create_screenshot( - display_output_path, - display_screenshot.clone(), - None, - )); + let updated_studio_meta = if needs_fragment_remux(&recording_dir, &recording.meta) { + info!("Recording has fragments that need remuxing"); + if let Err(e) = remux_fragmented_recording(&recording_dir) { + error!("Failed to remux fragmented recording: {e}"); + return Err(format!("Failed to remux fragmented recording: {e}")); + } - let (meta_inner, sharing) = match completed_recording { - CompletedRecording::Studio { recording, .. } => { - let recordings = ProjectRecordingsMeta::new(&recording_dir, &recording.meta)?; + let updated_meta = RecordingMeta::load_for_project(&recording_dir) + .map_err(|e| format!("Failed to reload recording meta: {e}"))?; + updated_meta + .studio_meta() + .ok_or_else(|| "Expected studio meta after remux".to_string())? 
+ .clone() + } else { + recording.meta.clone() + }; + + let display_output_path = match &updated_studio_meta { + StudioRecordingMeta::SingleSegment { segment } => { + segment.display.path.to_path(&recording_dir) + } + StudioRecordingMeta::MultipleSegments { inner, .. } => { + inner.segments[0].display.path.to_path(&recording_dir) + } + }; + + let display_screenshot = screenshots_dir.join("display.jpg"); + tokio::spawn(create_screenshot( + display_output_path, + display_screenshot.clone(), + None, + )); + + let recordings = ProjectRecordingsMeta::new(&recording_dir, &updated_studio_meta)?; let config = project_config_from_recording( app, - &recording, + &cap_recording::studio_recording::CompletedRecording { + project_path: recording.project_path, + meta: updated_studio_meta.clone(), + cursor_data: recording.cursor_data, + }, &recordings, PresetsStore::get_default_preset(app)?.map(|p| p.config), ); config.write(&recording_dir).map_err(|e| e.to_string())?; - (RecordingMetaInner::Studio(recording.meta), None) + (RecordingMetaInner::Studio(updated_studio_meta), None) } CompletedRecording::Instant { recording, @@ -1403,6 +1437,13 @@ async fn handle_recording_finish( let app = app.clone(); let output_path = recording_dir.join("content/output.mp4"); + let display_screenshot = screenshots_dir.join("display.jpg"); + let screenshot_task = tokio::spawn(create_screenshot( + output_path.clone(), + display_screenshot.clone(), + None, + )); + let _ = open_external_link(app.clone(), video_upload_info.link.clone()); spawn_actor({ @@ -1499,9 +1540,11 @@ async fn handle_recording_finish( } }; - if let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { - error!("Failed to load recording meta while saving finished recording: {err}") - }) { + if let RecordingMetaInner::Instant(_) = &meta_inner + && let Ok(mut meta) = RecordingMeta::load_for_project(&recording_dir).map_err(|err| { + error!("Failed to load recording meta while saving finished recording: {err}") 
+ }) + { meta.inner = meta_inner.clone(); meta.sharing = sharing; meta.save_for_project() @@ -1829,6 +1872,148 @@ fn project_config_from_recording( config } +fn needs_fragment_remux(recording_dir: &Path, meta: &StudioRecordingMeta) -> bool { + let StudioRecordingMeta::MultipleSegments { inner, .. } = meta else { + return false; + }; + + for segment in &inner.segments { + let display_path = segment.display.path.to_path(recording_dir); + if display_path.is_dir() { + return true; + } + } + + false +} + +fn remux_fragmented_recording(recording_dir: &Path) -> Result<(), String> { + let meta = RecordingMeta::load_for_project(recording_dir) + .map_err(|e| format!("Failed to load recording meta: {e}"))?; + + let incomplete = + RecoveryManager::find_incomplete(recording_dir.parent().unwrap_or(recording_dir)); + + let incomplete_recording = incomplete + .into_iter() + .find(|r| r.project_path == recording_dir) + .or_else(|| analyze_recording_for_remux(recording_dir, &meta)); + + if let Some(recording) = incomplete_recording { + RecoveryManager::recover(&recording) + .map_err(|e| format!("Failed to remux recording: {e}"))?; + info!("Successfully remuxed fragmented recording"); + Ok(()) + } else { + Err("Could not find fragments to remux".to_string()) + } +} + +fn analyze_recording_for_remux( + project_path: &Path, + meta: &RecordingMeta, +) -> Option { + use cap_recording::recovery::{IncompleteRecording, RecoverableSegment}; + + let StudioRecordingMeta::MultipleSegments { inner, .. } = meta.studio_meta()? 
else { + return None; + }; + + let mut recoverable_segments = Vec::new(); + + for (index, segment) in inner.segments.iter().enumerate() { + let display_path = segment.display.path.to_path(project_path); + let display_fragments = if display_path.is_dir() { + find_fragments_in_dir(&display_path) + } else if display_path.exists() { + vec![display_path] + } else { + continue; + }; + + if display_fragments.is_empty() { + continue; + } + + let camera_fragments = segment.camera.as_ref().and_then(|cam| { + let cam_path = cam.path.to_path(project_path); + if cam_path.is_dir() { + let frags = find_fragments_in_dir(&cam_path); + if frags.is_empty() { None } else { Some(frags) } + } else if cam_path.exists() { + Some(vec![cam_path]) + } else { + None + } + }); + + let cursor_path = segment + .cursor + .as_ref() + .map(|c| c.to_path(project_path)) + .filter(|p| p.exists()); + + let mic_fragments = segment.mic.as_ref().and_then(|mic| { + let mic_path = mic.path.to_path(project_path); + if mic_path.is_dir() { + let frags = find_fragments_in_dir(&mic_path); + if frags.is_empty() { None } else { Some(frags) } + } else if mic_path.exists() { + Some(vec![mic_path]) + } else { + None + } + }); + + let system_audio_fragments = segment.system_audio.as_ref().and_then(|sys| { + let sys_path = sys.path.to_path(project_path); + if sys_path.is_dir() { + let frags = find_fragments_in_dir(&sys_path); + if frags.is_empty() { None } else { Some(frags) } + } else if sys_path.exists() { + Some(vec![sys_path]) + } else { + None + } + }); + + recoverable_segments.push(RecoverableSegment { + index: index as u32, + display_fragments, + camera_fragments, + mic_fragments, + system_audio_fragments, + cursor_path, + }); + } + + if recoverable_segments.is_empty() { + return None; + } + + Some(IncompleteRecording { + project_path: project_path.to_path_buf(), + meta: meta.clone(), + recoverable_segments, + estimated_duration: Duration::ZERO, + }) +} + +fn find_fragments_in_dir(dir: &Path) -> Vec { + let 
Ok(entries) = std::fs::read_dir(dir) else { + return Vec::new(); + }; + + let mut fragments: Vec<_> = entries + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| p.extension().is_some_and(|e| e == "mp4" || e == "m4a")) + .collect(); + + fragments.sort(); + fragments +} + #[cfg(test)] mod tests { use super::*; diff --git a/apps/desktop/src-tauri/src/recovery.rs b/apps/desktop/src-tauri/src/recovery.rs new file mode 100644 index 0000000000..cc61d419a2 --- /dev/null +++ b/apps/desktop/src-tauri/src/recovery.rs @@ -0,0 +1,122 @@ +use cap_project::StudioRecordingMeta; +use cap_recording::recovery::RecoveryManager; +use serde::{Deserialize, Serialize}; +use specta::Type; +use std::path::PathBuf; +use tauri::{AppHandle, Manager}; +use tracing::info; + +use crate::create_screenshot; + +#[derive(Debug, Clone, Serialize, Deserialize, Type)] +#[serde(rename_all = "camelCase")] +pub struct IncompleteRecordingInfo { + pub project_path: String, + pub pretty_name: String, + pub segment_count: u32, + pub estimated_duration_secs: f64, +} + +#[tauri::command] +#[specta::specta] +pub async fn find_incomplete_recordings( + app: AppHandle, +) -> Result, String> { + let recordings_dir = app + .path() + .app_data_dir() + .map_err(|e| e.to_string())? 
+ .join("recordings"); + + if !recordings_dir.exists() { + return Ok(Vec::new()); + } + + let incomplete_list = RecoveryManager::find_incomplete(&recordings_dir); + + let result = incomplete_list + .into_iter() + .map(|recording| IncompleteRecordingInfo { + project_path: recording.project_path.to_string_lossy().to_string(), + pretty_name: recording.meta.pretty_name.clone(), + segment_count: recording.recoverable_segments.len() as u32, + estimated_duration_secs: recording.estimated_duration.as_secs_f64(), + }) + .collect(); + + Ok(result) +} + +#[tauri::command] +#[specta::specta] +pub async fn recover_recording(app: AppHandle, project_path: String) -> Result { + let recordings_dir = app + .path() + .app_data_dir() + .map_err(|e| e.to_string())? + .join("recordings"); + + let path = PathBuf::from(&project_path); + + let incomplete_list = RecoveryManager::find_incomplete(&recordings_dir); + + let recording = incomplete_list + .into_iter() + .find(|r| r.project_path == path) + .ok_or_else(|| "Recording not found in incomplete list".to_string())?; + + if recording.recoverable_segments.is_empty() { + return Err("No recoverable segments found".to_string()); + } + + let recovered = RecoveryManager::recover(&recording).map_err(|e| format!("{e}"))?; + + let segment_count = match &recovered.meta { + StudioRecordingMeta::SingleSegment { .. } => 1, + StudioRecordingMeta::MultipleSegments { inner } => inner.segments.len(), + }; + + info!( + "Recovered recording with {} segments: {}", + segment_count, project_path + ); + + let display_output_path = match &recovered.meta { + StudioRecordingMeta::SingleSegment { segment } => { + segment.display.path.to_path(&recovered.project_path) + } + StudioRecordingMeta::MultipleSegments { inner, .. 
} => inner.segments[0] + .display + .path + .to_path(&recovered.project_path), + }; + + let screenshots_dir = recovered.project_path.join("screenshots"); + std::fs::create_dir_all(&screenshots_dir) + .map_err(|e| format!("Failed to create screenshots directory: {e}"))?; + + let display_screenshot = screenshots_dir.join("display.jpg"); + tokio::spawn(async move { + if let Err(e) = create_screenshot(display_output_path, display_screenshot, None).await { + tracing::error!("Failed to create screenshot during recovery: {}", e); + } + }); + + Ok(project_path) +} + +#[tauri::command] +#[specta::specta] +pub async fn discard_incomplete_recording(project_path: String) -> Result<(), String> { + let path = PathBuf::from(&project_path); + + if !path.exists() { + return Err("Recording path does not exist".to_string()); + } + + std::fs::remove_dir_all(&path).map_err(|e| e.to_string())?; + + info!("Discarded incomplete recording: {}", project_path); + + Ok(()) +} diff --git a/apps/desktop/src-tauri/src/target_select_overlay.rs b/apps/desktop/src-tauri/src/target_select_overlay.rs index 1ea7a0642f..f72949c096 100644 --- a/apps/desktop/src-tauri/src/target_select_overlay.rs +++ b/apps/desktop/src-tauri/src/target_select_overlay.rs @@ -260,8 +260,10 @@ pub async fn focus_window(window_id: WindowId) -> Result<(), String> { // Only restore if the window is actually minimized if IsIconic(hwnd).as_bool() { // Get current window placement to preserve size/position - let mut wp = WINDOWPLACEMENT::default(); - wp.length = std::mem::size_of::() as u32; + let mut wp = WINDOWPLACEMENT { + length: std::mem::size_of::() as u32, + ..Default::default() + }; if GetWindowPlacement(hwnd, &mut wp).is_ok() { // Restore using the previous placement to avoid resizing diff --git a/apps/desktop/src-tauri/src/window_exclusion.rs b/apps/desktop/src-tauri/src/window_exclusion.rs index 09c688502b..67fc1c1e1e 100644 --- a/apps/desktop/src-tauri/src/window_exclusion.rs +++ 
b/apps/desktop/src-tauri/src/window_exclusion.rs @@ -1,5 +1,4 @@ -use scap_targets::Window; -use scap_targets::WindowId; +use scap_targets::{Window, WindowId}; use serde::{Deserialize, Serialize}; use specta::Type; diff --git a/apps/desktop/src-tauri/src/windows.rs b/apps/desktop/src-tauri/src/windows.rs index 0e817aa101..485bc54283 100644 --- a/apps/desktop/src-tauri/src/windows.rs +++ b/apps/desktop/src-tauri/src/windows.rs @@ -589,7 +589,10 @@ impl ShowCapWindow { _ = window.run_on_main_thread({ let window = window.as_ref().window(); move || unsafe { - let win = window.ns_window().unwrap() as *const objc2_app_kit::NSWindow; + let Ok(win) = window.ns_window() else { + return; + }; + let win = win as *const objc2_app_kit::NSWindow; (*win).setCollectionBehavior( (*win).collectionBehavior() | objc2_app_kit::NSWindowCollectionBehavior::FullScreenAuxiliary, ); diff --git a/apps/desktop/src/components/RecoveryToast.tsx b/apps/desktop/src/components/RecoveryToast.tsx new file mode 100644 index 0000000000..32e0fa75e2 --- /dev/null +++ b/apps/desktop/src/components/RecoveryToast.tsx @@ -0,0 +1,114 @@ +import { Button } from "@cap/ui-solid"; +import { createMutation, createQuery } from "@tanstack/solid-query"; +import { createSignal, Show } from "solid-js"; +import { commands } from "~/utils/tauri"; + +function formatDuration(secs: number): string { + if (secs < 60) { + return `${Math.round(secs)}s`; + } + const mins = Math.floor(secs / 60); + const remainingSecs = Math.round(secs % 60); + if (remainingSecs === 0) { + return `${mins}m`; + } + return `${mins}m ${remainingSecs}s`; +} + +export function RecoveryToast() { + const [dismissed] = createSignal(false); + + const incompleteRecordings = createQuery(() => ({ + queryKey: ["incompleteRecordings"], + queryFn: () => commands.findIncompleteRecordings(), + refetchOnWindowFocus: false, + staleTime: Number.POSITIVE_INFINITY, + })); + + const mostRecent = () => { + const data = incompleteRecordings.data; + if (!data || 
data.length === 0) return null; + return data[0]; + }; + + const recoverMutation = createMutation(() => ({ + mutationFn: async (projectPath: string) => { + const result = await commands.recoverRecording(projectPath); + await commands.showWindow({ Editor: { project_path: result } }); + await incompleteRecordings.refetch(); + return result; + }, + })); + + const discardMutation = createMutation(() => ({ + mutationFn: async (projectPath: string) => { + await commands.discardIncompleteRecording(projectPath); + await incompleteRecordings.refetch(); + }, + })); + + const isProcessing = () => + recoverMutation.isPending || discardMutation.isPending; + + const recording = () => mostRecent(); + const duration = () => { + const r = recording(); + if (!r || r.estimatedDurationSecs <= 0) return null; + return formatDuration(r.estimatedDurationSecs); + }; + + return ( + + {(rec) => ( +
+
+
+

+ Incomplete Recording +

+

+ {rec().prettyName} +

+

+ {rec().segmentCount} segment + {rec().segmentCount !== 1 ? "s" : ""} + {duration() && ` ยท ~${duration()}`} +

+ + {(error) => { + const errorMessage = () => { + const e = error(); + if (e instanceof Error) return e.message; + if (typeof e === "string") return e; + return "Recovery failed. The recording may be corrupted."; + }; + return ( +

{errorMessage()}

+ ); + }} +
+
+
+ + +
+
+
+ )} +
+ ); +} diff --git a/apps/desktop/src/routes/(window-chrome)/new-main/index.tsx b/apps/desktop/src/routes/(window-chrome)/new-main/index.tsx index 82e0200fec..4baa5c2e59 100644 --- a/apps/desktop/src/routes/(window-chrome)/new-main/index.tsx +++ b/apps/desktop/src/routes/(window-chrome)/new-main/index.tsx @@ -27,6 +27,7 @@ import { import { createStore, produce, reconcile } from "solid-js/store"; import { Transition } from "solid-transition-group"; import Mode from "~/components/Mode"; +import { RecoveryToast } from "~/components/RecoveryToast"; import Tooltip from "~/components/Tooltip"; import { Input } from "~/routes/editor/ui"; import { authStore, generalSettingsStore } from "~/store"; @@ -1124,8 +1125,23 @@ function Page() { } onSelect={async (recording) => { if (recording.mode === "studio") { + let projectPath = recording.path; + + const needsRecovery = + recording.status.status === "InProgress" || + recording.status.status === "NeedsRemux"; + + if (needsRecovery) { + try { + projectPath = + await commands.recoverRecording(projectPath); + } catch (e) { + console.error("Failed to recover recording:", e); + } + } + await commands.showWindow({ - Editor: { project_path: recording.path }, + Editor: { project_path: projectPath }, }); } else { if (recording.sharing?.link) { @@ -1181,6 +1197,7 @@ function Page() { + ); } diff --git a/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx b/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx index 1f9f8371fe..0e6ba2a7de 100644 --- a/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx +++ b/apps/desktop/src/routes/(window-chrome)/settings/experimental.tsx @@ -27,6 +27,7 @@ function Inner(props: { initialStore: GeneralSettingsStore | null }) { enableNewRecordingFlow: true, autoZoomOnClicks: false, custom_cursor_capture2: true, + crashRecoveryRecording: false, }, ); @@ -86,6 +87,14 @@ function Inner(props: { initialStore: GeneralSettingsStore | null }) { ); }} /> + + 
handleChange("crashRecoveryRecording", value) + } + /> diff --git a/apps/desktop/src/routes/editor/context.ts b/apps/desktop/src/routes/editor/context.ts index 502760f169..f7b9bef38b 100644 --- a/apps/desktop/src/routes/editor/context.ts +++ b/apps/desktop/src/routes/editor/context.ts @@ -740,22 +740,30 @@ export const [EditorInstanceContextProvider, useEditorInstanceContext] = data: ImageData; }>(); + const [isConnected, setIsConnected] = createSignal(false); + const [editorInstance] = createResource(async () => { + console.log("[Editor] Creating editor instance..."); const instance = await commands.createEditorInstance(); + console.log("[Editor] Editor instance created, setting up WebSocket"); - const [_ws, isConnected] = createImageDataWS( + const [ws, wsConnected] = createImageDataWS( instance.framesSocketUrl, setLatestFrame, ); - createEffect(() => { - if (isConnected()) { - events.renderFrameEvent.emit({ - frame_number: Math.floor(0), - fps: FPS, - resolution_base: getPreviewResolution(DEFAULT_PREVIEW_QUALITY), - }); - } + ws.addEventListener("open", () => { + console.log("[Editor] WebSocket open event - emitting initial frame"); + setIsConnected(true); + events.renderFrameEvent.emit({ + frame_number: 0, + fps: FPS, + resolution_base: getPreviewResolution(DEFAULT_PREVIEW_QUALITY), + }); + }); + + ws.addEventListener("close", () => { + setIsConnected(false); }); return instance; diff --git a/apps/desktop/src/utils/tauri.ts b/apps/desktop/src/utils/tauri.ts index 9a1f36bcc4..d3f34c9c9b 100644 --- a/apps/desktop/src/utils/tauri.ts +++ b/apps/desktop/src/utils/tauri.ts @@ -285,6 +285,15 @@ async editorDeleteProject() : Promise { }, async formatProjectName(template: string | null, targetName: string, targetKind: string, recordingMode: RecordingMode, datetime: string | null) : Promise { return await TAURI_INVOKE("format_project_name", { template, targetName, targetKind, recordingMode, datetime }); +}, +async findIncompleteRecordings() : Promise { + return await 
TAURI_INVOKE("find_incomplete_recordings"); +}, +async recoverRecording(projectPath: string) : Promise { + return await TAURI_INVOKE("recover_recording", { projectPath }); +}, +async discardIncompleteRecording(projectPath: string) : Promise { + return await TAURI_INVOKE("discard_incomplete_recording", { projectPath }); } } @@ -350,11 +359,7 @@ export type AspectRatio = "wide" | "vertical" | "square" | "classic" | "tall" export type Audio = { duration: number; sample_rate: number; channels: number; start_time: number } export type AudioConfiguration = { mute: boolean; improve: boolean; micVolumeDb?: number; micStereoMode?: StereoMode; systemVolumeDb?: number } export type AudioInputLevelChange = number -export type AudioMeta = { path: string; -/** - * unix time of the first frame - */ -start_time?: number | null } +export type AudioMeta = { path: string; start_time?: number | null } export type AuthSecret = { api_key: string } | { token: string; expires: number } export type AuthStore = { secret: AuthSecret; user_id: string | null; plan: Plan | null; intercom_hash: string | null; organizations?: Organization[] } export type BackgroundConfiguration = { source: BackgroundSource; blur: number; padding: number; rounding: number; roundingType?: CornerStyle; inset: number; crop: Crop | null; shadow?: number; advancedShadow?: ShadowConfiguration | null; border?: BorderConfiguration | null } @@ -401,7 +406,7 @@ export type ExportSettings = ({ format: "Mp4" } & Mp4ExportSettings) | ({ format export type FileType = "recording" | "screenshot" export type Flags = { captions: boolean } export type FramesRendered = { renderedCount: number; totalFrames: number; type: "FramesRendered" } -export type GeneralSettingsStore = { instanceId?: string; uploadIndividualFiles?: boolean; hideDockIcon?: boolean; autoCreateShareableLink?: boolean; enableNotifications?: boolean; disableAutoOpenLinks?: boolean; hasCompletedStartup?: boolean; theme?: AppTheme; commercialLicense?: CommercialLicense 
| null; lastVersion?: string | null; windowTransparency?: boolean; postStudioRecordingBehaviour?: PostStudioRecordingBehaviour; mainWindowRecordingStartBehaviour?: MainWindowRecordingStartBehaviour; custom_cursor_capture2?: boolean; serverUrl?: string; recordingCountdown?: number | null; enableNativeCameraPreview: boolean; autoZoomOnClicks?: boolean; enableNewRecordingFlow: boolean; recordingPickerPreferenceSet?: boolean; postDeletionBehaviour?: PostDeletionBehaviour; excludedWindows?: WindowExclusion[]; deleteInstantRecordingsAfterUpload?: boolean; instantModeMaxResolution?: number; defaultProjectNameTemplate?: string | null } +export type GeneralSettingsStore = { instanceId?: string; uploadIndividualFiles?: boolean; hideDockIcon?: boolean; autoCreateShareableLink?: boolean; enableNotifications?: boolean; disableAutoOpenLinks?: boolean; hasCompletedStartup?: boolean; theme?: AppTheme; commercialLicense?: CommercialLicense | null; lastVersion?: string | null; windowTransparency?: boolean; postStudioRecordingBehaviour?: PostStudioRecordingBehaviour; mainWindowRecordingStartBehaviour?: MainWindowRecordingStartBehaviour; custom_cursor_capture2?: boolean; serverUrl?: string; recordingCountdown?: number | null; enableNativeCameraPreview: boolean; autoZoomOnClicks?: boolean; enableNewRecordingFlow: boolean; recordingPickerPreferenceSet?: boolean; postDeletionBehaviour?: PostDeletionBehaviour; excludedWindows?: WindowExclusion[]; deleteInstantRecordingsAfterUpload?: boolean; instantModeMaxResolution?: number; defaultProjectNameTemplate?: string | null; crashRecoveryRecording?: boolean } export type GifExportSettings = { fps: number; resolution_base: XY; quality: GifQuality | null } export type GifQuality = { /** @@ -418,6 +423,7 @@ export type Hotkey = { code: string; meta: boolean; ctrl: boolean; alt: boolean; export type HotkeyAction = "startStudioRecording" | "startInstantRecording" | "stopRecording" | "restartRecording" | "openRecordingPicker" | 
"openRecordingPickerDisplay" | "openRecordingPickerWindow" | "openRecordingPickerArea" | "other" export type HotkeysConfiguration = { show: boolean } export type HotkeysStore = { hotkeys: { [key in HotkeyAction]: Hotkey } } +export type IncompleteRecordingInfo = { projectPath: string; prettyName: string; segmentCount: number; estimatedDurationSecs: number } export type InstantRecordingMeta = { recording: boolean } | { error: string } | { fps: number; sample_rate: number | null } export type JsonValue = [T] export type LogicalBounds = { position: LogicalPosition; size: LogicalSize } @@ -484,7 +490,7 @@ export type SingleSegment = { display: VideoMeta; camera?: VideoMeta | null; aud export type StartRecordingInputs = { capture_target: ScreenCaptureTarget; capture_system_audio?: boolean; mode: RecordingMode; organization_id?: string | null } export type StereoMode = "stereo" | "monoL" | "monoR" export type StudioRecordingMeta = { segment: SingleSegment } | { inner: MultipleSegments } -export type StudioRecordingStatus = { status: "InProgress" } | { status: "Failed"; error: string } | { status: "Complete" } +export type StudioRecordingStatus = { status: "InProgress" } | { status: "NeedsRemux" } | { status: "Failed"; error: string } | { status: "Complete" } export type TargetUnderCursor = { display_id: DisplayId | null; window: WindowUnderCursor | null } export type TextSegment = { start: number; end: number; enabled?: boolean; content?: string; center?: XY; size?: XY; fontFamily?: string; fontSize?: number; fontWeight?: number; italic?: boolean; color?: string } export type TimelineConfiguration = { segments: TimelineSegment[]; zoomSegments: ZoomSegment[]; sceneSegments?: SceneSegment[]; maskSegments?: MaskSegment[]; textSegments?: TextSegment[] } @@ -495,11 +501,7 @@ export type UploadProgress = { progress: number } export type UploadProgressEvent = { video_id: string; uploaded: string; total: string } export type UploadResult = { Success: string } | 
"NotAuthenticated" | "PlanCheckFailed" | "UpgradeRequired" export type Video = { duration: number; width: number; height: number; fps: number; start_time: number } -export type VideoMeta = { path: string; fps?: number; -/** - * unix time of the first frame - */ -start_time?: number | null } +export type VideoMeta = { path: string; fps?: number; start_time?: number | null } export type VideoRecordingMetadata = { duration: number; size: number } export type VideoUploadInfo = { id: string; link: string; config: S3UploadMeta } export type WindowExclusion = { bundleIdentifier?: string | null; ownerName?: string | null; windowTitle?: string | null } diff --git a/crates/editor/src/editor_instance.rs b/crates/editor/src/editor_instance.rs index f834a75b16..ac627b2430 100644 --- a/crates/editor/src/editor_instance.rs +++ b/crates/editor/src/editor_instance.rs @@ -2,10 +2,13 @@ use crate::editor; use crate::playback::{self, PlaybackHandle, PlaybackStartError}; use cap_audio::AudioData; use cap_project::StudioRecordingMeta; -use cap_project::{CursorEvents, ProjectConfiguration, RecordingMeta, RecordingMetaInner, XY}; +use cap_project::{ + CursorEvents, ProjectConfiguration, RecordingMeta, RecordingMetaInner, TimelineConfiguration, + TimelineSegment, XY, +}; use cap_rendering::{ ProjectRecordingsMeta, ProjectUniforms, RecordingSegmentDecoders, RenderVideoConstants, - RenderedFrame, SegmentVideoPaths, get_duration, + RenderedFrame, SegmentVideoPaths, Video, get_duration, }; use std::{path::PathBuf, sync::Arc}; use tokio::sync::{Mutex, watch}; @@ -35,29 +38,123 @@ impl EditorInstance { on_state_change: impl Fn(&EditorState) + Send + Sync + 'static, frame_cb: Box, ) -> Result, String> { + trace!("EditorInstance::new starting for {:?}", project_path); + if !project_path.exists() { - println!("Video path {} not found!", project_path.display()); - panic!("Video path {} not found!", project_path.display()); + return Err(format!("Video path {} not found!", project_path.display())); } 
- let recording_meta = cap_project::RecordingMeta::load_for_project(&project_path).unwrap(); + trace!("Loading recording meta"); + let recording_meta = cap_project::RecordingMeta::load_for_project(&project_path) + .map_err(|e| format!("Failed to load recording meta: {e}"))?; + let RecordingMetaInner::Studio(meta) = &recording_meta.inner else { return Err("Cannot edit non-studio recordings".to_string()); }; - let project = recording_meta.project_config(); + + let segment_count = match meta { + StudioRecordingMeta::SingleSegment { .. } => 1, + StudioRecordingMeta::MultipleSegments { inner } => inner.segments.len(), + }; + + trace!("Recording has {} segments", segment_count); + + if segment_count == 0 { + return Err( + "Recording has no segments. It may need to be recovered first.".to_string(), + ); + } + + let mut project = recording_meta.project_config(); + + if project.timeline.is_none() { + warn!("Project config has no timeline, creating one from recording segments"); + let timeline_segments = match meta { + StudioRecordingMeta::SingleSegment { segment } => { + let display_path = recording_meta.path(&segment.display.path); + let duration = match Video::new(&display_path, 0.0) { + Ok(v) => v.duration, + Err(e) => { + warn!( + "Failed to load video for duration calculation: {} (path: {}), using default duration 5.0s", + e, + display_path.display() + ); + 5.0 + } + }; + vec![TimelineSegment { + recording_clip: 0, + start: 0.0, + end: duration, + timescale: 1.0, + }] + } + StudioRecordingMeta::MultipleSegments { inner } => inner + .segments + .iter() + .enumerate() + .filter_map(|(i, segment)| { + let display_path = recording_meta.path(&segment.display.path); + let duration = match Video::new(&display_path, 0.0) { + Ok(v) => v.duration, + Err(e) => { + warn!( + "Failed to load video for duration calculation: {} (path: {}), using default duration 5.0s", + e, + display_path.display() + ); + 5.0 + } + }; + if duration <= 0.0 { + return None; + } + Some(TimelineSegment { 
+ recording_clip: i as u32, + start: 0.0, + end: duration, + timescale: 1.0, + }) + }) + .collect(), + }; + + if !timeline_segments.is_empty() { + project.timeline = Some(TimelineConfiguration { + segments: timeline_segments, + zoom_segments: Vec::new(), + scene_segments: Vec::new(), + mask_segments: Vec::new(), + text_segments: Vec::new(), + }); + + if let Err(e) = project.write(&recording_meta.project_path) { + warn!("Failed to save auto-generated timeline: {}", e); + } else { + trace!("Auto-generated timeline saved to project config"); + } + } + } + + trace!("Creating ProjectRecordingsMeta"); let recordings = Arc::new(ProjectRecordingsMeta::new( &recording_meta.project_path, meta, )?); + trace!("Creating segments with decoders"); let segments = create_segments(&recording_meta, meta).await?; + trace!("Segments created successfully"); + trace!("Creating render constants"); let render_constants = Arc::new( RenderVideoConstants::new(&recordings.segments, recording_meta.clone(), meta.clone()) .await - .unwrap(), + .map_err(|e| format!("Failed to create render constants: {e}"))?, ); + trace!("Spawning renderer"); let renderer = Arc::new(editor::Renderer::spawn( render_constants.clone(), frame_cb, @@ -87,6 +184,7 @@ impl EditorInstance { this.state.lock().await.preview_task = Some(this.clone().spawn_preview_renderer(preview_rx)); + trace!("EditorInstance::new completed successfully"); Ok(this) } @@ -196,9 +294,13 @@ impl EditorInstance { self: Arc, mut preview_rx: watch::Receiver)>>, ) -> tokio::task::JoinHandle<()> { + trace!("Starting preview renderer task"); tokio::spawn(async move { + trace!("Preview renderer task running"); loop { + trace!("Preview renderer: waiting for frame request"); preview_rx.changed().await.unwrap(); + trace!("Preview renderer: received change notification"); loop { let Some((frame_number, fps, resolution_base)) = @@ -207,11 +309,17 @@ impl EditorInstance { break; }; + trace!("Preview renderer: processing frame {}", frame_number); + let 
project = self.project_config.1.borrow().clone(); let Some((segment_time, segment)) = project.get_segment_time(frame_number as f64 / fps as f64) else { + warn!( + "Preview renderer: no segment found for frame {}", + frame_number + ); break; }; @@ -241,6 +349,7 @@ impl EditorInstance { } if let Some(segment_frames) = segment_frames_opt { + trace!("Preview renderer: rendering frame {}", frame_number); let uniforms = ProjectUniforms::new( &self.render_constants, &project, @@ -253,6 +362,8 @@ impl EditorInstance { self.renderer .render_frame(segment_frames, uniforms, segment_medias.cursor.clone(), frame_number) .await; + } else { + warn!("Preview renderer: no frames returned for frame {}", frame_number); } } } diff --git a/crates/enc-avfoundation/Cargo.toml b/crates/enc-avfoundation/Cargo.toml index 369c2db4d6..5d5c3c567f 100644 --- a/crates/enc-avfoundation/Cargo.toml +++ b/crates/enc-avfoundation/Cargo.toml @@ -13,6 +13,8 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" } [target.'cfg(target_os = "macos")'.dependencies] cidre = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } [lints] workspace = true diff --git a/crates/enc-avfoundation/src/lib.rs b/crates/enc-avfoundation/src/lib.rs index 1db0f16db5..8683aafff5 100644 --- a/crates/enc-avfoundation/src/lib.rs +++ b/crates/enc-avfoundation/src/lib.rs @@ -1,5 +1,7 @@ #![cfg(target_os = "macos")] mod mp4; +mod segmented; pub use mp4::*; +pub use segmented::*; diff --git a/crates/enc-avfoundation/src/segmented.rs b/crates/enc-avfoundation/src/segmented.rs new file mode 100644 index 0000000000..8cae09c910 --- /dev/null +++ b/crates/enc-avfoundation/src/segmented.rs @@ -0,0 +1,239 @@ +use crate::{FinishError, InitError, MP4Encoder, QueueFrameError}; +use cap_media_info::{AudioInfo, VideoInfo}; +use cidre::arc; +use ffmpeg::frame; +use serde::Serialize; +use std::{path::PathBuf, time::Duration}; + +pub struct SegmentedMP4Encoder { + base_path: PathBuf, + video_config: 
VideoInfo, + audio_config: Option, + output_height: Option, + + current_encoder: Option, + current_index: u32, + segment_duration: Duration, + segment_start_time: Option, + + completed_segments: Vec, +} + +#[derive(Debug, Clone)] +pub struct SegmentInfo { + pub path: PathBuf, + pub index: u32, + pub duration: Duration, +} + +#[derive(Serialize)] +struct FragmentEntry { + path: String, + index: u32, + duration: f64, + is_complete: bool, +} + +#[derive(Serialize)] +struct Manifest { + fragments: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + total_duration: Option, + is_complete: bool, +} + +impl SegmentedMP4Encoder { + pub fn init( + base_path: PathBuf, + video_config: VideoInfo, + audio_config: Option, + output_height: Option, + segment_duration: Duration, + ) -> Result { + std::fs::create_dir_all(&base_path).map_err(|_| InitError::NoSettingsAssistant)?; + + let segment_path = base_path.join("fragment_000.mp4"); + let encoder = MP4Encoder::init(segment_path, video_config, audio_config, output_height)?; + + Ok(Self { + base_path, + video_config, + audio_config, + output_height, + current_encoder: Some(encoder), + current_index: 0, + segment_duration, + segment_start_time: None, + completed_segments: Vec::new(), + }) + } + + pub fn queue_video_frame( + &mut self, + frame: arc::R, + timestamp: Duration, + ) -> Result<(), QueueFrameError> { + if self.segment_start_time.is_none() { + self.segment_start_time = Some(timestamp); + } + + let segment_elapsed = + timestamp.saturating_sub(self.segment_start_time.unwrap_or(Duration::ZERO)); + + if segment_elapsed >= self.segment_duration { + self.rotate_segment(timestamp)?; + } + + if let Some(encoder) = &mut self.current_encoder { + encoder.queue_video_frame(frame, timestamp) + } else { + Err(QueueFrameError::Failed) + } + } + + pub fn queue_audio_frame( + &mut self, + frame: &frame::Audio, + timestamp: Duration, + ) -> Result<(), QueueFrameError> { + if let Some(encoder) = &mut self.current_encoder { + 
encoder.queue_audio_frame(frame, timestamp) + } else { + Err(QueueFrameError::Failed) + } + } + + fn rotate_segment(&mut self, timestamp: Duration) -> Result<(), QueueFrameError> { + let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); + let segment_duration = timestamp.saturating_sub(segment_start); + + if let Some(mut encoder) = self.current_encoder.take() { + let _ = encoder.finish(Some(timestamp)); + + self.completed_segments.push(SegmentInfo { + path: self.current_segment_path(), + index: self.current_index, + duration: segment_duration, + }); + } + + self.current_index += 1; + self.segment_start_time = Some(timestamp); + + let new_path = self.current_segment_path(); + self.current_encoder = Some( + MP4Encoder::init( + new_path, + self.video_config, + self.audio_config, + self.output_height, + ) + .map_err(|_| QueueFrameError::Failed)?, + ); + + self.write_manifest(); + + Ok(()) + } + + fn current_segment_path(&self) -> PathBuf { + self.base_path + .join(format!("fragment_{:03}.mp4", self.current_index)) + } + + fn write_manifest(&self) { + let manifest = Manifest { + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + }) + .collect(), + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let _ = std::fs::write( + manifest_path, + serde_json::to_string_pretty(&manifest).unwrap_or_default(), + ); + } + + pub fn pause(&mut self) { + if let Some(encoder) = &mut self.current_encoder { + encoder.pause(); + } + } + + pub fn resume(&mut self) { + if let Some(encoder) = &mut self.current_encoder { + encoder.resume(); + } + } + + pub fn finish(&mut self, timestamp: Option) -> Result<(), FinishError> { + if let Some(segment_start) = self.segment_start_time { + let final_duration = timestamp + 
.unwrap_or(segment_start) + .saturating_sub(segment_start); + + self.completed_segments.push(SegmentInfo { + path: self.current_segment_path(), + index: self.current_index, + duration: final_duration, + }); + } + + if let Some(mut encoder) = self.current_encoder.take() { + encoder.finish(timestamp)?; + } + + self.finalize_manifest(); + + Ok(()) + } + + fn finalize_manifest(&self) { + let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); + + let manifest = Manifest { + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + }) + .collect(), + total_duration: Some(total_duration.as_secs_f64()), + is_complete: true, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let _ = std::fs::write( + manifest_path, + serde_json::to_string_pretty(&manifest).unwrap_or_default(), + ); + } + + pub fn completed_segments(&self) -> &[SegmentInfo] { + &self.completed_segments + } +} diff --git a/crates/enc-ffmpeg/Cargo.toml b/crates/enc-ffmpeg/Cargo.toml index 28f1c012e6..cf0244afac 100644 --- a/crates/enc-ffmpeg/Cargo.toml +++ b/crates/enc-ffmpeg/Cargo.toml @@ -7,6 +7,8 @@ edition = "2024" cap-media-info = { path = "../media-info" } ffmpeg.workspace = true +serde = { workspace = true, features = ["derive"] } +serde_json.workspace = true thiserror.workspace = true tracing.workspace = true workspace-hack = { version = "0.1", path = "../workspace-hack" } diff --git a/crates/enc-ffmpeg/src/lib.rs b/crates/enc-ffmpeg/src/lib.rs index d07e15e4a9..2dd64f560d 100644 --- a/crates/enc-ffmpeg/src/lib.rs +++ b/crates/enc-ffmpeg/src/lib.rs @@ -8,3 +8,8 @@ pub use video::*; mod mux; pub use mux::*; + +pub mod remux; +pub mod segmented_audio { + pub use crate::mux::segmented_audio::*; +} diff --git a/crates/enc-ffmpeg/src/mux/fragmented_audio.rs 
b/crates/enc-ffmpeg/src/mux/fragmented_audio.rs new file mode 100644 index 0000000000..0792136a5e --- /dev/null +++ b/crates/enc-ffmpeg/src/mux/fragmented_audio.rs @@ -0,0 +1,99 @@ +use cap_media_info::AudioInfo; +use ffmpeg::{format, frame}; +use std::{path::PathBuf, time::Duration}; + +use crate::audio::aac::{AACEncoder, AACEncoderError}; + +pub struct FragmentedAudioFile { + encoder: AACEncoder, + output: format::context::Output, + finished: bool, + has_frames: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error("FFmpeg: {0}")] + FFmpeg(#[from] ffmpeg::Error), + #[error("Encoder: {0}")] + Encoder(#[from] AACEncoderError), + #[error("IO: {0}")] + Io(#[from] std::io::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum FinishError { + #[error("Already finished")] + AlreadyFinished, + #[error("{0}")] + WriteTrailerFailed(ffmpeg::Error), +} + +impl FragmentedAudioFile { + pub fn init(mut output_path: PathBuf, audio_config: AudioInfo) -> Result { + output_path.set_extension("m4a"); + + if let Some(parent) = output_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let mut output = format::output_as(&output_path, "mp4")?; + + unsafe { + let opts = output.as_mut_ptr(); + let key = std::ffi::CString::new("movflags").unwrap(); + let value = + std::ffi::CString::new("frag_keyframe+empty_moov+default_base_moof").unwrap(); + ffmpeg::ffi::av_opt_set((*opts).priv_data, key.as_ptr(), value.as_ptr(), 0); + } + + let encoder = AACEncoder::init(audio_config, &mut output)?; + + output.write_header()?; + + Ok(Self { + encoder, + output, + finished: false, + has_frames: false, + }) + } + + pub fn encoder(&self) -> &AACEncoder { + &self.encoder + } + + pub fn queue_frame( + &mut self, + frame: frame::Audio, + timestamp: Duration, + ) -> Result<(), ffmpeg::Error> { + self.has_frames = true; + self.encoder.send_frame(frame, timestamp, &mut self.output) + } + + pub fn finish(&mut self) -> Result, FinishError> { + if self.finished { + return 
Err(FinishError::AlreadyFinished); + } + + self.finished = true; + + if self.has_frames { + let flush_result = self.encoder.flush(&mut self.output); + self.output + .write_trailer() + .map_err(FinishError::WriteTrailerFailed)?; + Ok(flush_result) + } else { + let _ = self.output.write_trailer(); + Ok(Ok(())) + } + } +} + +impl Drop for FragmentedAudioFile { + fn drop(&mut self) { + let _ = self.finish(); + } +} diff --git a/crates/enc-ffmpeg/src/mux/mod.rs b/crates/enc-ffmpeg/src/mux/mod.rs index 280593d90f..bb986be77c 100644 --- a/crates/enc-ffmpeg/src/mux/mod.rs +++ b/crates/enc-ffmpeg/src/mux/mod.rs @@ -1,2 +1,4 @@ +pub mod fragmented_audio; pub mod mp4; pub mod ogg; +pub mod segmented_audio; diff --git a/crates/enc-ffmpeg/src/mux/segmented_audio.rs b/crates/enc-ffmpeg/src/mux/segmented_audio.rs new file mode 100644 index 0000000000..b00f7c4a33 --- /dev/null +++ b/crates/enc-ffmpeg/src/mux/segmented_audio.rs @@ -0,0 +1,310 @@ +use crate::audio::aac::{AACEncoder, AACEncoderError}; +use cap_media_info::AudioInfo; +use ffmpeg::{format, frame}; +use serde::Serialize; +use std::{path::PathBuf, time::Duration}; + +pub struct SegmentedAudioEncoder { + base_path: PathBuf, + audio_config: AudioInfo, + + current_encoder: Option, + current_index: u32, + segment_duration: Duration, + segment_start_time: Option, + last_frame_timestamp: Option, + + completed_segments: Vec, +} + +struct AudioSegmentEncoder { + encoder: AACEncoder, + output: format::context::Output, + has_frames: bool, +} + +#[derive(Debug, Clone)] +pub struct SegmentInfo { + pub path: PathBuf, + pub index: u32, + pub duration: Duration, +} + +#[derive(Serialize)] +struct FragmentEntry { + path: String, + index: u32, + duration: f64, + is_complete: bool, +} + +#[derive(Serialize)] +struct Manifest { + fragments: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + total_duration: Option, + is_complete: bool, +} + +#[derive(thiserror::Error, Debug)] +pub enum InitError { + #[error("FFmpeg: {0}")] + 
FFmpeg(#[from] ffmpeg::Error), + #[error("Encoder: {0}")] + Encoder(#[from] AACEncoderError), + #[error("IO: {0}")] + Io(#[from] std::io::Error), +} + +#[derive(thiserror::Error, Debug)] +pub enum QueueFrameError { + #[error("FFmpeg: {0}")] + FFmpeg(#[from] ffmpeg::Error), + #[error("Init: {0}")] + Init(#[from] InitError), +} + +#[derive(thiserror::Error, Debug)] +pub enum FinishError { + #[error("FFmpeg: {0}")] + FFmpeg(#[from] ffmpeg::Error), +} + +impl SegmentedAudioEncoder { + pub fn init( + base_path: PathBuf, + audio_config: AudioInfo, + segment_duration: Duration, + ) -> Result { + std::fs::create_dir_all(&base_path)?; + + let segment_path = base_path.join("fragment_000.m4a"); + let encoder = Self::create_segment_encoder(segment_path, audio_config)?; + + Ok(Self { + base_path, + audio_config, + current_encoder: Some(encoder), + current_index: 0, + segment_duration, + segment_start_time: None, + last_frame_timestamp: None, + completed_segments: Vec::new(), + }) + } + + fn create_segment_encoder( + path: PathBuf, + audio_config: AudioInfo, + ) -> Result { + let mut output = format::output_as(&path, "mp4")?; + + unsafe { + let opts = output.as_mut_ptr(); + let key = std::ffi::CString::new("movflags").unwrap(); + let value = + std::ffi::CString::new("frag_keyframe+empty_moov+default_base_moof").unwrap(); + ffmpeg::ffi::av_opt_set((*opts).priv_data, key.as_ptr(), value.as_ptr(), 0); + } + + let encoder = AACEncoder::init(audio_config, &mut output)?; + + output.write_header()?; + + Ok(AudioSegmentEncoder { + encoder, + output, + has_frames: false, + }) + } + + pub fn queue_frame( + &mut self, + frame: frame::Audio, + timestamp: Duration, + ) -> Result<(), QueueFrameError> { + if self.segment_start_time.is_none() { + self.segment_start_time = Some(timestamp); + } + + self.last_frame_timestamp = Some(timestamp); + + let segment_elapsed = + timestamp.saturating_sub(self.segment_start_time.unwrap_or(Duration::ZERO)); + + if segment_elapsed >= self.segment_duration { + 
self.rotate_segment(timestamp)?; + } + + if let Some(encoder) = &mut self.current_encoder { + encoder + .encoder + .send_frame(frame, timestamp, &mut encoder.output)?; + encoder.has_frames = true; + } + + Ok(()) + } + + fn rotate_segment(&mut self, timestamp: Duration) -> Result<(), QueueFrameError> { + let segment_start = self.segment_start_time.unwrap_or(Duration::ZERO); + let segment_duration = timestamp.saturating_sub(segment_start); + + if let Some(mut encoder) = self.current_encoder.take() { + let _ = encoder.encoder.flush(&mut encoder.output); + let _ = encoder.output.write_trailer(); + + self.completed_segments.push(SegmentInfo { + path: self.current_segment_path(), + index: self.current_index, + duration: segment_duration, + }); + } + + self.current_index += 1; + self.segment_start_time = Some(timestamp); + + let new_path = self.current_segment_path(); + self.current_encoder = Some(Self::create_segment_encoder(new_path, self.audio_config)?); + + self.write_manifest(); + + Ok(()) + } + + fn current_segment_path(&self) -> PathBuf { + self.base_path + .join(format!("fragment_{:03}.m4a", self.current_index)) + } + + fn write_manifest(&self) { + let manifest = Manifest { + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + }) + .collect(), + total_duration: None, + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let _ = std::fs::write( + manifest_path, + serde_json::to_string_pretty(&manifest).unwrap_or_default(), + ); + } + + pub fn finish(&mut self) -> Result<(), FinishError> { + if let Some(mut encoder) = self.current_encoder.take() { + if encoder.has_frames { + if let Some(segment_start) = self.segment_start_time { + let final_duration = self + .last_frame_timestamp + .unwrap_or(segment_start) + 
.saturating_sub(segment_start); + + self.completed_segments.push(SegmentInfo { + path: self.current_segment_path(), + index: self.current_index, + duration: final_duration, + }); + } + + let flush_result = encoder.encoder.flush(&mut encoder.output); + let trailer_result = encoder.output.write_trailer(); + + if let Err(e) = &flush_result { + tracing::warn!("Audio encoder flush warning: {e}"); + } + if let Err(e) = &trailer_result { + tracing::warn!("Audio write_trailer warning: {e}"); + } + } else { + let _ = encoder.output.write_trailer(); + let _ = std::fs::remove_file(self.current_segment_path()); + } + } + + self.finalize_manifest(); + + Ok(()) + } + + pub fn finish_with_timestamp(&mut self, timestamp: Duration) -> Result<(), FinishError> { + if let Some(mut encoder) = self.current_encoder.take() { + if encoder.has_frames { + if let Some(segment_start) = self.segment_start_time { + let final_duration = timestamp.saturating_sub(segment_start); + + self.completed_segments.push(SegmentInfo { + path: self.current_segment_path(), + index: self.current_index, + duration: final_duration, + }); + } + + let flush_result = encoder.encoder.flush(&mut encoder.output); + let trailer_result = encoder.output.write_trailer(); + + if let Err(e) = &flush_result { + tracing::warn!("Audio encoder flush warning: {e}"); + } + if let Err(e) = &trailer_result { + tracing::warn!("Audio write_trailer warning: {e}"); + } + } else { + let _ = encoder.output.write_trailer(); + let _ = std::fs::remove_file(self.current_segment_path()); + } + } + + self.finalize_manifest(); + + Ok(()) + } + + fn finalize_manifest(&self) { + let total_duration: Duration = self.completed_segments.iter().map(|s| s.duration).sum(); + + let manifest = Manifest { + fragments: self + .completed_segments + .iter() + .map(|s| FragmentEntry { + path: s + .path + .file_name() + .unwrap_or_default() + .to_string_lossy() + .into_owned(), + index: s.index, + duration: s.duration.as_secs_f64(), + is_complete: true, + }) + 
.collect(), + total_duration: Some(total_duration.as_secs_f64()), + is_complete: true, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let _ = std::fs::write( + manifest_path, + serde_json::to_string_pretty(&manifest).unwrap_or_default(), + ); + } + + pub fn completed_segments(&self) -> &[SegmentInfo] { + &self.completed_segments + } +} diff --git a/crates/enc-ffmpeg/src/remux.rs b/crates/enc-ffmpeg/src/remux.rs new file mode 100644 index 0000000000..cf858002e2 --- /dev/null +++ b/crates/enc-ffmpeg/src/remux.rs @@ -0,0 +1,366 @@ +use std::{ + ffi::CString, + io::Write, + path::{Path, PathBuf}, + ptr, + time::Duration, +}; + +use cap_media_info::{AudioInfo, AudioInfoError}; +use ffmpeg::{ChannelLayout, codec as avcodec, format as avformat, packet::Mut as PacketMut}; + +use crate::audio::opus::{OpusEncoder, OpusEncoderError}; + +#[derive(Debug, thiserror::Error)] +pub enum RemuxError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("FFmpeg error: {0}")] + Ffmpeg(#[from] ffmpeg::Error), + #[error("No input fragments provided")] + NoFragments, + #[error("Fragment not found: {0}")] + FragmentNotFound(PathBuf), + #[error("No audio stream found")] + NoAudioStream, + #[error("Opus encoder error: {0}")] + OpusEncoder(#[from] OpusEncoderError), + #[error("Audio info error: {0}")] + AudioInfo(#[from] AudioInfoError), + #[error("Concat demuxer not found")] + ConcatDemuxerNotFound, +} + +pub fn concatenate_video_fragments(fragments: &[PathBuf], output: &Path) -> Result<(), RemuxError> { + if fragments.is_empty() { + return Err(RemuxError::NoFragments); + } + + for fragment in fragments { + if !fragment.exists() { + return Err(RemuxError::FragmentNotFound(fragment.clone())); + } + } + + let concat_list_path = output.with_extension("concat.txt"); + { + let mut file = std::fs::File::create(&concat_list_path)?; + for fragment in fragments { + writeln!( + file, + "file '{}'", + fragment.to_string_lossy().replace('\'', "'\\''") + )?; + } + } 
+ + let result = concatenate_with_concat_demuxer(&concat_list_path, output); + + let _ = std::fs::remove_file(&concat_list_path); + + result +} + +fn open_input_with_format( + path: &Path, + format_name: &str, + options: ffmpeg::Dictionary, +) -> Result { + unsafe { + let format_cstr = + CString::new(format_name).map_err(|_| RemuxError::ConcatDemuxerNotFound)?; + let input_format = ffmpeg::ffi::av_find_input_format(format_cstr.as_ptr()); + if input_format.is_null() { + return Err(RemuxError::ConcatDemuxerNotFound); + } + + let path_cstr = CString::new(path.to_string_lossy().as_bytes()).map_err(|_| { + RemuxError::Io(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "Invalid path", + )) + })?; + + let mut ps = ptr::null_mut(); + let mut opts = options.disown(); + + let ret = + ffmpeg::ffi::avformat_open_input(&mut ps, path_cstr.as_ptr(), input_format, &mut opts); + + ffmpeg::Dictionary::own(opts); + + if ret < 0 { + return Err(ffmpeg::Error::from(ret).into()); + } + + let ret = ffmpeg::ffi::avformat_find_stream_info(ps, ptr::null_mut()); + if ret < 0 { + ffmpeg::ffi::avformat_close_input(&mut ps); + return Err(ffmpeg::Error::from(ret).into()); + } + + Ok(avformat::context::Input::wrap(ps)) + } +} + +fn concatenate_with_concat_demuxer( + concat_list_path: &Path, + output: &Path, +) -> Result<(), RemuxError> { + let mut options = ffmpeg::Dictionary::new(); + options.set("safe", "0"); + + let mut ictx = open_input_with_format(concat_list_path, "concat", options)?; + let mut octx = avformat::output(output)?; + + let mut stream_mapping: Vec> = Vec::new(); + let mut output_stream_index = 0usize; + + for input_stream in ictx.streams() { + let codec_params = input_stream.parameters(); + let medium = codec_params.medium(); + + if medium == ffmpeg::media::Type::Video || medium == ffmpeg::media::Type::Audio { + stream_mapping.push(Some(output_stream_index)); + output_stream_index += 1; + + let mut output_stream = octx.add_stream(None)?; + 
output_stream.set_parameters(codec_params); + unsafe { + (*output_stream.as_mut_ptr()).time_base = (*input_stream.as_ptr()).time_base; + } + } else { + stream_mapping.push(None); + } + } + + octx.write_header()?; + + let mut last_dts: Vec = vec![i64::MIN; output_stream_index]; + let mut dts_offset: Vec = vec![0; output_stream_index]; + + for (input_stream, packet) in ictx.packets() { + let input_stream_index = input_stream.index(); + + if let Some(Some(output_index)) = stream_mapping.get(input_stream_index) { + let output_index = *output_index; + let mut packet = packet; + let input_time_base = input_stream.time_base(); + let output_time_base = octx.stream(output_index).unwrap().time_base(); + + packet.rescale_ts(input_time_base, output_time_base); + + let current_dts = packet.dts().unwrap_or(0); + + if last_dts[output_index] != i64::MIN && current_dts <= last_dts[output_index] { + dts_offset[output_index] = last_dts[output_index] - current_dts + 1; + } + + let adjusted_dts = current_dts + dts_offset[output_index]; + let adjusted_pts = packet.pts().map(|pts| pts + dts_offset[output_index]); + + unsafe { + (*packet.as_mut_ptr()).dts = adjusted_dts; + if let Some(pts) = adjusted_pts { + (*packet.as_mut_ptr()).pts = pts; + } + } + + last_dts[output_index] = adjusted_dts; + + packet.set_stream(output_index); + packet.set_position(-1); + + packet.write_interleaved(&mut octx)?; + } + } + + octx.write_trailer()?; + + Ok(()) +} + +pub fn concatenate_audio_to_ogg(fragments: &[PathBuf], output: &Path) -> Result<(), RemuxError> { + if fragments.is_empty() { + return Err(RemuxError::NoFragments); + } + + for fragment in fragments { + if !fragment.exists() { + return Err(RemuxError::FragmentNotFound(fragment.clone())); + } + } + + let concat_list_path = output.with_extension("concat.txt"); + { + let mut file = std::fs::File::create(&concat_list_path)?; + for fragment in fragments { + writeln!( + file, + "file '{}'", + fragment.to_string_lossy().replace('\'', "'\\''") + )?; + } 
+ } + + let result = transcode_audio_to_ogg(&concat_list_path, output); + + let _ = std::fs::remove_file(&concat_list_path); + + result +} + +fn transcode_audio_to_ogg(concat_list_path: &Path, output: &Path) -> Result<(), RemuxError> { + let mut options = ffmpeg::Dictionary::new(); + options.set("safe", "0"); + + let mut ictx = open_input_with_format(concat_list_path, "concat", options)?; + + let input_stream = ictx + .streams() + .best(ffmpeg::media::Type::Audio) + .ok_or(RemuxError::NoAudioStream)?; + + let input_stream_index = input_stream.index(); + let input_time_base = input_stream.time_base(); + + let decoder_ctx = avcodec::Context::from_parameters(input_stream.parameters())?; + let mut decoder = decoder_ctx.decoder().audio()?; + + if decoder.channel_layout().is_empty() { + decoder.set_channel_layout(ChannelLayout::default(decoder.channels() as i32)); + } + decoder.set_packet_time_base(input_time_base); + + let input_audio_info = AudioInfo::from_decoder(&decoder)?; + + let mut octx = avformat::output(output)?; + + let mut opus_encoder = OpusEncoder::init(input_audio_info, &mut octx)?; + + octx.write_header()?; + + let mut decoded_frame = ffmpeg::frame::Audio::empty(); + + for (stream, packet) in ictx.packets() { + if stream.index() == input_stream_index { + decoder.send_packet(&packet)?; + + while decoder.receive_frame(&mut decoded_frame).is_ok() { + opus_encoder.queue_frame(decoded_frame.clone(), Duration::MAX, &mut octx)?; + } + } + } + + decoder.send_eof()?; + + while decoder.receive_frame(&mut decoded_frame).is_ok() { + opus_encoder.queue_frame(decoded_frame.clone(), Duration::MAX, &mut octx)?; + } + + opus_encoder.flush(&mut octx)?; + + octx.write_trailer()?; + + Ok(()) +} + +pub fn stream_copy_fragments(fragments: &[PathBuf], output: &Path) -> Result<(), RemuxError> { + concatenate_video_fragments(fragments, output) +} + +pub fn probe_media_valid(path: &Path) -> bool { + avformat::input(path).is_ok() +} + +pub fn probe_video_can_decode(path: &Path) -> 
Result { + let input = avformat::input(path).map_err(|e| format!("Failed to open file: {e}"))?; + + let input_stream = input + .streams() + .best(ffmpeg::media::Type::Video) + .ok_or_else(|| "No video stream found".to_string())?; + + let decoder_ctx = avcodec::Context::from_parameters(input_stream.parameters()) + .map_err(|e| format!("Failed to create decoder context: {e}"))?; + + let mut decoder = decoder_ctx + .decoder() + .video() + .map_err(|e| format!("Failed to create video decoder: {e}"))?; + + let stream_index = input_stream.index(); + + let mut input = avformat::input(path).map_err(|e| format!("Failed to reopen file: {e}"))?; + + let mut frame = ffmpeg::frame::Video::empty(); + let mut packets_tried = 0; + const MAX_PACKETS: usize = 100; + + for (stream, packet) in input.packets() { + if stream.index() != stream_index { + continue; + } + + packets_tried += 1; + + if let Err(e) = decoder.send_packet(&packet) { + if packets_tried >= MAX_PACKETS { + return Err(format!( + "Failed to send packet after {packets_tried} attempts: {e}" + )); + } + continue; + } + + match decoder.receive_frame(&mut frame) { + Ok(()) => return Ok(true), + Err(ffmpeg::Error::Other { errno }) if errno == ffmpeg::ffi::EAGAIN => continue, + Err(ffmpeg::Error::Eof) => break, + Err(e) => { + if packets_tried >= MAX_PACKETS { + return Err(format!( + "Failed to decode frame after {packets_tried} packets: {e}" + )); + } + continue; + } + } + } + + if let Err(e) = decoder.send_eof() { + return Err(format!("Failed to send EOF: {e}")); + } + + loop { + match decoder.receive_frame(&mut frame) { + Ok(()) => return Ok(true), + Err(ffmpeg::Error::Eof) => break, + Err(ffmpeg::Error::Other { errno }) if errno == ffmpeg::ffi::EAGAIN => continue, + Err(e) => return Err(format!("Failed to receive frame after EOF: {e}")), + } + } + + Err(format!( + "No decodable frames found after trying {packets_tried} packets" + )) +} + +pub fn get_media_duration(path: &Path) -> Option { + let input = 
avformat::input(path).ok()?; + let duration_ts = input.duration(); + if duration_ts <= 0 { + return None; + } + Some(Duration::from_micros(duration_ts as u64)) +} + +pub fn get_video_fps(path: &Path) -> Option { + let input = avformat::input(path).ok()?; + let stream = input.streams().best(ffmpeg::media::Type::Video)?; + let rate = stream.avg_frame_rate(); + if rate.denominator() == 0 { + return None; + } + Some((rate.numerator() as f64 / rate.denominator() as f64).round() as u32) +} diff --git a/crates/mediafoundation-ffmpeg/src/h264.rs b/crates/mediafoundation-ffmpeg/src/h264.rs index bf12d502c6..55e8a5c0af 100644 --- a/crates/mediafoundation-ffmpeg/src/h264.rs +++ b/crates/mediafoundation-ffmpeg/src/h264.rs @@ -1,19 +1,32 @@ use cap_mediafoundation_utils::*; use ffmpeg::{Rational, ffi::av_rescale_q, packet}; +use std::ffi::CString; use tracing::*; use windows::Win32::Media::MediaFoundation::{IMFSample, MFSampleExtension_CleanPoint}; -/// Configuration for H264 muxing #[derive(Clone, Debug)] pub struct MuxerConfig { pub width: u32, pub height: u32, pub fps: u32, pub bitrate: u32, + pub fragmented: bool, + pub frag_duration_us: i64, +} + +impl Default for MuxerConfig { + fn default() -> Self { + Self { + width: 0, + height: 0, + fps: 30, + bitrate: 0, + fragmented: false, + frag_duration_us: 2_000_000, + } + } } -/// H264 stream muxer that works with external FFmpeg output contexts -/// This version doesn't hold a reference to the output, making it easier to integrate pub struct H264StreamMuxer { stream_index: usize, time_base: ffmpeg::Rational, @@ -22,27 +35,21 @@ pub struct H264StreamMuxer { } impl H264StreamMuxer { - /// Add an H264 stream to an output context and create a muxer for it - /// Returns the muxer which can be used to write packets to the stream - /// Note: The caller must call write_header() on the output after adding all streams pub fn new( output: &mut ffmpeg::format::context::Output, config: MuxerConfig, ) -> Result { info!("Adding H264 stream 
to output context"); - // Find H264 codec let h264_codec = ffmpeg::codec::decoder::find(ffmpeg::codec::Id::H264) .ok_or(ffmpeg::Error::DecoderNotFound)?; - // Add video stream let mut stream = output.add_stream(h264_codec)?; let stream_index = stream.index(); let time_base = ffmpeg::Rational::new(1, config.fps as i32 * 1000); stream.set_time_base(time_base); - // Configure stream parameters unsafe { let codecpar = (*stream.as_mut_ptr()).codecpar; (*codecpar).codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; @@ -52,7 +59,6 @@ impl H264StreamMuxer { (*codecpar).bit_rate = config.bitrate as i64; (*codecpar).format = ffmpeg::ffi::AVPixelFormat::AV_PIX_FMT_NV12 as i32; - // Set frame rate (*stream.as_mut_ptr()).avg_frame_rate = ffmpeg::ffi::AVRational { num: config.fps as i32, den: 1, @@ -64,11 +70,12 @@ impl H264StreamMuxer { } info!( - "H264 stream added: {}x{} @ {} fps, {} kbps", + "H264 stream added: {}x{} @ {} fps, {} kbps, fragmented={}", config.width, config.height, config.fps, - config.bitrate / 1000 + config.bitrate / 1000, + config.fragmented ); Ok(Self { @@ -79,7 +86,6 @@ impl H264StreamMuxer { }) } - /// Write an H264 sample from MediaFoundation to the output pub fn write_sample( &mut self, sample: &IMFSample, @@ -135,19 +141,9 @@ impl H264StreamMuxer { packet.set_stream(self.stream_index); - // if let Ok(decode_timestamp) = - // unsafe { sample.GetUINT64(&MFSampleExtension_DecodeTimestamp) } - // { - // packet.set_dts(Some(mf_from_mf_time( - // self.time_base, - // decode_timestamp as i64, - // ))); - // } - Ok(packet) } - /// Mark the muxer as finished (note: does not write trailer, caller is responsible) pub fn finish(&mut self) -> Result<(), ffmpeg::Error> { if self.is_finished { return Ok(()); @@ -157,17 +153,13 @@ impl H264StreamMuxer { info!("Finishing H264 muxer, wrote {} frames", self.frame_count); - // Note: Caller is responsible for writing trailer to the output context - Ok(()) } - /// Get the number of frames written pub fn 
frame_count(&self) -> u64 { self.frame_count } - /// Check if the muxer is finished pub fn is_finished(&self) -> bool { self.is_finished } @@ -178,3 +170,45 @@ const MF_TIMEBASE: ffmpeg::Rational = ffmpeg::Rational(1, 10_000_000); fn mf_from_mf_time(tb: Rational, stime: i64) -> i64 { unsafe { av_rescale_q(stime, MF_TIMEBASE.into(), tb.into()) } } + +pub fn set_fragmented_mp4_options( + output: &mut ffmpeg::format::context::Output, + frag_duration_us: i64, +) -> Result<(), ffmpeg::Error> { + unsafe { + let ctx = output.as_mut_ptr(); + + let movflags_key = CString::new("movflags").unwrap(); + let movflags_val = CString::new("frag_keyframe+empty_moov+default_base_moof").unwrap(); + + let ret = ffmpeg::ffi::av_opt_set( + (*ctx).priv_data, + movflags_key.as_ptr(), + movflags_val.as_ptr(), + 0, + ); + if ret < 0 { + warn!("Failed to set movflags: {}", ret); + } + + let frag_duration_key = CString::new("frag_duration").unwrap(); + let frag_duration_val = CString::new(frag_duration_us.to_string()).unwrap(); + + let ret = ffmpeg::ffi::av_opt_set( + (*ctx).priv_data, + frag_duration_key.as_ptr(), + frag_duration_val.as_ptr(), + 0, + ); + if ret < 0 { + warn!("Failed to set frag_duration: {}", ret); + } + + info!( + "Set fMP4 options: movflags=frag_keyframe+empty_moov+default_base_moof, frag_duration={}us", + frag_duration_us + ); + } + + Ok(()) +} diff --git a/crates/mediafoundation-ffmpeg/src/lib.rs b/crates/mediafoundation-ffmpeg/src/lib.rs index 07fe6e79ec..1080a2c25c 100644 --- a/crates/mediafoundation-ffmpeg/src/lib.rs +++ b/crates/mediafoundation-ffmpeg/src/lib.rs @@ -4,4 +4,4 @@ mod audio; mod h264; pub use audio::AudioExt; -pub use h264::{H264StreamMuxer, MuxerConfig}; +pub use h264::{H264StreamMuxer, MuxerConfig, set_fragmented_mp4_options}; diff --git a/crates/project/src/meta.rs b/crates/project/src/meta.rs index b86c1525d6..4dcd11aca6 100644 --- a/crates/project/src/meta.rs +++ b/crates/project/src/meta.rs @@ -20,7 +20,6 @@ pub struct VideoMeta { pub path: 
RelativePathBuf, #[serde(default = "legacy_static_video_fps")] pub fps: u32, - /// unix time of the first frame #[serde(default, skip_serializing_if = "Option::is_none")] pub start_time: Option, } @@ -33,7 +32,6 @@ fn legacy_static_video_fps() -> u32 { pub struct AudioMeta { #[specta(type = String)] pub path: RelativePathBuf, - /// unix time of the first frame #[serde(default, skip_serializing_if = "Option::is_none")] pub start_time: Option, } @@ -64,7 +62,6 @@ impl Default for Platform { pub struct RecordingMeta { #[serde(default)] pub platform: Option, - // this field is just for convenience, it shouldn't be persisted #[serde(skip_serializing, default)] pub project_path: PathBuf, pub pretty_name: String, @@ -92,19 +89,14 @@ pub struct VideoUploadInfo { #[serde(tag = "state")] pub enum UploadMeta { MultipartUpload { - // Cap web identifier video_id: String, - // Data for resuming file_path: PathBuf, pre_created_video: VideoUploadInfo, recording_dir: PathBuf, }, SinglePartUpload { - // Cap web identifier video_id: String, - // Path of the Cap file recording_dir: PathBuf, - // Path to video and screenshot files for resuming file_path: PathBuf, screenshot_path: PathBuf, }, @@ -126,17 +118,9 @@ impl specta::Flatten for RecordingMetaInner {} #[derive(Debug, Clone, Serialize, Deserialize, Type)] #[serde(untagged, rename_all = "camelCase")] pub enum InstantRecordingMeta { - InProgress { - // This field means nothing and is just because this enum is untagged. 
- recording: bool, - }, - Failed { - error: String, - }, - Complete { - fps: u32, - sample_rate: Option, - }, + InProgress { recording: bool }, + Failed { error: String }, + Complete { fps: u32, sample_rate: Option }, } impl RecordingMeta { @@ -162,7 +146,6 @@ impl RecordingMeta { pub fn project_config(&self) -> ProjectConfiguration { let mut config = ProjectConfiguration::load(&self.project_path).unwrap_or_default(); - // Try to load captions from captions.json if it exists let captions_path = self.project_path.join("captions.json"); debug!("Checking for captions at: {:?}", captions_path); @@ -288,6 +271,7 @@ pub struct MultipleSegments { #[serde(tag = "status")] pub enum StudioRecordingStatus { InProgress, + NeedsRemux, Failed { error: String }, Complete, } @@ -295,13 +279,12 @@ pub enum StudioRecordingStatus { #[derive(Debug, Clone, Serialize, Deserialize, Type)] #[serde(untagged, rename_all = "camelCase")] pub enum Cursors { - // needed for backwards compat as i wasn't strict enough with feature flagging ๐Ÿคฆ Old(HashMap), Correct(HashMap), } impl Cursors { - fn is_empty(&self) -> bool { + pub fn is_empty(&self) -> bool { match self { Cursors::Old(map) => map.is_empty(), Cursors::Correct(map) => map.is_empty(), @@ -388,7 +371,6 @@ impl MultipleSegment { let full_path = meta.path(cursor_path); - // Try to load the cursor data let mut data = match CursorEvents::load_from_file(&full_path) { Ok(data) => data, Err(e) => { @@ -459,26 +441,25 @@ mod test { test_meta_deserialize( r#"{ - "pretty_name": "Cap 2024-11-26 at 22.16.36", - "sharing": null, - "display": { - "path": "content/display.mp4" - }, - "camera": { - "path": "content/camera.mp4" - }, - "audio": { - "path": "content/audio-input.mp3" - }, - "segments": [], - "cursor": "cursor.json" - }"#, + "pretty_name": "Cap 2024-11-26 at 22.16.36", + "sharing": null, + "display": { + "path": "content/display.mp4" + }, + "camera": { + "path": "content/camera.mp4" + }, + "audio": { + "path": "content/audio-input.mp3" + 
}, + "segments": [], + "cursor": "cursor.json" + }"#, ); } #[test] fn multi_segment() { - // single segment test_meta_deserialize( r#"{ "pretty_name": "Cap 2024-11-26 at 22.29.30", @@ -505,36 +486,35 @@ mod test { }"#, ); - // multi segment, no cursor test_meta_deserialize( r#"{ - "pretty_name": "Cap 2024-11-26 at 22.32.26", - "sharing": null, - "segments": [ - { - "display": { - "path": "content/segments/segment-0/display.mp4" - }, - "camera": { - "path": "content/segments/segment-0/camera.mp4" - }, - "audio": { - "path": "content/segments/segment-0/audio-input.mp3" - } - }, - { - "display": { - "path": "content/segments/segment-1/display.mp4" - }, - "camera": { - "path": "content/segments/segment-1/camera.mp4" - }, - "audio": { - "path": "content/segments/segment-1/audio-input.mp3" - } - } - ] - }"#, + "pretty_name": "Cap 2024-11-26 at 22.32.26", + "sharing": null, + "segments": [ + { + "display": { + "path": "content/segments/segment-0/display.mp4" + }, + "camera": { + "path": "content/segments/segment-0/camera.mp4" + }, + "audio": { + "path": "content/segments/segment-0/audio-input.mp3" + } + }, + { + "display": { + "path": "content/segments/segment-1/display.mp4" + }, + "camera": { + "path": "content/segments/segment-1/camera.mp4" + }, + "audio": { + "path": "content/segments/segment-1/audio-input.mp3" + } + } + ] + }"#, ); } } diff --git a/crates/recording/src/capture_pipeline.rs b/crates/recording/src/capture_pipeline.rs index eea5d2d770..c308820f77 100644 --- a/crates/recording/src/capture_pipeline.rs +++ b/crates/recording/src/capture_pipeline.rs @@ -4,6 +4,11 @@ use crate::{ sources, sources::screen_capture::{self, CropBounds, ScreenCaptureFormat, ScreenCaptureTarget}, }; + +#[cfg(target_os = "macos")] +use crate::output_pipeline::{ + FragmentedAVFoundationMp4Muxer, FragmentedAVFoundationMp4MuxerConfig, +}; use anyhow::anyhow; use cap_timestamp::Timestamps; use std::{path::PathBuf, sync::Arc}; @@ -17,6 +22,13 @@ pub struct EncoderPreferences { 
force_software: Arc, } +#[cfg(windows)] +impl Default for EncoderPreferences { + fn default() -> Self { + Self::new() + } +} + #[cfg(windows)] impl EncoderPreferences { pub fn new() -> Self { @@ -39,6 +51,7 @@ pub trait MakeCapturePipeline: ScreenCaptureFormat + std::fmt::Debug + 'static { screen_capture: screen_capture::VideoSourceConfig, output_path: PathBuf, start_time: Timestamps, + fragmented: bool, #[cfg(windows)] encoder_preferences: EncoderPreferences, ) -> anyhow::Result where @@ -64,12 +77,28 @@ impl MakeCapturePipeline for screen_capture::CMSampleBufferCapture { screen_capture: screen_capture::VideoSourceConfig, output_path: PathBuf, start_time: Timestamps, + fragmented: bool, ) -> anyhow::Result { - OutputPipeline::builder(output_path.clone()) - .with_video::(screen_capture) - .with_timestamps(start_time) - .build::(Default::default()) - .await + if fragmented { + let fragments_dir = output_path + .parent() + .map(|p| p.join("display")) + .unwrap_or_else(|| output_path.with_file_name("display")); + + OutputPipeline::builder(fragments_dir) + .with_video::(screen_capture) + .with_timestamps(start_time) + .build::( + FragmentedAVFoundationMp4MuxerConfig::default(), + ) + .await + } else { + OutputPipeline::builder(output_path.clone()) + .with_video::(screen_capture) + .with_timestamps(start_time) + .build::(Default::default()) + .await + } } async fn make_instant_mode_pipeline( @@ -104,11 +133,21 @@ impl MakeCapturePipeline for screen_capture::Direct3DCapture { screen_capture: screen_capture::VideoSourceConfig, output_path: PathBuf, start_time: Timestamps, + fragmented: bool, encoder_preferences: EncoderPreferences, ) -> anyhow::Result { let d3d_device = screen_capture.d3d_device.clone(); - OutputPipeline::builder(output_path.clone()) + let actual_output_path = if fragmented { + output_path + .parent() + .map(|p| p.join("display.mp4")) + .unwrap_or_else(|| output_path.with_file_name("display.mp4")) + } else { + output_path.clone() + }; + + 
OutputPipeline::builder(actual_output_path) .with_video::(screen_capture) .with_timestamps(start_time) .build::(WindowsMuxerConfig { @@ -118,6 +157,8 @@ impl MakeCapturePipeline for screen_capture::Direct3DCapture { frame_rate: 30u32, output_size: None, encoder_preferences, + fragmented, + frag_duration_us: 2_000_000, }) .await } @@ -154,6 +195,8 @@ impl MakeCapturePipeline for screen_capture::Direct3DCapture { Height: output_resolution.1 as i32, }), encoder_preferences, + fragmented: false, + frag_duration_us: 2_000_000, }) .await } diff --git a/crates/recording/src/cursor.rs b/crates/recording/src/cursor.rs index 0f618804ec..a3701dfeaa 100644 --- a/crates/recording/src/cursor.rs +++ b/crates/recording/src/cursor.rs @@ -1,9 +1,13 @@ use cap_cursor_capture::CursorCropBounds; use cap_cursor_info::CursorShape; -use cap_project::{CursorClickEvent, CursorMoveEvent, XY}; +use cap_project::{CursorClickEvent, CursorEvents, CursorMoveEvent, XY}; use cap_timestamp::Timestamps; use futures::{FutureExt, future::Shared}; -use std::{collections::HashMap, path::PathBuf}; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + time::Instant, +}; use tokio::sync::oneshot; use tokio_util::sync::{CancellationToken, DropGuard}; @@ -37,6 +41,24 @@ impl CursorActor { } } +const CURSOR_FLUSH_INTERVAL_SECS: u64 = 5; + +fn flush_cursor_data(output_path: &Path, moves: &[CursorMoveEvent], clicks: &[CursorClickEvent]) { + let events = CursorEvents { + clicks: clicks.to_vec(), + moves: moves.to_vec(), + }; + if let Ok(json) = serde_json::to_string_pretty(&events) + && let Err(e) = std::fs::write(output_path, json) + { + tracing::error!( + "Failed to write cursor data to {}: {}", + output_path.display(), + e + ); + } +} + #[tracing::instrument(name = "cursor", skip_all)] pub fn spawn_cursor_recorder( crop_bounds: CursorCropBounds, @@ -45,6 +67,7 @@ pub fn spawn_cursor_recorder( prev_cursors: Cursors, next_cursor_id: u32, start_time: Timestamps, + output_path: Option, ) -> CursorActor { 
use cap_utils::spawn_actor; use device_query::{DeviceQuery, DeviceState}; @@ -66,7 +89,6 @@ pub fn spawn_cursor_recorder( let mut last_position = cap_cursor_capture::RawCursorPosition::get(); - // Create cursors directory if it doesn't exist std::fs::create_dir_all(&cursors_dir).unwrap(); let mut response = CursorActorResponse { @@ -76,6 +98,9 @@ pub fn spawn_cursor_recorder( clicks: vec![], }; + let mut last_flush = Instant::now(); + let flush_interval = Duration::from_secs(CURSOR_FLUSH_INTERVAL_SECS); + loop { let sleep = tokio::time::sleep(Duration::from_millis(10)); let Either::Right(_) = @@ -93,17 +118,14 @@ pub fn spawn_cursor_recorder( data.image.hash(&mut hasher); let id = hasher.finish(); - // Check if we've seen this cursor data before if let Some(existing_id) = response.cursors.get(&id) { existing_id.id.to_string() } else { - // New cursor data - save it let cursor_id = response.next_cursor_id.to_string(); let file_name = format!("cursor_{cursor_id}.png"); let cursor_path = cursors_dir.join(&file_name); if let Ok(image) = image::load_from_memory(&data.image) { - // Convert to RGBA let rgba_image = image.into_rgba8(); if let Err(e) = rgba_image.save(&cursor_path) { @@ -174,10 +196,21 @@ pub fn spawn_cursor_recorder( } last_mouse_state = mouse_state; + + if let Some(ref path) = output_path + && last_flush.elapsed() >= flush_interval + { + flush_cursor_data(path, &response.moves, &response.clicks); + last_flush = Instant::now(); + } } info!("cursor recorder done"); + if let Some(ref path) = output_path { + flush_cursor_data(path, &response.moves, &response.clicks); + } + let _ = tx.send(response); }); diff --git a/crates/recording/src/feeds/camera.rs b/crates/recording/src/feeds/camera.rs index 44cdaa5dc4..b652c09af1 100644 --- a/crates/recording/src/feeds/camera.rs +++ b/crates/recording/src/feeds/camera.rs @@ -565,7 +565,7 @@ async fn setup_camera( if let Ok(buffer) = unsafe { MFCreateMemoryBuffer(data_len as u32) } { let buffer_ready = { if let Ok(mut 
lock) = buffer.lock() { - lock.copy_from_slice(&*bytes); + lock.copy_from_slice(&bytes); true } else { false diff --git a/crates/recording/src/fragmentation/manifest.rs b/crates/recording/src/fragmentation/manifest.rs new file mode 100644 index 0000000000..1a9cbedafc --- /dev/null +++ b/crates/recording/src/fragmentation/manifest.rs @@ -0,0 +1,91 @@ +use serde::{Deserialize, Serialize}; +use std::{path::PathBuf, time::Duration}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FragmentManifest { + pub fragments: Vec, + #[serde( + with = "duration_serde", + default, + skip_serializing_if = "Option::is_none" + )] + pub total_duration: Option, + pub is_complete: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct FragmentInfo { + #[serde(with = "path_serde")] + pub path: PathBuf, + pub index: u32, + #[serde( + with = "duration_serde", + default, + skip_serializing_if = "Option::is_none" + )] + pub duration: Option, + pub is_complete: bool, +} + +impl FragmentManifest { + pub fn load_from_file(path: &PathBuf) -> std::io::Result { + let content = std::fs::read_to_string(path)?; + serde_json::from_str(&content) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e)) + } + + pub fn complete_fragments(&self) -> Vec<&FragmentInfo> { + self.fragments.iter().filter(|f| f.is_complete).collect() + } + + pub fn recoverable_duration(&self) -> Option { + let mut total = Duration::ZERO; + for fragment in self.complete_fragments() { + total += fragment.duration?; + } + Some(total) + } +} + +mod duration_serde { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use std::time::Duration; + + pub fn serialize(duration: &Option, serializer: S) -> Result + where + S: Serializer, + { + match duration { + Some(d) => d.as_secs_f64().serialize(serializer), + None => serializer.serialize_none(), + } + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + let opt: Option = 
Option::deserialize(deserializer)?; + Ok(opt.map(Duration::from_secs_f64)) + } +} + +mod path_serde { + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + use std::path::{Path, PathBuf}; + + pub fn serialize(path: &Path, serializer: S) -> Result + where + S: Serializer, + { + path.to_string_lossy().serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Ok(PathBuf::from(s)) + } +} diff --git a/crates/recording/src/fragmentation/mod.rs b/crates/recording/src/fragmentation/mod.rs new file mode 100644 index 0000000000..0f40f0ecb6 --- /dev/null +++ b/crates/recording/src/fragmentation/mod.rs @@ -0,0 +1,103 @@ +mod manifest; +pub use manifest::*; + +use std::{path::PathBuf, time::Duration}; + +pub struct FragmentManager { + base_path: PathBuf, + fragment_duration: Duration, + current_index: u32, + fragments: Vec, +} + +impl FragmentManager { + pub fn new(base_path: PathBuf, duration: Duration) -> Self { + Self { + base_path, + fragment_duration: duration, + current_index: 0, + fragments: Vec::new(), + } + } + + pub fn fragment_duration(&self) -> Duration { + self.fragment_duration + } + + pub fn current_fragment_path(&self) -> PathBuf { + self.base_path + .join(format!("fragment_{:03}.mp4", self.current_index)) + } + + pub fn current_audio_fragment_path(&self) -> PathBuf { + self.base_path + .join(format!("fragment_{:03}.m4a", self.current_index)) + } + + pub fn rotate(&mut self, duration: Option, is_complete: bool) -> PathBuf { + self.fragments.push(FragmentInfo { + path: self.current_fragment_path(), + index: self.current_index, + duration, + is_complete, + }); + + self.current_index += 1; + self.current_fragment_path() + } + + pub fn current_index(&self) -> u32 { + self.current_index + } + + pub fn complete_fragments(&self) -> Vec<&FragmentInfo> { + self.fragments.iter().filter(|f| f.is_complete).collect() + } + + pub fn all_fragments(&self) 
-> &[FragmentInfo] { + &self.fragments + } + + pub fn mark_current_complete(&mut self, duration: Option) { + self.fragments.push(FragmentInfo { + path: self.current_fragment_path(), + index: self.current_index, + duration, + is_complete: true, + }); + } + + pub fn write_manifest(&self) -> std::io::Result<()> { + let manifest = FragmentManifest { + fragments: self.fragments.clone(), + total_duration: self.total_duration(), + is_complete: false, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let json = serde_json::to_string_pretty(&manifest)?; + std::fs::write(manifest_path, json)?; + Ok(()) + } + + pub fn finalize_manifest(&self) -> std::io::Result<()> { + let manifest = FragmentManifest { + fragments: self.fragments.clone(), + total_duration: self.total_duration(), + is_complete: true, + }; + + let manifest_path = self.base_path.join("manifest.json"); + let json = serde_json::to_string_pretty(&manifest)?; + std::fs::write(manifest_path, json)?; + Ok(()) + } + + fn total_duration(&self) -> Option { + let mut total = Duration::ZERO; + for fragment in &self.fragments { + total += fragment.duration?; + } + Some(total) + } +} diff --git a/crates/recording/src/lib.rs b/crates/recording/src/lib.rs index e57f795aa6..e68b92d15e 100644 --- a/crates/recording/src/lib.rs +++ b/crates/recording/src/lib.rs @@ -2,8 +2,10 @@ pub mod benchmark; mod capture_pipeline; pub mod cursor; pub mod feeds; +pub mod fragmentation; pub mod instant_recording; mod output_pipeline; +pub mod recovery; pub mod screenshot; pub mod sources; pub mod studio_recording; diff --git a/crates/recording/src/output_pipeline/ffmpeg.rs b/crates/recording/src/output_pipeline/ffmpeg.rs index 8453479de1..1c8e2fe694 100644 --- a/crates/recording/src/output_pipeline/ffmpeg.rs +++ b/crates/recording/src/output_pipeline/ffmpeg.rs @@ -3,7 +3,10 @@ use crate::{ output_pipeline::{AudioFrame, AudioMuxer, Muxer, VideoFrame, VideoMuxer}, }; use anyhow::{Context, anyhow}; -use 
cap_enc_ffmpeg::{aac::AACEncoder, h264::*, ogg::*, opus::OpusEncoder}; +use cap_enc_ffmpeg::{ + aac::AACEncoder, fragmented_audio::FragmentedAudioFile, h264::*, ogg::*, opus::OpusEncoder, + segmented_audio::SegmentedAudioEncoder, +}; use cap_media_info::{AudioInfo, VideoInfo}; use cap_timestamp::Timestamp; use std::{ @@ -154,3 +157,95 @@ impl AudioMuxer for OggMuxer { Ok(self.0.queue_frame(frame.inner, timestamp)?) } } + +pub struct FragmentedAudioMuxer(FragmentedAudioFile); + +impl Muxer for FragmentedAudioMuxer { + type Config = (); + + async fn setup( + _: Self::Config, + output_path: PathBuf, + _: Option, + audio_config: Option, + _: Arc, + _: &mut TaskPool, + ) -> anyhow::Result + where + Self: Sized, + { + let audio_config = + audio_config.ok_or_else(|| anyhow!("No audio configuration provided"))?; + + Ok(Self( + FragmentedAudioFile::init(output_path, audio_config) + .map_err(|e| anyhow!("Failed to initialize fragmented audio encoder: {e}"))?, + )) + } + + fn finish(&mut self, _: Duration) -> anyhow::Result> { + self.0 + .finish() + .map_err(Into::into) + .map(|r| r.map_err(Into::into)) + } +} + +impl AudioMuxer for FragmentedAudioMuxer { + fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> { + Ok(self.0.queue_frame(frame.inner, timestamp)?) 
+ } +} + +pub struct SegmentedAudioMuxer(SegmentedAudioEncoder); + +pub struct SegmentedAudioMuxerConfig { + pub segment_duration: Duration, +} + +impl Default for SegmentedAudioMuxerConfig { + fn default() -> Self { + Self { + segment_duration: Duration::from_secs(3), + } + } +} + +impl Muxer for SegmentedAudioMuxer { + type Config = SegmentedAudioMuxerConfig; + + async fn setup( + config: Self::Config, + output_path: PathBuf, + _: Option, + audio_config: Option, + _: Arc, + _: &mut TaskPool, + ) -> anyhow::Result + where + Self: Sized, + { + let audio_config = + audio_config.ok_or_else(|| anyhow!("No audio configuration provided"))?; + + Ok(Self( + SegmentedAudioEncoder::init(output_path, audio_config, config.segment_duration) + .map_err(|e| anyhow!("Failed to initialize segmented audio encoder: {e}"))?, + )) + } + + fn finish(&mut self, timestamp: Duration) -> anyhow::Result> { + self.0 + .finish_with_timestamp(timestamp) + .map_err(Into::into) + .map(|_| Ok(())) + } +} + +impl AudioMuxer for SegmentedAudioMuxer { + fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> { + self.0 + .queue_frame(frame.inner, timestamp) + .map_err(|e| anyhow!("Failed to queue audio frame: {e}")) + } +} diff --git a/crates/recording/src/output_pipeline/fragmented.rs b/crates/recording/src/output_pipeline/fragmented.rs new file mode 100644 index 0000000000..4ced9f28ca --- /dev/null +++ b/crates/recording/src/output_pipeline/fragmented.rs @@ -0,0 +1,294 @@ +#[cfg(target_os = "macos")] +use crate::{ + VideoFrame, + output_pipeline::{AudioFrame, AudioMuxer, Muxer, TaskPool, VideoMuxer}, + sources::screen_capture, +}; + +#[cfg(target_os = "macos")] +use anyhow::anyhow; +#[cfg(target_os = "macos")] +use cap_enc_avfoundation::SegmentedMP4Encoder; +#[cfg(target_os = "macos")] +use cap_media_info::{AudioInfo, VideoInfo}; +#[cfg(target_os = "macos")] +use cap_timestamp::Timestamp; +#[cfg(target_os = "macos")] +use std::{ + path::PathBuf, + sync::{Arc, 
atomic::AtomicBool}, + time::Duration, +}; + +#[cfg(target_os = "macos")] +pub struct FragmentedAVFoundationMp4Muxer { + inner: SegmentedMP4Encoder, + pause_flag: Arc, +} + +#[cfg(target_os = "macos")] +pub struct FragmentedAVFoundationMp4MuxerConfig { + pub output_height: Option, + pub segment_duration: Duration, +} + +#[cfg(target_os = "macos")] +impl Default for FragmentedAVFoundationMp4MuxerConfig { + fn default() -> Self { + Self { + output_height: None, + segment_duration: Duration::from_secs(3), + } + } +} + +#[cfg(target_os = "macos")] +impl FragmentedAVFoundationMp4Muxer { + const MAX_QUEUE_RETRIES: u32 = 500; +} + +#[cfg(target_os = "macos")] +#[derive(Clone)] +pub struct FragmentedNativeCameraFrame { + pub sample_buf: cidre::arc::R, + pub timestamp: Timestamp, +} + +#[cfg(target_os = "macos")] +unsafe impl Send for FragmentedNativeCameraFrame {} +#[cfg(target_os = "macos")] +unsafe impl Sync for FragmentedNativeCameraFrame {} + +#[cfg(target_os = "macos")] +impl VideoFrame for FragmentedNativeCameraFrame { + fn timestamp(&self) -> Timestamp { + self.timestamp + } +} + +#[cfg(target_os = "macos")] +impl Muxer for FragmentedAVFoundationMp4Muxer { + type Config = FragmentedAVFoundationMp4MuxerConfig; + + async fn setup( + config: Self::Config, + output_path: PathBuf, + video_config: Option, + audio_config: Option, + pause_flag: Arc, + _tasks: &mut TaskPool, + ) -> anyhow::Result { + let video_config = + video_config.ok_or_else(|| anyhow!("Invariant: No video source provided"))?; + + Ok(Self { + inner: SegmentedMP4Encoder::init( + output_path, + video_config, + audio_config, + config.output_height, + config.segment_duration, + ) + .map_err(|e| anyhow!("{e}"))?, + pause_flag, + }) + } + + fn finish(&mut self, timestamp: Duration) -> anyhow::Result> { + Ok(self.inner.finish(Some(timestamp)).map(Ok)?) 
+ } +} + +#[cfg(target_os = "macos")] +impl VideoMuxer for FragmentedAVFoundationMp4Muxer { + type VideoFrame = screen_capture::VideoFrame; + + fn send_video_frame( + &mut self, + frame: Self::VideoFrame, + timestamp: Duration, + ) -> anyhow::Result<()> { + if self.pause_flag.load(std::sync::atomic::Ordering::Relaxed) { + self.inner.pause(); + } else { + self.inner.resume(); + } + + let mut retry_count = 0; + loop { + match self + .inner + .queue_video_frame(frame.sample_buf.clone(), timestamp) + { + Ok(()) => break, + Err(cap_enc_avfoundation::QueueFrameError::NotReadyForMore) => { + retry_count += 1; + if retry_count >= Self::MAX_QUEUE_RETRIES { + return Err(anyhow!( + "send_video_frame/timeout after {} retries", + Self::MAX_QUEUE_RETRIES + )); + } + std::thread::sleep(Duration::from_millis(2)); + continue; + } + Err(e) => return Err(anyhow!("send_video_frame/{e}")), + } + } + + Ok(()) + } +} + +#[cfg(target_os = "macos")] +impl AudioMuxer for FragmentedAVFoundationMp4Muxer { + fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> { + let mut retry_count = 0; + loop { + match self.inner.queue_audio_frame(&frame.inner, timestamp) { + Ok(()) => break, + Err(cap_enc_avfoundation::QueueFrameError::NotReadyForMore) => { + retry_count += 1; + if retry_count >= Self::MAX_QUEUE_RETRIES { + return Err(anyhow!( + "send_audio_frame/retries_exceeded after {} retries", + Self::MAX_QUEUE_RETRIES + )); + } + std::thread::sleep(Duration::from_millis(2)); + continue; + } + Err(e) => return Err(anyhow!("send_audio_frame/{e}")), + } + } + + Ok(()) + } +} + +#[cfg(target_os = "macos")] +pub struct FragmentedAVFoundationCameraMuxer { + inner: SegmentedMP4Encoder, + pause_flag: Arc, +} + +#[cfg(target_os = "macos")] +pub struct FragmentedAVFoundationCameraMuxerConfig { + pub output_height: Option, + pub segment_duration: Duration, +} + +#[cfg(target_os = "macos")] +impl Default for FragmentedAVFoundationCameraMuxerConfig { + fn default() -> Self 
#[cfg(target_os = "macos")]
impl FragmentedAVFoundationCameraMuxer {
    /// Maximum number of queue attempts per frame before the send is reported
    /// as failed. Attempts are separated by a 2 ms sleep, so a stalled encoder
    /// blocks the caller for roughly one second at most.
    const MAX_QUEUE_RETRIES: u32 = 500;
}

#[cfg(target_os = "macos")]
impl Muxer for FragmentedAVFoundationCameraMuxer {
    type Config = FragmentedAVFoundationCameraMuxerConfig;

    /// Builds the muxer around a `SegmentedMP4Encoder` writing to `output_path`.
    ///
    /// # Errors
    /// Fails when no video configuration is supplied, or when the encoder
    /// cannot be initialised (the encoder error is forwarded as text).
    async fn setup(
        config: Self::Config,
        output_path: PathBuf,
        video_config: Option<cap_media_info::VideoInfo>,
        audio_config: Option<cap_media_info::AudioInfo>,
        pause_flag: Arc<std::sync::atomic::AtomicBool>,
        _tasks: &mut TaskPool,
    ) -> anyhow::Result<Self> {
        // A camera muxer without a video source is a caller bug, not a runtime condition.
        let Some(video_config) = video_config else {
            return Err(anyhow!("Invariant: No video source provided"));
        };

        let inner = SegmentedMP4Encoder::init(
            output_path,
            video_config,
            audio_config,
            config.output_height,
            config.segment_duration,
        )
        .map_err(|e| anyhow!("{e}"))?;

        Ok(Self { inner, pause_flag })
    }

    /// Finishes the underlying encoder at `timestamp`.
    ///
    /// An encoder failure surfaces as the outer error; on success the inner
    /// result is always `Ok`.
    fn finish(&mut self, timestamp: Duration) -> anyhow::Result<anyhow::Result<()>> {
        let outcome = self.inner.finish(Some(timestamp)).map(Ok)?;
        Ok(outcome)
    }
}

#[cfg(target_os = "macos")]
impl VideoMuxer for FragmentedAVFoundationCameraMuxer {
    type VideoFrame = crate::output_pipeline::NativeCameraFrame;

    /// Queues one camera frame, retrying while the encoder reports that it is
    /// not ready for more data.
    ///
    /// The shared pause flag is mirrored onto the encoder before every frame.
    ///
    /// # Errors
    /// Fails after `MAX_QUEUE_RETRIES` consecutive "not ready" responses, or
    /// immediately on any other queue error.
    fn send_video_frame(
        &mut self,
        frame: Self::VideoFrame,
        timestamp: Duration,
    ) -> anyhow::Result<()> {
        let paused = self.pause_flag.load(std::sync::atomic::Ordering::Relaxed);
        if paused {
            self.inner.pause();
        } else {
            self.inner.resume();
        }

        for attempt in 1..=Self::MAX_QUEUE_RETRIES {
            // The sample buffer is cloned per attempt because queueing consumes it.
            let queued = self
                .inner
                .queue_video_frame(frame.sample_buf.clone(), timestamp);

            match queued {
                Ok(()) => return Ok(()),
                Err(cap_enc_avfoundation::QueueFrameError::NotReadyForMore) => {
                    // No point sleeping after the final attempt; fall through to the error.
                    if attempt == Self::MAX_QUEUE_RETRIES {
                        break;
                    }
                    std::thread::sleep(Duration::from_millis(2));
                }
                Err(e) => return Err(anyhow!("send_video_frame/{e}")),
            }
        }

        Err(anyhow!(
            "send_video_frame/timeout after {} retries",
            Self::MAX_QUEUE_RETRIES
        ))
    }
}

#[cfg(target_os = "macos")]
impl AudioMuxer for FragmentedAVFoundationCameraMuxer {
    /// Queues one audio frame, retrying while the encoder reports that it is
    /// not ready for more data.
    ///
    /// # Errors
    /// Fails after `MAX_QUEUE_RETRIES` consecutive "not ready" responses, or
    /// immediately on any other queue error.
    fn send_audio_frame(&mut self, frame: AudioFrame, timestamp: Duration) -> anyhow::Result<()> {
        for attempt in 1..=Self::MAX_QUEUE_RETRIES {
            match self.inner.queue_audio_frame(&frame.inner, timestamp) {
                Ok(()) => return Ok(()),
                Err(cap_enc_avfoundation::QueueFrameError::NotReadyForMore) => {
                    // No point sleeping after the final attempt; fall through to the error.
                    if attempt == Self::MAX_QUEUE_RETRIES {
                        break;
                    }
                    std::thread::sleep(Duration::from_millis(2));
                }
                Err(e) => return Err(anyhow!("send_audio_frame/{e}")),
            }
        }

        Err(anyhow!(
            "send_audio_frame/retries_exceeded after {} retries",
            Self::MAX_QUEUE_RETRIES
        ))
    }
}
ffmpeg::format::output(&output_path)?; + + if fragmented { + cap_mediafoundation_ffmpeg::set_fragmented_mp4_options(&mut output, frag_duration_us)?; + } let audio_encoder = audio_config .map(|config| AACEncoder::init(config, &mut output)) .transpose()?; @@ -162,7 +169,7 @@ impl Muxer for WindowsMuxer { cap_enc_ffmpeg::h264::H264Encoder::builder(video_config) .with_output_size(fallback_width, fallback_height) - .and_then(|builder| builder.build(&mut *output_guard)) + .and_then(|builder| builder.build(&mut output_guard)) .map(either::Right) .map_err(|e| anyhow!("ScreenSoftwareEncoder/{e}")) }; @@ -212,12 +219,14 @@ impl Muxer for WindowsMuxer { }; cap_mediafoundation_ffmpeg::H264StreamMuxer::new( - &mut *output_guard, + &mut output_guard, cap_mediafoundation_ffmpeg::MuxerConfig { width, height, fps: config.frame_rate, bitrate: encoder.bitrate(), + fragmented, + frag_duration_us, }, ) }; @@ -421,9 +430,20 @@ pub struct WindowsCameraMuxer { pause: PauseTracker, } -#[derive(Default)] pub struct WindowsCameraMuxerConfig { pub output_height: Option, + pub fragmented: bool, + pub frag_duration_us: i64, +} + +impl Default for WindowsCameraMuxerConfig { + fn default() -> Self { + Self { + output_height: None, + fragmented: false, + frag_duration_us: 2_000_000, + } + } } impl Muxer for WindowsCameraMuxer { @@ -460,10 +480,17 @@ impl Muxer for WindowsCameraMuxer { let frame_rate = video_config.fps(); let bitrate_multiplier = 0.2; + let fragmented = config.fragmented; + let frag_duration_us = config.frag_duration_us; let (video_tx, video_rx) = sync_channel::>(30); let mut output = ffmpeg::format::output(&output_path)?; + + if fragmented { + cap_mediafoundation_ffmpeg::set_fragmented_mp4_options(&mut output, frag_duration_us)?; + } + let audio_encoder = audio_config .map(|config| AACEncoder::init(config, &mut output)) .transpose()?; @@ -521,12 +548,14 @@ impl Muxer for WindowsCameraMuxer { }; cap_mediafoundation_ffmpeg::H264StreamMuxer::new( - &mut *output_guard, + &mut 
output_guard, cap_mediafoundation_ffmpeg::MuxerConfig { width: output_width, height: output_height, fps: frame_rate, bitrate: encoder.bitrate(), + fragmented, + frag_duration_us, }, ) }; @@ -596,7 +625,7 @@ impl Muxer for WindowsCameraMuxer { return Ok(None); }; frame_count += 1; - if frame_count % 30 == 0 { + if frame_count.is_multiple_of(30) { debug!( "Windows camera encoder: processed {} frames", frame_count @@ -610,7 +639,7 @@ impl Muxer for WindowsCameraMuxer { |output_sample| { let mut output = output.lock().unwrap(); let _ = muxer - .write_sample(&output_sample, &mut *output) + .write_sample(&output_sample, &mut output) .map_err(|e| format!("WriteSample: {e}")); Ok(()) }, diff --git a/crates/recording/src/recovery.rs b/crates/recording/src/recovery.rs new file mode 100644 index 0000000000..dac11d84a3 --- /dev/null +++ b/crates/recording/src/recovery.rs @@ -0,0 +1,741 @@ +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; + +use cap_enc_ffmpeg::remux::{ + concatenate_audio_to_ogg, concatenate_video_fragments, get_media_duration, get_video_fps, + probe_media_valid, probe_video_can_decode, +}; +use cap_project::{ + AudioMeta, Cursors, MultipleSegment, MultipleSegments, ProjectConfiguration, RecordingMeta, + RecordingMetaInner, StudioRecordingMeta, StudioRecordingStatus, TimelineConfiguration, + TimelineSegment, VideoMeta, +}; +use relative_path::RelativePathBuf; +use tracing::{debug, info, warn}; + +#[derive(Debug, Clone)] +pub struct IncompleteRecording { + pub project_path: PathBuf, + pub meta: RecordingMeta, + pub recoverable_segments: Vec, + pub estimated_duration: Duration, +} + +#[derive(Debug, Clone)] +pub struct RecoverableSegment { + pub index: u32, + pub display_fragments: Vec, + pub camera_fragments: Option>, + pub mic_fragments: Option>, + pub system_audio_fragments: Option>, + pub cursor_path: Option, +} + +#[derive(Debug, Clone)] +pub struct RecoveredRecording { + pub project_path: PathBuf, + pub meta: StudioRecordingMeta, +} + 
+#[derive(Debug, thiserror::Error)] +pub enum RecoveryError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Failed to concatenate video fragments: {0}")] + VideoConcat(cap_enc_ffmpeg::remux::RemuxError), + #[error("Failed to concatenate audio fragments: {0}")] + AudioConcat(cap_enc_ffmpeg::remux::RemuxError), + #[error("Failed to serialize meta: {0}")] + Serialize(#[from] serde_json::Error), + #[error("No recoverable segments found")] + NoRecoverableSegments, + #[error("Meta save failed")] + MetaSave, + #[error("Recovered video is not playable: {0}")] + UnplayableVideo(String), +} + +pub struct RecoveryManager; + +impl RecoveryManager { + pub fn find_incomplete(recordings_dir: &Path) -> Vec { + let mut incomplete = Vec::new(); + + let Ok(entries) = std::fs::read_dir(recordings_dir) else { + return incomplete; + }; + + for entry in entries.flatten() { + let path = entry.path(); + if !path.is_dir() { + continue; + } + + if !path.join("recording-meta.json").exists() { + continue; + } + + let Ok(meta) = RecordingMeta::load_for_project(&path) else { + continue; + }; + + if let Some(studio_meta) = meta.studio_meta() + && matches!( + studio_meta.status(), + StudioRecordingStatus::InProgress + | StudioRecordingStatus::NeedsRemux + | StudioRecordingStatus::Failed { .. } + ) + && let Some(incomplete_recording) = Self::analyze_incomplete(&path, &meta) + { + incomplete.push(incomplete_recording); + } + } + + incomplete + } + + fn analyze_incomplete( + project_path: &Path, + meta: &RecordingMeta, + ) -> Option { + let content_dir = project_path.join("content"); + let segments_dir = content_dir.join("segments"); + + if !segments_dir.exists() { + debug!("No segments directory found at {:?}", segments_dir); + return None; + } + + let mut recoverable_segments = Vec::new(); + let mut total_duration = Duration::ZERO; + + let mut segment_dirs: Vec<_> = std::fs::read_dir(&segments_dir) + .ok()? 
+ .filter_map(|e| e.ok()) + .filter(|e| e.path().is_dir()) + .collect(); + + segment_dirs.sort_by_key(|e| e.file_name()); + + for (index, segment_entry) in segment_dirs.iter().enumerate() { + let segment_path = segment_entry.path(); + + let display_dir = segment_path.join("display"); + let mut display_fragments = Self::find_complete_fragments(&display_dir); + + if display_fragments.is_empty() + && let Some(display_mp4) = + Self::probe_single_file(&segment_path.join("display.mp4")) + { + display_fragments = vec![display_mp4]; + } + + if display_fragments.is_empty() { + debug!( + "No display fragments found for segment {} at {:?}", + index, segment_path + ); + continue; + } + + let camera_dir = segment_path.join("camera"); + let camera_fragments = { + let frags = Self::find_complete_fragments(&camera_dir); + if frags.is_empty() { + Self::probe_single_file(&segment_path.join("camera.mp4")).map(|p| vec![p]) + } else { + Some(frags) + } + }; + + let mic_fragments = Self::find_audio_fragments(&segment_path.join("audio-input")); + let system_audio_fragments = + Self::find_audio_fragments(&segment_path.join("system_audio")); + + if let Some(duration) = Self::estimate_fragments_duration(&display_fragments) { + total_duration += duration; + } + + let cursor_path = Self::probe_cursor(&segment_path.join("cursor.json")); + + recoverable_segments.push(RecoverableSegment { + index: index as u32, + display_fragments, + camera_fragments, + mic_fragments, + system_audio_fragments, + cursor_path, + }); + } + + if recoverable_segments.is_empty() { + info!("No recoverable segments found in {:?}", project_path); + return None; + } + + info!( + "Found {} recoverable segments in {:?} with estimated duration {:?}", + recoverable_segments.len(), + project_path, + total_duration + ); + + Some(IncompleteRecording { + project_path: project_path.to_path_buf(), + meta: meta.clone(), + recoverable_segments, + estimated_duration: total_duration, + }) + } + + fn find_complete_fragments(dir: &Path) 
-> Vec { + let manifest_path = dir.join("manifest.json"); + + if manifest_path.exists() + && let Ok(content) = std::fs::read_to_string(&manifest_path) + && let Ok(manifest) = serde_json::from_str::(&content) + && let Some(fragments) = manifest.get("fragments").and_then(|f| f.as_array()) + { + let result: Vec = fragments + .iter() + .filter(|f| { + f.get("is_complete") + .and_then(|c| c.as_bool()) + .unwrap_or(false) + }) + .filter_map(|f| f.get("path").and_then(|p| p.as_str())) + .map(|p| dir.join(p)) + .filter(|p| p.exists()) + .collect(); + + if !result.is_empty() { + return result; + } + } + + Self::probe_fragments_in_dir(dir) + } + + fn probe_fragments_in_dir(dir: &Path) -> Vec { + let Ok(entries) = std::fs::read_dir(dir) else { + return Vec::new(); + }; + + let mut fragments: Vec<_> = entries + .filter_map(|e| e.ok()) + .map(|e| e.path()) + .filter(|p| { + p.extension() + .map(|e| e == "mp4" || e == "m4a" || e == "ogg") + .unwrap_or(false) + }) + .filter(|p| probe_media_valid(p)) + .collect(); + + fragments.sort(); + fragments + } + + fn probe_single_file(path: &Path) -> Option { + if path.exists() && probe_media_valid(path) { + Some(path.to_path_buf()) + } else { + None + } + } + + fn find_audio_fragments(base_path: &Path) -> Option> { + let dir_fragments = Self::find_complete_fragments(base_path); + if !dir_fragments.is_empty() { + return Some(dir_fragments); + } + + let ogg_path = base_path.with_extension("ogg"); + if let Some(p) = Self::probe_single_file(&ogg_path) { + return Some(vec![p]); + } + + let m4a_path = base_path.with_extension("m4a"); + if let Some(p) = Self::probe_single_file(&m4a_path) { + return Some(vec![p]); + } + + let mp3_path = base_path.with_extension("mp3"); + Self::probe_single_file(&mp3_path).map(|p| vec![p]) + } + + fn probe_cursor(path: &Path) -> Option { + if path.exists() { + Some(path.to_path_buf()) + } else { + None + } + } + + fn estimate_fragments_duration(fragments: &[PathBuf]) -> Option { + let mut total = Duration::ZERO; + 
+ for fragment in fragments { + if let Some(duration) = get_media_duration(fragment) { + total += duration; + } + } + + if total.is_zero() { None } else { Some(total) } + } + + pub fn recover(recording: &IncompleteRecording) -> Result { + if recording.recoverable_segments.is_empty() { + return Err(RecoveryError::NoRecoverableSegments); + } + + for segment in &recording.recoverable_segments { + let segment_dir = recording + .project_path + .join("content/segments") + .join(format!("segment-{}", segment.index)); + + let display_output = segment_dir.join("display.mp4"); + if segment.display_fragments.len() == 1 { + let source = &segment.display_fragments[0]; + if source != &display_output { + info!("Moving single display fragment to {:?}", display_output); + std::fs::rename(source, &display_output)?; + let display_dir = segment_dir.join("display"); + if display_dir.exists() { + let _ = std::fs::remove_dir_all(&display_dir); + } + } + } else if segment.display_fragments.len() > 1 { + info!( + "Concatenating {} display fragments to {:?}", + segment.display_fragments.len(), + display_output + ); + concatenate_video_fragments(&segment.display_fragments, &display_output) + .map_err(RecoveryError::VideoConcat)?; + + for fragment in &segment.display_fragments { + let _ = std::fs::remove_file(fragment); + } + let display_dir = segment_dir.join("display"); + if display_dir.exists() { + let _ = std::fs::remove_dir_all(&display_dir); + } + } + + if let Some(camera_frags) = &segment.camera_fragments { + let camera_output = segment_dir.join("camera.mp4"); + if camera_frags.len() == 1 { + let source = &camera_frags[0]; + if source != &camera_output { + info!("Moving single camera fragment to {:?}", camera_output); + std::fs::rename(source, &camera_output)?; + let camera_dir = segment_dir.join("camera"); + if camera_dir.exists() { + let _ = std::fs::remove_dir_all(&camera_dir); + } + } + } else if camera_frags.len() > 1 { + info!( + "Concatenating {} camera fragments to {:?}", + 
camera_frags.len(), + camera_output + ); + concatenate_video_fragments(camera_frags, &camera_output) + .map_err(RecoveryError::VideoConcat)?; + + for fragment in camera_frags { + let _ = std::fs::remove_file(fragment); + } + let camera_dir = segment_dir.join("camera"); + if camera_dir.exists() { + let _ = std::fs::remove_dir_all(&camera_dir); + } + } + } + + if let Some(mic_frags) = &segment.mic_fragments { + let mic_output = segment_dir.join("audio-input.ogg"); + if mic_frags.len() == 1 { + let source = &mic_frags[0]; + let is_ogg = source.extension().map(|e| e == "ogg").unwrap_or(false); + if source != &mic_output { + if is_ogg { + info!("Moving single mic fragment to {:?}", mic_output); + std::fs::rename(source, &mic_output)?; + } else { + info!("Transcoding single mic fragment to {:?}", mic_output); + concatenate_audio_to_ogg(mic_frags, &mic_output) + .map_err(RecoveryError::AudioConcat)?; + let _ = std::fs::remove_file(source); + } + let mic_dir = segment_dir.join("audio-input"); + if mic_dir.exists() { + let _ = std::fs::remove_dir_all(&mic_dir); + } + } + } else if mic_frags.len() > 1 { + info!( + "Concatenating {} mic fragments to {:?}", + mic_frags.len(), + mic_output + ); + concatenate_audio_to_ogg(mic_frags, &mic_output) + .map_err(RecoveryError::AudioConcat)?; + + for fragment in mic_frags { + let _ = std::fs::remove_file(fragment); + } + let mic_dir = segment_dir.join("audio-input"); + if mic_dir.exists() { + let _ = std::fs::remove_dir_all(&mic_dir); + } + } + } + + if let Some(system_frags) = &segment.system_audio_fragments { + let system_output = segment_dir.join("system_audio.ogg"); + if system_frags.len() == 1 { + let source = &system_frags[0]; + let is_ogg = source.extension().map(|e| e == "ogg").unwrap_or(false); + if source != &system_output { + if is_ogg { + info!("Moving single system audio fragment to {:?}", system_output); + std::fs::rename(source, &system_output)?; + } else { + info!( + "Transcoding single system audio fragment to {:?}", + 
system_output + ); + concatenate_audio_to_ogg(system_frags, &system_output) + .map_err(RecoveryError::AudioConcat)?; + let _ = std::fs::remove_file(source); + } + let system_dir = segment_dir.join("system_audio"); + if system_dir.exists() { + let _ = std::fs::remove_dir_all(&system_dir); + } + } + } else if system_frags.len() > 1 { + info!( + "Concatenating {} system audio fragments to {:?}", + system_frags.len(), + system_output + ); + concatenate_audio_to_ogg(system_frags, &system_output) + .map_err(RecoveryError::AudioConcat)?; + + for fragment in system_frags { + let _ = std::fs::remove_file(fragment); + } + let system_dir = segment_dir.join("system_audio"); + if system_dir.exists() { + let _ = std::fs::remove_dir_all(&system_dir); + } + } + } + } + + for segment in &recording.recoverable_segments { + let segment_dir = recording + .project_path + .join("content/segments") + .join(format!("segment-{}", segment.index)); + + let display_output = segment_dir.join("display.mp4"); + if display_output.exists() { + info!("Validating recovered display video: {:?}", display_output); + match probe_video_can_decode(&display_output) { + Ok(true) => { + info!("Display video validation passed"); + } + Ok(false) => { + return Err(RecoveryError::UnplayableVideo(format!( + "Display video has no decodable frames: {:?}", + display_output + ))); + } + Err(e) => { + return Err(RecoveryError::UnplayableVideo(format!( + "Display video validation failed for {:?}: {}", + display_output, e + ))); + } + } + } + + let camera_output = segment_dir.join("camera.mp4"); + if camera_output.exists() { + info!("Validating recovered camera video: {:?}", camera_output); + match probe_video_can_decode(&camera_output) { + Ok(true) => { + info!("Camera video validation passed"); + } + Ok(false) => { + warn!( + "Camera video has no decodable frames, removing: {:?}", + camera_output + ); + let _ = std::fs::remove_file(&camera_output); + } + Err(e) => { + warn!( + "Camera video validation failed for {:?}: 
{}, removing", + camera_output, e + ); + let _ = std::fs::remove_file(&camera_output); + } + } + } + } + + let meta = Self::build_recovered_meta(recording)?; + + let mut recording_meta = recording.meta.clone(); + recording_meta.inner = RecordingMetaInner::Studio(meta.clone()); + recording_meta + .save_for_project() + .map_err(|_| RecoveryError::MetaSave)?; + + Self::create_project_config(recording, &meta)?; + + info!( + "Successfully recovered recording at {:?}", + recording.project_path + ); + + Ok(RecoveredRecording { + project_path: recording.project_path.clone(), + meta, + }) + } + + fn build_recovered_meta( + recording: &IncompleteRecording, + ) -> Result { + let segments: Vec = recording + .recoverable_segments + .iter() + .map(|seg| { + let segment_base = format!("content/segments/segment-{}", seg.index); + let segment_dir = recording.project_path.join(&segment_base); + + let display_path = segment_dir.join("display.mp4"); + let fps = get_video_fps(&display_path).unwrap_or(30); + + let camera_path = segment_dir.join("camera.mp4"); + let mic_path = segment_dir.join("audio-input.ogg"); + let system_audio_path = segment_dir.join("system_audio.ogg"); + let cursor_path = segment_dir.join("cursor.json"); + + MultipleSegment { + display: VideoMeta { + path: RelativePathBuf::from(format!("{}/display.mp4", segment_base)), + fps, + start_time: None, + }, + camera: if camera_path.exists() { + Some(VideoMeta { + path: RelativePathBuf::from(format!("{}/camera.mp4", segment_base)), + fps: 30, + start_time: None, + }) + } else { + None + }, + mic: if mic_path.exists() { + Some(AudioMeta { + path: RelativePathBuf::from(format!( + "{}/audio-input.ogg", + segment_base + )), + start_time: None, + }) + } else { + None + }, + system_audio: if system_audio_path.exists() { + Some(AudioMeta { + path: RelativePathBuf::from(format!( + "{}/system_audio.ogg", + segment_base + )), + start_time: None, + }) + } else { + None + }, + cursor: if cursor_path.exists() { + 
Some(RelativePathBuf::from(format!( + "{}/cursor.json", + segment_base + ))) + } else { + None + }, + } + }) + .collect(); + + let existing_cursors = Self::load_existing_cursors(&recording.project_path); + + Ok(StudioRecordingMeta::MultipleSegments { + inner: MultipleSegments { + segments, + cursors: existing_cursors, + status: Some(StudioRecordingStatus::Complete), + }, + }) + } + + fn create_project_config( + recording: &IncompleteRecording, + meta: &StudioRecordingMeta, + ) -> Result<(), RecoveryError> { + let StudioRecordingMeta::MultipleSegments { inner, .. } = meta else { + return Ok(()); + }; + + let timeline_segments: Vec = inner + .segments + .iter() + .enumerate() + .filter_map(|(i, segment)| { + let segment_base = format!("content/segments/segment-{}", i); + let display_path = recording + .project_path + .join(&segment_base) + .join("display.mp4"); + + let duration = get_media_duration(&display_path) + .map(|d| d.as_secs_f64()) + .unwrap_or_else(|| { + let fps = segment.display.fps as f64; + if fps > 0.0 { + recording.estimated_duration.as_secs_f64() + / recording.recoverable_segments.len() as f64 + } else { + 5.0 + } + }); + + if duration <= 0.0 { + return None; + } + + Some(TimelineSegment { + recording_clip: i as u32, + start: 0.0, + end: duration, + timescale: 1.0, + }) + }) + .collect(); + + if timeline_segments.is_empty() { + warn!("No valid timeline segments could be created"); + return Ok(()); + } + + let mut config = ProjectConfiguration::load(&recording.project_path).unwrap_or_default(); + + config.timeline = Some(TimelineConfiguration { + segments: timeline_segments, + zoom_segments: Vec::new(), + scene_segments: Vec::new(), + mask_segments: Vec::new(), + text_segments: Vec::new(), + }); + + config + .write(&recording.project_path) + .map_err(RecoveryError::Io)?; + + info!("Created project configuration with timeline for recovered recording"); + + Ok(()) + } + + fn load_existing_cursors(project_path: &Path) -> Cursors { + let cursors_dir = 
project_path.join("content/cursors"); + if !cursors_dir.exists() { + return Cursors::default(); + } + + if let Ok(meta) = RecordingMeta::load_for_project(project_path) + && let Some(StudioRecordingMeta::MultipleSegments { inner, .. }) = meta.studio_meta() + && !inner.cursors.is_empty() + { + return inner.cursors.clone(); + } + + Self::scan_cursor_images(&cursors_dir) + } + + fn scan_cursor_images(cursors_dir: &Path) -> Cursors { + let Ok(entries) = std::fs::read_dir(cursors_dir) else { + return Cursors::default(); + }; + + let mut cursors = std::collections::HashMap::new(); + + for entry in entries.flatten() { + let path = entry.path(); + if path.extension().map(|e| e == "png").unwrap_or(false) + && let Some(file_name) = path.file_stem().and_then(|s| s.to_str()) + && let Some(id_str) = file_name.strip_prefix("cursor_") + && let Some(full_file_name) = path.file_name().and_then(|n| n.to_str()) + { + let relative_path = RelativePathBuf::from("content/cursors").join(full_file_name); + + cursors.insert( + id_str.to_string(), + cap_project::CursorMeta { + image_path: relative_path, + hotspot: cap_project::XY::new(0.0, 0.0), + shape: None, + }, + ); + + info!( + "Recovered cursor {} from image file: {:?}", + id_str, + path.file_name() + ); + } + } + + if cursors.is_empty() { + Cursors::default() + } else { + Cursors::Correct(cursors) + } + } + + pub fn discard(recording: &IncompleteRecording) -> std::io::Result<()> { + warn!( + "Discarding incomplete recording at {:?}", + recording.project_path + ); + std::fs::remove_dir_all(&recording.project_path) + } + + pub fn mark_needs_remux(project_path: &Path) -> Result<(), RecoveryError> { + let mut meta = + RecordingMeta::load_for_project(project_path).map_err(|_| RecoveryError::MetaSave)?; + + if let RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { inner, .. 
}) = + &mut meta.inner + { + inner.status = Some(StudioRecordingStatus::NeedsRemux); + meta.save_for_project() + .map_err(|_| RecoveryError::MetaSave)?; + } + + Ok(()) + } +} diff --git a/crates/recording/src/screenshot.rs b/crates/recording/src/screenshot.rs index 21754e4aeb..f711f5eaaa 100644 --- a/crates/recording/src/screenshot.rs +++ b/crates/recording/src/screenshot.rs @@ -369,7 +369,7 @@ fn capture_bitmap_with( let mut data = std::ptr::null_mut(); let bitmap = - unsafe { CreateDIBSection(Some(mem_dc), &mut info, DIB_RGB_COLORS, &mut data, None, 0) }; + unsafe { CreateDIBSection(Some(mem_dc), &info, DIB_RGB_COLORS, &mut data, None, 0) }; let bitmap = match bitmap { Ok(b) if !b.0.is_null() && !data.is_null() => b, _ => { diff --git a/crates/recording/src/sources/screen_capture/mod.rs b/crates/recording/src/sources/screen_capture/mod.rs index 0353d5bb57..536507b7e4 100644 --- a/crates/recording/src/sources/screen_capture/mod.rs +++ b/crates/recording/src/sources/screen_capture/mod.rs @@ -305,8 +305,8 @@ impl ScreenCaptureConfig { let target_refresh = validated_refresh_rate(display.refresh_rate()); let fps = std::cmp::max(1, std::cmp::min(max_fps, target_refresh)); - let output_size = crop_bounds - .and_then(|b| { + let output_size: PhysicalSize = crop_bounds + .and_then(|b| -> Option { #[cfg(target_os = "macos")] { let logical_size = b.size(); @@ -317,8 +317,10 @@ impl ScreenCaptureConfig { )) } - #[cfg(windows)] - Some(b.size().map(|v| (v / 2.0).floor() * 2.0)) + #[cfg(target_os = "windows")] + { + Some(b.size().map(|v| (v / 2.0).floor() * 2.0)) + } }) .or_else(|| display.physical_size()) .ok_or(ScreenCaptureInitError::NoBounds)?; diff --git a/crates/recording/src/sources/screen_capture/windows.rs b/crates/recording/src/sources/screen_capture/windows.rs index bb9b3513ff..a62d59e3ce 100644 --- a/crates/recording/src/sources/screen_capture/windows.rs +++ b/crates/recording/src/sources/screen_capture/windows.rs @@ -25,7 +25,6 @@ use tracing::*; // const 
WINDOW_DURATION: Duration = Duration::from_secs(3); // const LOG_INTERVAL: Duration = Duration::from_secs(5); -const MAX_DROP_RATE_THRESHOLD: f64 = 0.25; #[derive(Debug)] pub struct Direct3DCapture; diff --git a/crates/recording/src/studio_recording.rs b/crates/recording/src/studio_recording.rs index 19e9bcbef4..592679b8b4 100644 --- a/crates/recording/src/studio_recording.rs +++ b/crates/recording/src/studio_recording.rs @@ -5,20 +5,26 @@ use crate::{ }, cursor::{CursorActor, Cursors, spawn_cursor_recorder}, feeds::{camera::CameraFeedLock, microphone::MicrophoneFeedLock}, - ffmpeg::OggMuxer, + ffmpeg::{OggMuxer, SegmentedAudioMuxer, SegmentedAudioMuxerConfig}, output_pipeline::{DoneFut, FinishedOutputPipeline, OutputPipeline, PipelineDoneError}, screen_capture::ScreenCaptureConfig, sources::{self, screen_capture}, }; #[cfg(target_os = "macos")] -use crate::output_pipeline::{AVFoundationCameraMuxer, AVFoundationCameraMuxerConfig}; +use crate::output_pipeline::{ + AVFoundationCameraMuxer, AVFoundationCameraMuxerConfig, FragmentedAVFoundationCameraMuxer, + FragmentedAVFoundationCameraMuxerConfig, +}; #[cfg(windows)] use crate::output_pipeline::{WindowsCameraMuxer, WindowsCameraMuxerConfig}; use anyhow::{Context as _, anyhow, bail}; use cap_media_info::VideoInfo; -use cap_project::{CursorEvents, StudioRecordingMeta}; +use cap_project::{ + CursorEvents, MultipleSegments, Platform, RecordingMeta, RecordingMetaInner, + StudioRecordingMeta, StudioRecordingStatus, +}; use cap_timestamp::{Timestamp, Timestamps}; use futures::{FutureExt, StreamExt, future::OptionFuture, stream::FuturesUnordered}; use kameo::{Actor as _, prelude::*}; @@ -139,6 +145,7 @@ impl Message for Actor { self.recording_dir.clone(), std::mem::take(&mut self.segments), cursors, + self.segment_factory.fragmented, ) .await?; @@ -442,6 +449,7 @@ pub struct ActorBuilder { mic_feed: Option>, camera_feed: Option>, custom_cursor: bool, + fragmented: bool, #[cfg(target_os = "macos")] excluded_windows: Vec, } @@ 
-455,6 +463,7 @@ impl ActorBuilder { mic_feed: None, camera_feed: None, custom_cursor: false, + fragmented: false, #[cfg(target_os = "macos")] excluded_windows: Vec::new(), } @@ -480,6 +489,11 @@ impl ActorBuilder { self } + pub fn with_fragmented(mut self, fragmented: bool) -> Self { + self.fragmented = fragmented; + self + } + #[cfg(target_os = "macos")] pub fn with_excluded_windows(mut self, excluded_windows: Vec) -> Self { self.excluded_windows = excluded_windows; @@ -503,6 +517,7 @@ impl ActorBuilder { excluded_windows: self.excluded_windows, }, self.custom_cursor, + self.fragmented, ) .await } @@ -513,6 +528,7 @@ async fn spawn_studio_recording_actor( recording_dir: PathBuf, base_inputs: RecordingBaseInputs, custom_cursor_capture: bool, + fragmented: bool, ) -> anyhow::Result { ensure_dir(&recording_dir)?; @@ -542,10 +558,15 @@ async fn spawn_studio_recording_actor( cursors_dir, base_inputs.clone(), custom_cursor_capture, + fragmented, start_time, completion_tx.clone(), ); + if fragmented { + write_in_progress_meta(&recording_dir)?; + } + let index = 0; let pipeline = segment_pipeline_factory .create_next(Default::default(), 0) @@ -590,6 +611,7 @@ async fn stop_recording( recording_dir: PathBuf, segments: Vec, cursors: Cursors, + fragmented: bool, ) -> Result { use cap_project::*; @@ -597,44 +619,61 @@ async fn stop_recording( RelativePathBuf::from_path(path.strip_prefix(&recording_dir).unwrap()).unwrap() }; + let segment_metas: Vec<_> = futures::stream::iter(segments) + .then(async |s| { + let to_start_time = |timestamp: Timestamp| { + timestamp + .duration_since(s.pipeline.start_time) + .as_secs_f64() + }; + + MultipleSegment { + display: VideoMeta { + path: make_relative(&s.pipeline.screen.path), + fps: s.pipeline.screen.video_info.unwrap().fps(), + start_time: Some(to_start_time(s.pipeline.screen.first_timestamp)), + }, + camera: s.pipeline.camera.map(|camera| VideoMeta { + path: make_relative(&camera.path), + fps: camera.video_info.unwrap().fps(), + 
start_time: Some(to_start_time(camera.first_timestamp)), + }), + mic: s.pipeline.microphone.map(|mic| AudioMeta { + path: make_relative(&mic.path), + start_time: Some(to_start_time(mic.first_timestamp)), + }), + system_audio: s.pipeline.system_audio.map(|audio| AudioMeta { + path: make_relative(&audio.path), + start_time: Some(to_start_time(audio.first_timestamp)), + }), + cursor: s + .pipeline + .cursor + .as_ref() + .map(|cursor| make_relative(&cursor.output_path)), + } + }) + .collect::>() + .await; + + let needs_remux = if fragmented { + segment_metas.iter().any(|seg| { + let display_path = seg.display.path.to_path(&recording_dir); + display_path.is_dir() + }) + } else { + false + }; + + let status = if needs_remux { + Some(StudioRecordingStatus::NeedsRemux) + } else { + Some(StudioRecordingStatus::Complete) + }; + let meta = StudioRecordingMeta::MultipleSegments { inner: MultipleSegments { - segments: futures::stream::iter(segments) - .then(async |s| { - let to_start_time = |timestamp: Timestamp| { - timestamp - .duration_since(s.pipeline.start_time) - .as_secs_f64() - }; - - MultipleSegment { - display: VideoMeta { - path: make_relative(&s.pipeline.screen.path), - fps: s.pipeline.screen.video_info.unwrap().fps(), - start_time: Some(to_start_time(s.pipeline.screen.first_timestamp)), - }, - camera: s.pipeline.camera.map(|camera| VideoMeta { - path: make_relative(&camera.path), - fps: camera.video_info.unwrap().fps(), - start_time: Some(to_start_time(camera.first_timestamp)), - }), - mic: s.pipeline.microphone.map(|mic| AudioMeta { - path: make_relative(&mic.path), - start_time: Some(to_start_time(mic.first_timestamp)), - }), - system_audio: s.pipeline.system_audio.map(|audio| AudioMeta { - path: make_relative(&audio.path), - start_time: Some(to_start_time(audio.first_timestamp)), - }), - cursor: s - .pipeline - .cursor - .as_ref() - .map(|cursor| make_relative(&cursor.output_path)), - } - }) - .collect::>() - .await, + segments: segment_metas, cursors: 
cap_project::Cursors::Correct( cursors .into_values() @@ -651,7 +690,7 @@ async fn stop_recording( }) .collect(), ), - status: Some(StudioRecordingStatus::Complete), + status, }, }; @@ -674,6 +713,7 @@ struct SegmentPipelineFactory { cursors_dir: PathBuf, base_inputs: RecordingBaseInputs, custom_cursor_capture: bool, + fragmented: bool, start_time: Timestamps, index: u32, completion_tx: watch::Sender>>, @@ -688,6 +728,7 @@ impl SegmentPipelineFactory { cursors_dir: PathBuf, base_inputs: RecordingBaseInputs, custom_cursor_capture: bool, + fragmented: bool, start_time: Timestamps, completion_tx: watch::Sender>>, ) -> Self { @@ -696,6 +737,7 @@ impl SegmentPipelineFactory { cursors_dir, base_inputs, custom_cursor_capture, + fragmented, start_time, index: 0, completion_tx, @@ -717,6 +759,7 @@ impl SegmentPipelineFactory { cursors, next_cursors_id, self.custom_cursor_capture, + self.fragmented, self.start_time, #[cfg(windows)] self.encoder_preferences.clone(), @@ -785,6 +828,7 @@ async fn create_segment_pipeline( prev_cursors: Cursors, next_cursors_id: u32, custom_cursor_capture: bool, + fragmented: bool, start_time: Timestamps, #[cfg(windows)] encoder_preferences: crate::capture_pipeline::EncoderPreferences, ) -> anyhow::Result { @@ -823,6 +867,7 @@ async fn create_segment_pipeline( capture_source, screen_output_path.clone(), start_time, + fragmented, #[cfg(windows)] encoder_preferences, ) @@ -831,16 +876,29 @@ async fn create_segment_pipeline( .context("screen pipeline setup")?; #[cfg(target_os = "macos")] - let camera = OptionFuture::from(base_inputs.camera_feed.map(|camera_feed| { - OutputPipeline::builder(dir.join("camera.mp4")) - .with_video::(camera_feed) - .with_timestamps(start_time) - .build::(AVFoundationCameraMuxerConfig::default()) - .instrument(error_span!("camera-out")) - })) - .await - .transpose() - .context("camera pipeline setup")?; + let camera = if let Some(camera_feed) = base_inputs.camera_feed { + let pipeline = if fragmented { + let fragments_dir 
= dir.join("camera"); + OutputPipeline::builder(fragments_dir) + .with_video::(camera_feed) + .with_timestamps(start_time) + .build::( + FragmentedAVFoundationCameraMuxerConfig::default(), + ) + .instrument(error_span!("camera-out")) + .await + } else { + OutputPipeline::builder(dir.join("camera.mp4")) + .with_video::(camera_feed) + .with_timestamps(start_time) + .build::(AVFoundationCameraMuxerConfig::default()) + .instrument(error_span!("camera-out")) + .await + }; + Some(pipeline.context("camera pipeline setup")?) + } else { + None + }; #[cfg(windows)] let camera = OptionFuture::from(base_inputs.camera_feed.map(|camera_feed| { @@ -854,27 +912,49 @@ async fn create_segment_pipeline( .transpose() .context("camera pipeline setup")?; - let microphone = OptionFuture::from(base_inputs.mic_feed.map(|mic_feed| { - OutputPipeline::builder(dir.join("audio-input.ogg")) - .with_audio_source::(mic_feed) - .with_timestamps(start_time) - .build::(()) - .instrument(error_span!("mic-out")) - })) - .await - .transpose() - .context("microphone pipeline setup")?; + let microphone = if let Some(mic_feed) = base_inputs.mic_feed { + let pipeline = if fragmented { + let fragments_dir = dir.join("audio-input"); + OutputPipeline::builder(fragments_dir) + .with_audio_source::(mic_feed) + .with_timestamps(start_time) + .build::(SegmentedAudioMuxerConfig::default()) + .instrument(error_span!("mic-out")) + .await + } else { + OutputPipeline::builder(dir.join("audio-input.ogg")) + .with_audio_source::(mic_feed) + .with_timestamps(start_time) + .build::(()) + .instrument(error_span!("mic-out")) + .await + }; + Some(pipeline.context("microphone pipeline setup")?) 
+ } else { + None + }; - let system_audio = OptionFuture::from(system_audio.map(|system_audio| { - OutputPipeline::builder(dir.join("system_audio.ogg")) - .with_audio_source::(system_audio) - .with_timestamps(start_time) - .build::(()) - .instrument(error_span!("system-audio-out")) - })) - .await - .transpose() - .context("system audio pipeline setup")?; + let system_audio = if let Some(system_audio_source) = system_audio { + let pipeline = if fragmented { + let fragments_dir = dir.join("system_audio"); + OutputPipeline::builder(fragments_dir) + .with_audio_source::(system_audio_source) + .with_timestamps(start_time) + .build::(SegmentedAudioMuxerConfig::default()) + .instrument(error_span!("system-audio-out")) + .await + } else { + OutputPipeline::builder(dir.join("system_audio.ogg")) + .with_audio_source::(system_audio_source) + .with_timestamps(start_time) + .build::(()) + .instrument(error_span!("system-audio-out")) + .await + }; + Some(pipeline.context("system audio pipeline setup")?) 
+ } else { + None + }; let cursor = custom_cursor_capture .then(move || { @@ -883,6 +963,13 @@ async fn create_segment_pipeline( .cursor_crop() .ok_or(CreateSegmentPipelineError::NoBounds)?; + let cursor_output_path = dir.join("cursor.json"); + let incremental_output = if fragmented { + Some(cursor_output_path.clone()) + } else { + None + }; + let cursor = spawn_cursor_recorder( cursor_crop_bounds, display, @@ -890,10 +977,11 @@ async fn create_segment_pipeline( prev_cursors, next_cursors_id, start_time, + incremental_output, ); Ok::<_, CreateSegmentPipelineError>(CursorPipeline { - output_path: dir.join("cursor.json"), + output_path: cursor_output_path, actor: cursor, }) }) @@ -922,3 +1010,27 @@ fn current_time_f64() -> f64 { .unwrap() .as_secs_f64() } + +fn write_in_progress_meta(recording_dir: &Path) -> anyhow::Result<()> { + use chrono::Local; + + let pretty_name = Local::now().format("Cap %Y-%m-%d at %H.%M.%S").to_string(); + + let meta = RecordingMeta { + platform: Some(Platform::default()), + project_path: recording_dir.to_path_buf(), + pretty_name, + sharing: None, + inner: RecordingMetaInner::Studio(StudioRecordingMeta::MultipleSegments { + inner: MultipleSegments { + segments: Vec::new(), + cursors: cap_project::Cursors::default(), + status: Some(StudioRecordingStatus::InProgress), + }, + }), + upload: None, + }; + + meta.save_for_project() + .map_err(|e| anyhow!("Failed to save in-progress meta: {:?}", e)) +} diff --git a/crates/rendering/src/decoder/avassetreader.rs b/crates/rendering/src/decoder/avassetreader.rs index f4ad42eb77..a36dd4e086 100644 --- a/crates/rendering/src/decoder/avassetreader.rs +++ b/crates/rendering/src/decoder/avassetreader.rs @@ -216,6 +216,9 @@ impl AVAssetReaderDecoder { } }; + let video_width = this.inner.width(); + let video_height = this.inner.height(); + let mut cache = BTreeMap::::new(); #[allow(unused)] @@ -374,15 +377,24 @@ impl AVAssetReaderDecoder { this.is_done = true; - // not inlining this is important so that 
last_sent_frame is dropped before the sender is invoked let last_sent_frame = last_sent_frame.borrow().clone(); - if let Some((sender, last_sent_frame)) = sender.take().zip(last_sent_frame) { - // info!( - // "sending hail mary frame {} for {requested_frame}", - // last_sent_frame.0 - // ); - - (sender)(last_sent_frame); + if let Some(sender) = sender.take() { + if let Some(last_sent_frame) = last_sent_frame { + (sender)(last_sent_frame); + } else { + tracing::debug!( + "No frames available for request {requested_frame}, sending black frame" + ); + let black_frame_data = + vec![0u8; (video_width * video_height * 4) as usize]; + let black_frame = ProcessedFrame { + number: requested_frame, + data: Arc::new(black_frame_data), + width: video_width, + height: video_height, + }; + (sender)(black_frame); + } } } } diff --git a/crates/rendering/src/decoder/ffmpeg.rs b/crates/rendering/src/decoder/ffmpeg.rs index 5a25eca6fc..52cc65af5c 100644 --- a/crates/rendering/src/decoder/ffmpeg.rs +++ b/crates/rendering/src/decoder/ffmpeg.rs @@ -81,6 +81,8 @@ impl FfmpegDecoder { let time_base = this.decoder().time_base(); let start_time = this.start_time(); + let video_width = this.decoder().width(); + let video_height = this.decoder().height(); let mut cache = BTreeMap::::new(); // active frame is a frame that triggered decode. 
@@ -240,16 +242,24 @@ impl FfmpegDecoder { } } - // not inlining this is important so that last_sent_frame is dropped before the sender is invoked let last_sent_frame = last_sent_frame.borrow().clone(); - if let Some((sender, last_sent_frame)) = sender.take().zip(last_sent_frame) - { - // info!( - // "sending hail mary frame {} for {requested_frame}", - // last_sent_frame.0 - // ); - - (sender)(last_sent_frame); + if let Some(sender) = sender.take() { + if let Some(last_sent_frame) = last_sent_frame { + (sender)(last_sent_frame); + } else { + debug!( + "No frames available for request {requested_frame}, sending black frame" + ); + let black_frame_data = + vec![0u8; (video_width * video_height * 4) as usize]; + let black_frame = ProcessedFrame { + number: requested_frame, + data: Arc::new(black_frame_data), + width: video_width, + height: video_height, + }; + (sender)(black_frame); + } } } } diff --git a/crates/rendering/src/decoder/mod.rs b/crates/rendering/src/decoder/mod.rs index 47eb1358fb..61f48e5968 100644 --- a/crates/rendering/src/decoder/mod.rs +++ b/crates/rendering/src/decoder/mod.rs @@ -80,6 +80,8 @@ pub async fn spawn_decoder( let handle = AsyncVideoDecoderHandle { sender: tx, offset }; + let path_display = path.display().to_string(); + if cfg!(target_os = "macos") { #[cfg(target_os = "macos")] avassetreader::AVAssetReaderDecoder::spawn(name, path, fps, rx, ready_tx); @@ -88,5 +90,12 @@ pub async fn spawn_decoder( .map_err(|e| format!("'{name}' decoder / {e}"))?; } - ready_rx.await.map_err(|e| e.to_string())?.map(|()| handle) + match tokio::time::timeout(std::time::Duration::from_secs(30), ready_rx).await { + Ok(result) => result + .map_err(|e| format!("'{name}' decoder channel closed: {e}"))? 
+ .map(|()| handle), + Err(_) => Err(format!( + "'{name}' decoder timed out after 30s initializing: {path_display}" + )), + } } diff --git a/crates/rendering/src/lib.rs b/crates/rendering/src/lib.rs index fa31c5c4b5..f95158a90b 100644 --- a/crates/rendering/src/lib.rs +++ b/crates/rendering/src/lib.rs @@ -37,7 +37,7 @@ mod zoom; pub use coord::*; pub use decoder::DecodedFrame; pub use frame_pipeline::RenderedFrame; -pub use project_recordings::{ProjectRecordingsMeta, SegmentRecordings}; +pub use project_recordings::{ProjectRecordingsMeta, SegmentRecordings, Video}; use mask::interpolate_masks; use scene::*; @@ -211,6 +211,8 @@ impl RecordingSegmentDecoders { pub enum RenderingError { #[error("No GPU adapter found")] NoAdapter, + #[error("No segments available in recording")] + NoSegments, #[error(transparent)] RequestDeviceFailed(#[from] wgpu::RequestDeviceError), #[error("Failed to wait for buffer mapping")] @@ -366,9 +368,11 @@ impl RenderVideoConstants { recording_meta: RecordingMeta, meta: StudioRecordingMeta, ) -> Result { + let first_segment = segments.first().ok_or(RenderingError::NoSegments)?; + let options = RenderOptions { - screen_size: XY::new(segments[0].display.width, segments[0].display.height), - camera_size: segments[0] + screen_size: XY::new(first_segment.display.width, first_segment.display.height), + camera_size: first_segment .camera .as_ref() .map(|c| XY::new(c.width, c.height)), diff --git a/crates/video-decode/src/avassetreader.rs b/crates/video-decode/src/avassetreader.rs index dc2e53a6c4..8d2c72cd9f 100644 --- a/crates/video-decode/src/avassetreader.rs +++ b/crates/video-decode/src/avassetreader.rs @@ -137,6 +137,14 @@ impl AVAssetReaderDecoder { track_output: &mut self.track_output, } } + + pub fn width(&self) -> u32 { + self.width + } + + pub fn height(&self) -> u32 { + self.height + } } pub struct FramesIter<'a> {