Use tokio-process for chime mplayer subprocess
This commit is contained in:
@@ -13,6 +13,6 @@ pretty_env_logger = "0.4.0"
|
|||||||
reqwest = { version = "0.11", default-features = false }
|
reqwest = { version = "0.11", default-features = false }
|
||||||
rppal = { git = "https://github.com/golemparts/rppal/", rev = "2e980caf76756c97bb0b18fb3ab08fb51ed1f90e" }
|
rppal = { git = "https://github.com/golemparts/rppal/", rev = "2e980caf76756c97bb0b18fb3ab08fb51ed1f90e" }
|
||||||
smallvec = "1.4.2"
|
smallvec = "1.4.2"
|
||||||
tokio = { version = "1", features = ["macros"] }
|
tokio = { version = "1", features = ["macros", "process"] }
|
||||||
tokio-stream = "0.1.2"
|
tokio-stream = "0.1.2"
|
||||||
warp = "0.3"
|
warp = "0.3"
|
||||||
|
|||||||
67
src/main.rs
67
src/main.rs
@@ -1,14 +1,13 @@
|
|||||||
use std::cell::{Cell, RefCell};
|
use std::cell::Cell;
|
||||||
use std::convert::Infallible;
|
use std::convert::Infallible;
|
||||||
use std::process;
|
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures_util::{FutureExt, select, stream, StreamExt};
|
use futures_util::{FutureExt, join, select, stream, StreamExt};
|
||||||
#[allow(unused_imports)] use log::{debug, error, info, trace};
|
#[allow(unused_imports)] use log::{debug, error, info, trace};
|
||||||
use rppal::gpio::{Gpio, Trigger};
|
use rppal::gpio::{Gpio, Trigger};
|
||||||
use smallvec::SmallVec;
|
use smallvec::SmallVec;
|
||||||
use tokio::{ sync::mpsc,
|
use tokio::{ process, sync::mpsc,
|
||||||
time::{interval, sleep} };
|
time::{interval, sleep} };
|
||||||
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
|
use tokio_stream::wrappers::{IntervalStream, ReceiverStream};
|
||||||
use warp::{Filter, sse};
|
use warp::{Filter, sse};
|
||||||
@@ -37,17 +36,33 @@ async fn main() {
|
|||||||
events_clients.lock().unwrap().push(tx);
|
events_clients.lock().unwrap().push(tx);
|
||||||
|
|
||||||
let reqwest = &reqwest::Client::new();
|
let reqwest = &reqwest::Client::new();
|
||||||
let audio_child = &RefCell::new(None);
|
let audio_busy = &Cell::new(false);
|
||||||
let hue_busy = &Cell::new(false);
|
let hue_busy = &Cell::new(false);
|
||||||
let pushes = ReceiverStream::new(rx).for_each_concurrent(2, |()| async move {
|
let pushes = ReceiverStream::new(rx).for_each_concurrent(3, |()| async move {
|
||||||
if !audio_busy(&audio_child) {
|
// Process up to three concurrent button presses. A second press
|
||||||
play_chime(&audio_child)
|
// may arrive when only one of either the chime or flash has
|
||||||
|
// completed and so dispatch the other. A third press then
|
||||||
|
// arriving at this time should not get stuck in the channel, it
|
||||||
|
// should be consumed without effect.
|
||||||
|
//
|
||||||
|
// A shorter way to think of this is that the number of concurrent
|
||||||
|
// presses should be one for every async block being joined here,
|
||||||
|
// plus one more to report that they're all busy.
|
||||||
|
join! {
|
||||||
|
async {
|
||||||
|
if !audio_busy.replace(true) {
|
||||||
|
play_chime().await;
|
||||||
|
audio_busy.set(false);
|
||||||
} else { debug!("doorbell still ringing, not playing new chime"); }
|
} else { debug!("doorbell still ringing, not playing new chime"); }
|
||||||
|
},
|
||||||
|
|
||||||
|
async {
|
||||||
if !hue_busy.replace(true) {
|
if !hue_busy.replace(true) {
|
||||||
flash_porch(&reqwest).await;
|
flash_porch(&reqwest).await;
|
||||||
hue_busy.set(false);
|
hue_busy.set(false);
|
||||||
} else { debug!("hue already in use, not scheduling new flashing"); }
|
} else { debug!("hue already in use, not scheduling new flashing"); }
|
||||||
|
}
|
||||||
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
select! {
|
select! {
|
||||||
@@ -106,25 +121,27 @@ async fn warp(clients: Arc<Mutex<SmallVec<[mpsc::Sender<()>; CHANNEL_VEC_SIZE]>>
|
|||||||
warp::serve(routes).run(([0, 0, 0, 0], 8060)).await;
|
warp::serve(routes).run(([0, 0, 0, 0], 8060)).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
fn audio_busy(audio_child: &RefCell<Option<process::Child>>) -> bool {
|
async fn play_chime() {
|
||||||
if let Some(ref mut audio_child) = *audio_child.borrow_mut() {
|
|
||||||
if let Ok(None) = audio_child.try_wait() {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn play_chime(audio_child: &RefCell<Option<process::Child>>) {
|
|
||||||
trace!("Playing doorbell chime");
|
trace!("Playing doorbell chime");
|
||||||
match process::Command::new("mplayer")
|
let child = process::Command::new("mplayer")
|
||||||
.arg(OUTDOOR_CHIME_FILE)
|
.arg(OUTDOOR_CHIME_FILE)
|
||||||
.stdout(process::Stdio::null())
|
.stdout(std::process::Stdio::null())
|
||||||
.stderr(process::Stdio::null())
|
.stderr(std::process::Stdio::null())
|
||||||
.spawn() {
|
.spawn();
|
||||||
Ok(child) => { audio_child.replace(Some(child)); },
|
// Combine and log any errors from spawning the child (which we know
|
||||||
Err(err) => error!("Error playing outdoor chime: {}", err)
|
// synchronously), and from waiting for it (which we only know
|
||||||
};
|
// asynchronously). Ick! Maybe this will become easier when
|
||||||
|
// try-blocks are A Thing? Something like "try { child?.wait().await? } catch ..."
|
||||||
|
//
|
||||||
|
// Also note the Err from wait() does not mean the command returned
|
||||||
|
// an error exit status. The exit status is given to the Ok arm (and
|
||||||
|
// ignored).
|
||||||
|
match (|| async { child?.wait().await })().await {
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(ref err) => {
|
||||||
|
error!("Error playing outdoor chime: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn hue_base(reqwest: &reqwest::Client, body: &'static str, delay_millis: u64) {
|
async fn hue_base(reqwest: &reqwest::Client, body: &'static str, delay_millis: u64) {
|
||||||
|
|||||||
Reference in New Issue
Block a user