From 069099c7bb981b57313d05a54e7c8a99b3c022e0 Mon Sep 17 00:00:00 2001 From: Jared Burce Date: Wed, 14 Oct 2020 12:58:31 -0700 Subject: [PATCH] button presses -> sse -> web display --- Cargo.toml | 1 + src/main.rs | 62 +++++++++++++++++++++++++++++++++++++++--------- static/main.html | 16 ++++++++++++- 3 files changed, 67 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 927d3dd..17b7e05 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +futures-util = "0.3.6" log = "0.4.11" pretty_env_logger = "0.4.0" rppal = "0.11.3" diff --git a/src/main.rs b/src/main.rs index 032532e..22db0a6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,25 +1,43 @@ -use std::thread::sleep; -use std::time::Duration; +use std::convert::Infallible; +use std::sync::{Arc, Mutex}; + +use futures_util::{FutureExt, + select, + StreamExt}; +#[allow(unused_imports)] use log::{debug, error, info, trace}; +use rppal::gpio::{ Gpio, Level, Trigger }; +use tokio::sync::mpsc; +use warp::{Filter, sse}; -use log::info; -use rppal::gpio::{Gpio, Level, Trigger}; -use warp::Filter; const BUTTON_PIN: u8 = 26; -fn button_pressed(_level: Level) { - println!("pushed"); - sleep(Duration::from_millis(25)); +fn button_pressed(_level: Level, tx: &mut mpsc::Sender<()>) { + info!("DOORBELL PRESS"); + // if rx full, we get some free debouncing + // if rx is closed, program is terminating anyway + let _ignore_err = tx.try_send(()); } -#[tokio::main] +#[tokio::main(basic_scheduler)] async fn main() { pretty_env_logger::init(); let gpio = Gpio::new().expect("gpio init"); let mut pin = gpio.get(BUTTON_PIN).expect("pin init").into_input_pulldown(); - pin.set_async_interrupt(Trigger::RisingEdge, button_pressed).expect("set interrupt"); + let (mut tx, rx) = mpsc::channel(1); + pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &mut tx)) + .expect("set interrupt"); + + let clients = Arc::new(Mutex::new(Vec::new())); + let clients_filter = clients.clone(); + let clients_filter = warp::any().map(move || clients_filter.clone()); + let pushes = rx.for_each(|()| { + debug!("received signal from interrupt"); + clients.lock().unwrap().retain(|tx: &mpsc::UnboundedSender<()>| tx.send(()).is_ok()); + futures_util::future::ready(()) + }); // GET / let root = warp::path::end() @@ -27,5 +45,27 @@ async fn main() { .map(|| info!("GET /")).untuple_one() .and(warp::fs::file("./static/main.html")); - warp::serve(root).run(([0, 0, 0, 0], 8060)).await; + // GET /events + let events = warp::path!("events") + .and(warp::get()) + .and(clients_filter) + .map(|clients: Arc>>| { + info!("GET /events"); + let (tx, rx) = mpsc::unbounded_channel(); + clients.lock().unwrap().push(tx); + + let stream = rx.map(|()| { + debug!("sending sse"); + Ok::<_, Infallible>((sse::event("ring"), + sse::data(""))) + }); + sse::reply(stream) + }); + + let routes = root.or(events); + + select! { + _ = warp::serve(routes).run(([0, 0, 0, 0], 8060)).fuse() => (), + _ = pushes.fuse() => () + } } diff --git a/static/main.html b/static/main.html index 21218b2..fdc9a9f 100644 --- a/static/main.html +++ b/static/main.html @@ -1,9 +1,23 @@ - + Fáfnir Doorbell +
+

Incoming Events

+
+ +