WARNING: THIS SITE IS A MIRROR OF GITHUB.COM / IT CANNOT LOGIN OR REGISTER ACCOUNTS / THE CONTENTS ARE PROVIDED AS-IS / THIS SITE ASSUMES NO RESPONSIBILITY FOR ANY DISPLAYED CONTENT OR LINKS / IF YOU FOUND SOMETHING MAY NOT GOOD FOR EVERYONE, CONTACT ADMIN AT ilovescratch@foxmail.com
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion screenpipe-app-tauri/src-tauri/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions screenpipe-audio/src/audio_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,52 @@ impl AudioManager {
pub async fn enabled_devices(&self) -> HashSet<String> {
self.options.read().await.enabled_devices.clone()
}

/// Check and maintain audio recording health by reconnecting disconnected devices
/// Returns true if all devices are healthy, false if issues persist after reconnection attempts
pub async fn maintain_recording_health(&self) -> bool {
if self.status().await != AudioManagerStatus::Running {
return false;
}

let enabled_devices = self.enabled_devices().await;
let current_devices = self.current_devices();

// Check if we have the expected number of recording handles
if current_devices.len() != enabled_devices.len() {
warn!("Audio health check: expected {} devices, found {} recording handles",
enabled_devices.len(), current_devices.len());
}

let mut all_healthy = true;

// Check each device and attempt reconnection if needed
for device in &current_devices {
// Check if recording handle is still alive
if let Some(handle_mutex) = self.recording_handles.get(device) {
let handle = handle_mutex.lock().await;
if handle.is_finished() {
warn!("Recording handle for device {} has finished, attempting reconnection", device);
drop(handle); // Release the lock before reconnection

// Attempt to reconnect the device
if let Err(e) = self.device_manager.check_and_reconnect_device(device).await {
error!("Failed to reconnect device {}: {}", device, e);
all_healthy = false;
}
}
} else {
// Try to restart this specific device
warn!("No recording handle found for device {}, attempting to restart", device);
if let Err(e) = self.start_device(device).await {
error!("Failed to restart device {}: {}", device, e);
all_healthy = false;
}
}
}

all_healthy
}
}

