Enqueue sse ring directly from interrupt. SmallVec for client list
This commit is contained in:
@@ -11,5 +11,6 @@ futures-util = "0.3.6"
|
||||
log = "0.4.11"
|
||||
pretty_env_logger = "0.4.0"
|
||||
rppal = "0.11.3"
|
||||
smallvec = "1.4.2"
|
||||
tokio = { version = "0.2", features = ["macros"] }
|
||||
warp = "0.2.5"
|
||||
|
||||
43
src/main.rs
43
src/main.rs
@@ -1,44 +1,41 @@
|
||||
use std::convert::Infallible;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use futures_util::{FutureExt,
|
||||
select,
|
||||
StreamExt};
|
||||
use futures_util::StreamExt;
|
||||
#[allow(unused_imports)] use log::{debug, error, info, trace};
|
||||
use rppal::gpio::{ Gpio, Level, Trigger };
|
||||
use smallvec::SmallVec;
|
||||
use tokio::sync::mpsc;
|
||||
use warp::{Filter, sse};
|
||||
|
||||
|
||||
const BUTTON_PIN: u8 = 26;
|
||||
|
||||
fn button_pressed(_level: Level, tx: &mut mpsc::Sender<()>) {
|
||||
fn button_pressed(_level: Level, clients: &Arc<Mutex<SmallVec<[mpsc::Sender<()>; 8]>>>) {
|
||||
info!("DOORBELL PRESS");
|
||||
// if rx full, we get some free debouncing
|
||||
// if rx is closed, program is terminating anyway
|
||||
let _ignore_err = tx.try_send(());
|
||||
clients.lock().unwrap().retain(|tx: &mut mpsc::Sender<()>| {
|
||||
match tx.try_send(()) {
|
||||
Ok(_) => true,
|
||||
Err(mpsc::error::TrySendError::Full(_)) => true, // we just get some free debouncing
|
||||
Err(mpsc::error::TrySendError::Closed(_)) => false
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
#[tokio::main(basic_scheduler)]
|
||||
async fn main() {
|
||||
pretty_env_logger::init();
|
||||
|
||||
let clients = Arc::new(Mutex::new(SmallVec::new()));
|
||||
let clients_filter = clients.clone();
|
||||
let clients_filter = warp::any().map(move || clients_filter.clone());
|
||||
|
||||
let gpio = Gpio::new().expect("gpio init");
|
||||
let mut pin = gpio.get(BUTTON_PIN).expect("pin init").into_input_pulldown();
|
||||
|
||||
let (mut tx, rx) = mpsc::channel(1);
|
||||
pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &mut tx))
|
||||
pin.set_async_interrupt(Trigger::RisingEdge, move |level| button_pressed(level, &clients))
|
||||
.expect("set interrupt");
|
||||
|
||||
let clients = Arc::new(Mutex::new(Vec::new()));
|
||||
let clients_filter = clients.clone();
|
||||
let clients_filter = warp::any().map(move || clients_filter.clone());
|
||||
let pushes = rx.for_each(|()| {
|
||||
debug!("received signal from interrupt");
|
||||
clients.lock().unwrap().retain(|tx: &mpsc::UnboundedSender<()>| tx.send(()).is_ok());
|
||||
futures_util::future::ready(())
|
||||
});
|
||||
|
||||
// GET /
|
||||
let root = warp::path::end()
|
||||
.and(warp::get())
|
||||
@@ -49,9 +46,9 @@ async fn main() {
|
||||
let events = warp::path!("events")
|
||||
.and(warp::get())
|
||||
.and(clients_filter)
|
||||
.map(|clients: Arc<Mutex<Vec<_>>>| {
|
||||
.map(|clients: Arc<Mutex<SmallVec<_>>>| {
|
||||
info!("GET /events");
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
clients.lock().unwrap().push(tx);
|
||||
|
||||
let stream = rx.map(|()| {
|
||||
@@ -63,9 +60,5 @@ async fn main() {
|
||||
});
|
||||
|
||||
let routes = root.or(events);
|
||||
|
||||
select! {
|
||||
_ = warp::serve(routes).run(([0, 0, 0, 0], 8060)).fuse() => (),
|
||||
_ = pushes.fuse() => ()
|
||||
}
|
||||
warp::serve(routes).run(([0, 0, 0, 0], 8060)).await;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user