Skip to content

Commit

Permalink
Merge pull request #11 from cyrinux/feat/error-handling
Browse files Browse the repository at this point in the history
Exit app if one of the threads exits
  • Loading branch information
cyrinux authored Nov 6, 2023
2 parents 6cf192b + 1df1572 commit 775a046
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 37 deletions.
24 changes: 12 additions & 12 deletions src/libinput/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use input::event::keyboard::KeyboardEventTrait;
use input::{Libinput, LibinputInterface};
use itertools::Itertools;
use libc::{O_RDWR, O_WRONLY};
use log::{debug, error, trace};
use log::{debug, trace};
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::io;
Expand Down Expand Up @@ -79,20 +79,20 @@ impl Controller {

libinput_context.dispatch()?;

match is_paused.lock() {
Ok(is_paused) if is_running == *is_paused => {
is_running = !is_running;
let is_paused_now = is_paused.lock().map_err(|err| {
format!("Deadlock in libinput checking if we are paused: {err:?}")
})?;

// Toggle mute on pause/resume
tx.send(is_running)?;
if is_running == *is_paused_now {
is_running = !is_running;

// ignore final events that happened just before the resume signal
if is_running {
libinput_context.by_ref().for_each(drop);
}
// Toggle mute on pause/resume
tx.send(is_running)?;

// ignore final events that happened just before the resume signal
if is_running {
libinput_context.by_ref().for_each(drop);
}
Err(err) => error!("Deadlock in libinput checking if we are paused: {err:?}"),
_ => (),
}

for event in libinput_context.by_ref() {
Expand Down
64 changes: 42 additions & 22 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use std::fs::OpenOptions;
use std::path::PathBuf;
use std::process::Command;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use std::sync::mpsc::Sender;
use std::sync::{mpsc, Mutex};
use std::time::Duration;
use std::{
sync::{atomic::AtomicBool, Arc},
Expand Down Expand Up @@ -51,27 +52,30 @@ fn main() -> Result<(), Box<dyn Error>> {
// Initialize logging
setup_logging();

let (tx_exit, rx_exit) = mpsc::channel();

// Register UNIX signals for pause
let is_paused = Arc::new(Mutex::new(false));
register_signal(is_paused.clone())?;
register_signal(tx_exit.clone(), is_paused.clone())?;

let libinput_ctl = libinput::Controller::new()?;
let (pulseaudio_ctl, tx_libinput) = pulseaudio::Controller::new();

// Start set source thread
let is_paused_pulseaudio = is_paused.clone();
thread::spawn(move || {
pulseaudio_ctl
.run(is_paused_pulseaudio)
.expect("Error in pulseaudio thread");
let tx_exit_pulseaudio = tx_exit.clone();
run_in_thread(tx_exit.clone(), move || {
pulseaudio_ctl.run(tx_exit_pulseaudio, is_paused_pulseaudio)
});

// Init libinput
run_in_thread(tx_exit.clone(), move || {
libinput::Controller::new()?.run(tx_libinput, is_paused)
});

// Start the application
info!("Push2talk started");

// Init libinput
libinput_ctl.run(tx_libinput, is_paused)?;

rx_exit.recv()?;
Ok(())
}

Expand All @@ -97,28 +101,44 @@ fn setup_logging() {
);
}

fn register_signal(is_paused: Arc<Mutex<bool>>) -> Result<(), Box<dyn Error>> {
fn run_in_thread<F>(tx_exit: Sender<bool>, f: F)
where
F: FnOnce() -> Result<(), Box<dyn Error>> + Send + 'static,
{
thread::spawn(move || {
if let Err(err) = f() {
error!("Error in thread: {err:?}");
if let Err(err) = tx_exit.send(true) {
error!("Unable to send exit signal from thread: {err:?}");
}
}
});
}

fn register_signal(
tx_exit: Sender<bool>,
is_paused: Arc<Mutex<bool>>,
) -> Result<(), Box<dyn Error>> {
let sig_pause = Arc::new(AtomicBool::new(false));

flag::register(signal_hook::consts::SIGUSR1, Arc::clone(&sig_pause))
.map_err(|e| format!("Unable to register SIGUSR1 signal: {e}"))?;

thread::spawn(move || loop {
run_in_thread(tx_exit, move || loop {
if !sig_pause.swap(false, Ordering::Relaxed) {
thread::sleep(Duration::from_millis(250));
continue;
}

match is_paused.lock() {
Ok(mut lock) => {
*lock = !*lock;
info!(
"Received SIGUSR1 signal, {}",
if *lock { "pausing" } else { "resuming" }
);
}
Err(err) => error!("Deadlock in handling UNIX signal: {err:?}"),
}
let mut lock = is_paused
.lock()
.map_err(|err| format!("Deadlock in handling UNIX signal: {err:?}"))?;

*lock = !*lock;
info!(
"Received SIGUSR1 signal, {}",
if *lock { "pausing" } else { "resuming" }
);
});

Ok(())
Expand Down
13 changes: 10 additions & 3 deletions src/pulseaudio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ impl Controller {
)
}

pub fn run(&self, is_paused: Arc<Mutex<bool>>) -> Result<(), Box<dyn Error>> {
pub fn run(
&self,
tx_exit: Sender<bool>,
is_paused: Arc<Mutex<bool>>,
) -> Result<(), Box<dyn Error>> {
let mut mainloop = Mainloop::new().ok_or("Failed to create mainloop")?;

let mut context =
Expand Down Expand Up @@ -60,13 +64,16 @@ impl Controller {
context.set_subscribe_callback(Some(Box::new(move |facility, operation, _index| {
match (is_paused.lock(), facility, operation) {
(Err(err), _, _) => {
error!("Deadlock in pulseaudio checking if we are paused: {err:?}")
error!("Deadlock in pulseaudio checking if we are paused: {err:?}");
if let Err(err) = tx_exit.send(true) {
error!("Unable to send exit signal from pulseaudio callback: {err:?}");
}
}
(Ok(is_paused), _, _) if *is_paused => (),
(_, Some(Facility::Card), Some(Operation::Changed))
| (_, Some(Facility::Card), Some(Operation::Removed))
| (_, Some(Facility::Card), Some(Operation::New)) => {
trace!("Card change, new or removed device, muting");
trace!("Card changed, added or removed device => muting");
if let Err(err) = tx.send(true) {
error!("Can't mute devices, ignoring...: {err}");
};
Expand Down

0 comments on commit 775a046

Please sign in to comment.