impl Drop for AudioManager {
Expand Down
35 changes: 17 additions & 18 deletions screenpipe-audio/src/core/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,31 +137,30 @@ impl AudioStream {

fn create_error_callback(
device_name: String,
is_running_weak: std::sync::Weak<AtomicBool>,
_is_running_weak: std::sync::Weak<AtomicBool>,
is_disconnected: Arc<AtomicBool>,
stream_control_tx: mpsc::Sender<StreamControl>,
) -> impl FnMut(StreamError) + Send + 'static {
move |err: StreamError| {
if err
.to_string()
.contains("The requested device is no longer available")
{
warn!(
"audio device {} disconnected. stopping recording.",
device_name
);
let error_msg = err.to_string();

// Log the error but don't immediately give up
warn!("Audio stream error for device {}: {}", device_name, error_msg);

// Only mark as disconnected for truly terminal errors
if error_msg.contains("The requested device is no longer available")
|| error_msg.contains("device is no longer valid")
|| error_msg.contains("device not found")
|| error_msg.contains("No such device") {
warn!("Device {} permanently unavailable, marking as disconnected", device_name);
is_disconnected.store(true, Ordering::Relaxed);
stream_control_tx
.send(StreamControl::Stop(oneshot::channel().0))
.unwrap();
is_disconnected.store(true, Ordering::Relaxed);
.ok(); // Don't panic if channel is closed
} else {
error!("an error occurred on the audio stream: {}", err);
if err.to_string().contains("device is no longer valid") {
warn!("audio device disconnected. stopping recording.");
if let Some(arc) = is_running_weak.upgrade() {
arc.store(false, Ordering::Relaxed);
}
}
// For temporary errors (audio glitches, driver hiccups), just log and continue
// The stream should recover automatically in most cases
warn!("Temporary audio error for device {}, continuing: {}", device_name, error_msg);
}
}
}
Expand Down
77 changes: 64 additions & 13 deletions screenpipe-audio/src/device/device_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tracing::info;
use tracing::{info, warn, error};

pub struct DeviceManager {
streams: Arc<DashMap<AudioDevice, Arc<AudioStream>>>,
Expand Down Expand Up @@ -36,21 +36,47 @@ impl DeviceManager {
return Err(anyhow!("Device {} already running.", device));
}

let is_running = Arc::new(AtomicBool::new(false));
let stream =
// Try to start with resilience for macOS audio system quirks
self.start_device_with_retry(device, 3).await
}

async fn start_device_with_retry(&self, device: &AudioDevice, max_attempts: u32) -> Result<()> {
let mut last_error = None;

for attempt in 1..=max_attempts {
let is_running = Arc::new(AtomicBool::new(false));

match AudioStream::from_device(Arc::new(device.clone()), is_running.clone()).await {
Ok(stream) => stream,
Ok(stream) => {
info!("Starting recording for device: {} (attempt {})", device, attempt);
self.streams.insert(device.clone(), Arc::new(stream));
self.states.insert(device.clone(), is_running);
return Ok(());
}
Err(e) => {
return Err(e);
last_error = Some(anyhow!("{}", e));

// Check if this is a retryable error
let error_msg = e.to_string().to_lowercase();
if error_msg.contains("device not found")
|| error_msg.contains("no such device")
|| error_msg.contains("device is no longer available") {
// Device genuinely doesn't exist, don't retry
return Err(e);
}

if attempt < max_attempts {
// For potentially temporary issues (audio system busy, driver loading, etc.)
let delay = std::time::Duration::from_millis(100 * attempt as u64);
warn!("Failed to start device {} (attempt {}): {}. Retrying in {:?}...",
device, attempt, e, delay);
tokio::time::sleep(delay).await;
}
}
};

info!("starting recording for device: {}", device);

self.streams.insert(device.clone(), Arc::new(stream));
self.states.insert(device.clone(), is_running);

Ok(())
}
}

Err(last_error.unwrap_or_else(|| anyhow!("Failed to start device after {} attempts", max_attempts)))
}

pub fn stream(&self, device: &AudioDevice) -> Option<Arc<AudioStream>> {
Expand Down Expand Up @@ -96,6 +122,31 @@ impl DeviceManager {
Ok(())
}

/// Check if device is disconnected and attempt reconnection
pub async fn check_and_reconnect_device(&self, device: &AudioDevice) -> Result<bool> {
if let Some(stream) = self.stream(device) {
if stream.is_disconnected() {
warn!("Device {} is disconnected, attempting reconnection", device);

// Stop the old stream
let _ = self.stop_device(device).await;

// Try to reconnect
match self.start_device_with_retry(device, 3).await {
Ok(_) => {
info!("Successfully reconnected device {}", device);
return Ok(true);
}
Err(e) => {
error!("Failed to reconnect device {}: {}", device, e);
return Err(e);
}
}
}
}
Ok(false) // Not disconnected or no stream found
}

pub fn is_running_mut(&self, device: &AudioDevice) -> Option<Arc<AtomicBool>> {
self.states.get(device).map(|s| s.value().clone())
}
Expand Down
43 changes: 43 additions & 0 deletions screenpipe-audio/src/transcription/kyutai/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use anyhow::Result;
use candle::Device;
use std::sync::Arc;
use tokio::sync::Mutex;

use crate::kyutai::model::Model; // This is the Kyutai Model you already have

pub struct KyutaiSttEngine {
model: Model,
}

impl KyutaiSttEngine {
pub fn new(cpu: bool, repo: &str) -> Result<Self> {
let device = if cpu {
Device::Cpu
} else if candle::utils::cuda_is_available() {
Device::new_cuda(0)?
} else {
Device::Cpu
};

let args = crate::kyutai::Args {
in_file: "".to_string(),
hf_repo: repo.to_string(),
cpu,
timestamps: false,
vad: false,
};

let model = Model::load_from_hf(&args, &device)?;
Ok(Self { model })
}

pub fn transcribe(&mut self, audio: &[f32], sample_rate: u32) -> Result<String> {
let pcm = if sample_rate != 24_000 {
kaudio::resample(audio, sample_rate as usize, 24_000)?
} else {
audio.to_vec()
};

self.model.transcribe(pcm)
}
}
23 changes: 23 additions & 0 deletions screenpipe-server/src/bin/screenpipe-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,29 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(Duration::from_secs(10)).await;
audio_manager_clone.start().await.unwrap();
});

// Continuous connection maintenance for macOS stability
// Instead of restarting everything, just reconnect individual failed devices
#[cfg(target_os = "macos")]
{
let audio_manager_monitor = audio_manager.clone();
tokio::spawn(async move {
// Wait for initial startup
tokio::time::sleep(Duration::from_secs(60)).await;

let mut maintenance_interval = tokio::time::interval(Duration::from_secs(30)); // Check every 30 seconds
maintenance_interval.tick().await; // Skip the first tick

loop {
maintenance_interval.tick().await;

// Maintain connection health by reconnecting failed devices
if !audio_manager_monitor.maintain_recording_health().await {
warn!("Some audio devices required reconnection");
}
}
});
}
}

// Start pipes
Expand Down
Loading