From 71c62dc6a2c85f9d21e6b246c51cfb22b75f9892 Mon Sep 17 00:00:00 2001 From: Jared Burce Date: Wed, 14 Oct 2020 13:32:23 -0700 Subject: [PATCH] Enqueue sse ring directly from interrupt. SmallVec for client list --- Cargo.toml | 1 + src/main.rs | 43 ++++++++++++++++++------------------------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 17b7e05..543947f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,5 +11,6 @@ futures-util = "0.3.6" log = "0.4.11" pretty_env_logger = "0.4.0" rppal = "0.11.3" +smallvec = "1.4.2" tokio = { version = "0.2", features = ["macros"] } warp = "0.2.5" diff --git a/src/main.rs b/src/main.rs index 22db0a6..f2b4802 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,44 +1,41 @@ use std::convert::Infallible; use std::sync::{Arc, Mutex}; -use futures_util::{FutureExt, - select, - StreamExt}; +use futures_util::StreamExt; #[allow(unused_imports)] use log::{debug, error, info, trace}; use rppal::gpio::{ Gpio, Level, Trigger }; +use smallvec::SmallVec; use tokio::sync::mpsc; use warp::{Filter, sse}; const BUTTON_PIN: u8 = 26; -fn button_pressed(_level: Level, tx: &mut mpsc::Sender<()>) { +fn button_pressed(_level: Level, clients: &Arc; 8]>>>) { info!("DOORBELL PRESS"); - // if rx full, we get some free debouncing - // if rx is closed, program is terminating anyway - let _ignore_err = tx.try_send(()); + clients.lock().unwrap().retain(|tx: &mut mpsc::Sender<()>| { + match tx.try_send(()) { + Ok(_) => true, + Err(mpsc::error::TrySendError::Full(_)) => true, // we just get some free debouncing + Err(mpsc::error::TrySendError::Closed(_)) => false + } + }); } #[tokio::main(basic_scheduler)] async fn main() { pretty_env_logger::init(); + let clients = Arc::new(Mutex::new(SmallVec::new())); + let clients_filter = clients.clone(); + let clients_filter = warp::any().map(move || clients_filter.clone()); + let gpio = Gpio::new().expect("gpio init"); let mut pin = gpio.get(BUTTON_PIN).expect("pin init").into_input_pulldown(); - let (mut tx, rx) = mpsc::channel(1); - pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &mut tx)) + pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &clients)) .expect("set interrupt"); - let clients = Arc::new(Mutex::new(Vec::new())); - let clients_filter = clients.clone(); - let clients_filter = warp::any().map(move || clients_filter.clone()); - let pushes = rx.for_each(|()| { - debug!("received signal from interrupt"); - clients.lock().unwrap().retain(|tx: &mpsc::UnboundedSender<()>| tx.send(()).is_ok()); - futures_util::future::ready(()) - }); - // GET / let root = warp::path::end() .and(warp::get()) @@ -49,9 +46,9 @@ async fn main() { let events = warp::path!("events") .and(warp::get()) .and(clients_filter) - .map(|clients: Arc>>| { + .map(|clients: Arc>>| { info!("GET /events"); - let (tx, rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::channel(1); clients.lock().unwrap().push(tx); let stream = rx.map(|()| { @@ -63,9 +60,5 @@ async fn main() { }); let routes = root.or(events); - - select! { - _ = warp::serve(routes).run(([0, 0, 0, 0], 8060)).fuse() => (), - _ = pushes.fuse() => () - } + warp::serve(routes).run(([0, 0, 0, 0], 8060)).await; }