diff --git a/Cargo.toml b/Cargo.toml index 332a798..65fbccf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,6 @@ pretty_env_logger = "0.4.0" reqwest = { version = "0.11", default-features = false } rppal = { git = "https://github.com/golemparts/rppal/", rev = "2e980caf76756c97bb0b18fb3ab08fb51ed1f90e" } smallvec = "1.4.2" -tokio = { version = "1", features = ["macros"] } +tokio = { version = "1", features = ["macros", "process"] } tokio-stream = "0.1.2" warp = "0.3" diff --git a/src/main.rs b/src/main.rs index e2cc652..2186c58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,13 @@ -use std::cell::{Cell, RefCell}; +use std::cell::Cell; use std::convert::Infallible; -use std::process; use std::sync::{Arc, Mutex}; use std::time::Duration; -use futures_util::{FutureExt, select, stream, StreamExt}; +use futures_util::{FutureExt, join, select, stream, StreamExt}; #[allow(unused_imports)] use log::{debug, error, info, trace}; use rppal::gpio::{Gpio, Trigger}; use smallvec::SmallVec; -use tokio::{ sync::mpsc, +use tokio::{ process, sync::mpsc, time::{interval, sleep} }; use tokio_stream::wrappers::{IntervalStream, ReceiverStream}; use warp::{Filter, sse}; @@ -37,17 +36,33 @@ async fn main() { events_clients.lock().unwrap().push(tx); let reqwest = &reqwest::Client::new(); - let audio_child = &RefCell::new(None); + let audio_busy = &Cell::new(false); let hue_busy = &Cell::new(false); - let pushes = ReceiverStream::new(rx).for_each_concurrent(2, |()| async move { - if !audio_busy(&audio_child) { - play_chime(&audio_child) - } else { debug!("doorbell still ringing, not playing new chime"); } + let pushes = ReceiverStream::new(rx).for_each_concurrent(3, |()| async move { + // Process up to three concurrent button presses. A second press + // may arrive when only one of either the chime or flash has + // completed and so dispatch the other. A third press then + // arriving at this time should not get stuck in the channel, it + // should be consumed without effect. + // + // A shorter way to think of this is that the number of concurrent + // presses should be one for every async block being joined here, + // plus one more to report that they're all busy. + join! { + async { + if !audio_busy.replace(true) { + play_chime().await; + audio_busy.set(false); + } else { debug!("doorbell still ringing, not playing new chime"); } + }, - if !hue_busy.replace(true) { - flash_porch(&reqwest).await; - hue_busy.set(false); - } else { debug!("hue already in use, not scheduling new flashing"); } + async { + if !hue_busy.replace(true) { + flash_porch(&reqwest).await; + hue_busy.set(false); + } else { debug!("hue already in use, not scheduling new flashing"); } + } + }; }); select! { @@ -106,25 +121,27 @@ async fn warp(clients: Arc; CHANNEL_VEC_SIZE]>> warp::serve(routes).run(([0, 0, 0, 0], 8060)).await; } -fn audio_busy(audio_child: &RefCell>) -> bool { - if let Some(ref mut audio_child) = *audio_child.borrow_mut() { - if let Ok(None) = audio_child.try_wait() { - return true; +async fn play_chime() { + trace!("Playing doorbell chime"); + let child = process::Command::new("mplayer") + .arg(OUTDOOR_CHIME_FILE) + .stdout(std::process::Stdio::null()) + .stderr(std::process::Stdio::null()) + .spawn(); + // Combine and log any errors from spawning the child (which we know + // synchronously), and from waiting for it (which we only know + // asynchronously). Ick! Maybe this will become easier when + // try-blocks are A Thing? Something like "try { child?.wait().await? } catch ..." + // + // Also note the Err from wait() does not mean the command returned + // an error exit status. The exit status is given to the Ok arm (and + // ignored). + match (|| async { child?.wait().await })().await { + Ok(_) => (), + Err(ref err) => { + error!("Error playing outdoor chime: {}", err); } } - false -} - -fn play_chime(audio_child: &RefCell>) { - trace!("Playing doorbell chime"); - match process::Command::new("mplayer") - .arg(OUTDOOR_CHIME_FILE) - .stdout(process::Stdio::null()) - .stderr(process::Stdio::null()) - .spawn() { - Ok(child) => { audio_child.replace(Some(child)); }, - Err(err) => error!("Error playing outdoor chime: {}", err) - }; } async fn hue_base(reqwest: &reqwest::Client, body: &'static str, delay_millis: u64) {