Files
floorbell/src/main.rs
Jared Burce dfbbe01a24 Log client disconnected on ring sse
This is only detected when ring sse attempts to send on the closed
channel. It is not detected when the client actually
disconnects. Lower layers in the stack do notice the disconnect and
drop the sse stream so pings will stop being sent, but we don't have
any drop handler on the ring substream so we don't notice until we try
to use it.
2020-10-15 11:58:48 -07:00

65 lines
1.9 KiB
Rust

use std::convert::Infallible;
use std::sync::{Arc, Mutex};
use futures_util::StreamExt;
#[allow(unused_imports)] use log::{debug, error, info, trace};
use rppal::gpio::{ Gpio, Level, Trigger };
use smallvec::SmallVec;
use tokio::sync::mpsc;
use warp::{Filter, sse};
const BUTTON_PIN: u8 = 26;
fn button_pressed(_level: Level, clients: &Arc<Mutex<SmallVec<[mpsc::Sender<()>; 8]>>>) {
info!("DOORBELL PRESS");
clients.lock().unwrap().retain(|tx: &mut mpsc::Sender<()>| {
match tx.try_send(()) {
Ok(_) => true,
Err(mpsc::error::TrySendError::Full(_)) => true, // we just get some free debouncing
Err(mpsc::error::TrySendError::Closed(_)) => { info!("Event client disconnected"); false }
}
});
}
#[tokio::main(basic_scheduler)]
async fn main() {
pretty_env_logger::init_timed();
let clients = Arc::new(Mutex::new(SmallVec::new()));
let clients_filter = clients.clone();
let clients_filter = warp::any().map(move || clients_filter.clone());
let gpio = Gpio::new().expect("gpio init");
let mut pin = gpio.get(BUTTON_PIN).expect("pin init").into_input_pulldown();
pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &clients))
.expect("set interrupt");
// GET /
let root = warp::path::end()
.and(warp::get())
.map(|| info!("GET /")).untuple_one()
.and(warp::fs::file("./static/main.html"));
// GET /events
let events = warp::path!("events")
.and(warp::get())
.and(clients_filter)
.map(|clients: Arc<Mutex<SmallVec<_>>>| {
info!("GET /events");
let (tx, rx) = mpsc::channel(1);
clients.lock().unwrap().push(tx);
let stream = rx.map(|()| {
debug!("sending sse");
Ok::<_, Infallible>((sse::event("ring"),
sse::data("")))
});
sse::reply(stream)
});
let routes = root.or(events);
warp::serve(routes).run(([0, 0, 0, 0], 8060)).await;
}