diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index 96a7f2bb16..1e225041a3 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -52,7 +52,6 @@ pub async fn upload_multipart_presign_part( video_id: &str, upload_id: &str, part_number: u32, - md5_sum: &str, ) -> Result { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -68,7 +67,6 @@ pub async fn upload_multipart_presign_part( "videoId": video_id, "uploadId": upload_id, "partNumber": part_number, - "md5Sum": md5_sum })) }) .await diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 06e37f0822..4f4527615b 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -14,7 +14,7 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta}; use cap_utils::spawn_actor; use ffmpeg::ffi::AV_TIME_BASE; use flume::Receiver; -use futures::{Stream, StreamExt, TryStreamExt, stream}; +use futures::{Stream, StreamExt, TryStreamExt, future::join, stream}; use image::{ImageReader, codecs::jpeg::JpegEncoder}; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; @@ -629,17 +629,32 @@ fn multipart_uploader( try_stream! { let mut stream = pin!(stream); let mut prev_part_number = None; + let mut expected_part_number = 1u32; + + loop { + let (Some(item), presigned_url) = join( + stream.next(), + // We generate the presigned URL ahead of time for the part we expect to come next. + // If it's not the chunk that actually comes next we just throw it out. + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing. + api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) + ).await else { + break; + }; + let mut presigned_url = presigned_url?; + - while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len()); prev_part_number = Some(part_number); let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - let presigned_url = - api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum) - .await?; + // We prefetched for the wrong chunk. Let's try again. + if expected_part_number != part_number { + presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await? + } trace!("Uploading part {part_number}"); @@ -671,6 +686,8 @@ fn multipart_uploader( size, total_size }; + + expected_part_number = part_number + 1; } debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed()); diff --git a/apps/web/app/api/upload/[...route]/multipart.ts b/apps/web/app/api/upload/[...route]/multipart.ts index 45194169d8..5045ae40ec 100644 --- a/apps/web/app/api/upload/[...route]/multipart.ts +++ b/apps/web/app/api/upload/[...route]/multipart.ts @@ -141,18 +141,19 @@ app.post( .object({ uploadId: z.string(), partNumber: z.number(), - md5Sum: z.string(), }) .and( z.union([ z.object({ videoId: z.string() }), // deprecated z.object({ fileKey: z.string() }), + // deprecated + // z.object({ md5Sum: z.string() }), ]), ), ), async (c) => { - const { uploadId, partNumber, md5Sum, ...body } = c.req.valid("json"); + const { uploadId, partNumber, ...body } = c.req.valid("json"); const user = c.get("user"); const fileKey = parseVideoIdOrFileKey(user.id, { @@ -174,7 +175,6 @@ app.post( fileKey, uploadId, partNumber, - { ContentMD5: md5Sum }, ); return presignedUrl;