From 83232ad888c7090b8d9b85b9b3b0edbd52e4361b Mon Sep 17 00:00:00 2001 From: Jared Burce Date: Tue, 7 Apr 2020 22:16:53 -0700 Subject: [PATCH] simple counting http server-side-events stream --- Cargo.toml | 1 + src/main.rs | 35 ++++++++++++++++++--- src/stream.rs | 52 ++++++++++++++++++++++++++++++++ static/game.html | 25 +++++++++++++++ static/{index.html => user.html} | 0 5 files changed, 109 insertions(+), 4 deletions(-) create mode 100644 src/stream.rs create mode 100644 static/game.html rename static/{index.html => user.html} (100%) diff --git a/Cargo.toml b/Cargo.toml index 9814452..6637b3b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ cookie = "0.13" futures = "0.3" log = "0.4" nanoid = "0.3" +pin-utils = "0.1.0-alpha.4" pretty_env_logger = "0.4" sessions = { version = "0.0.2", features = ["fs-store", "tokio"] } time = "0.2" diff --git a/src/main.rs b/src/main.rs index cea7ca0..1598977 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,14 +1,21 @@ +use std::convert::Infallible; use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use cookie::{Cookie, SameSite}; +use futures::{StreamExt, TryStreamExt}; use log::{debug, error, info, trace}; use sessions::Session; +use tokio::time::interval; use warp::{Filter, Rejection, Reply}; use warp::http::{StatusCode, Uri}; +use warp::sse; mod session; +mod stream; use crate::session::{NoSession, SESSION_HEADER, SESSION_NAME, SessionPolicy, with_session}; +use crate::stream::StreamExt as NoDropStreamExt; #[tokio::main] async fn main() { @@ -20,9 +27,8 @@ async fn main() { .and(warp::get()) .map(|| info!("GET /")).untuple_one() .and(with_session(store.clone(), SessionPolicy::Existing)) - .map(|session: Session| format!("Hello, {} (user #{})!", - session.get::(SESSION_NAME).unwrap().unwrap(), - session.id().unwrap())); + .and(warp::fs::file("./static/game.html")) + .map(|_session, file| file); // GET/POST /user name={name} let namechange = warp::path!("user") @@ -30,7 +36,7 @@ async fn main() { .map(|| info!("GET /user")).untuple_one() .and(with_session(store.clone(), SessionPolicy::AllowNew)) // .recover(|err| todo!("prevent redir loop")) // reply::with_status(..) - .and(warp::fs::file("./static/index.html")) + .and(warp::fs::file("./static/user.html")) .map(|_session, file| file) .or(warp::post() .and(with_session(store.clone(), SessionPolicy::AllowNew)) @@ -55,8 +61,29 @@ async fn main() { ) ); + // GET /events + let events = warp::path!("events") + .and(warp::get()) + .map(|| info!("GET /events")).untuple_one() + .map(|| { + let mut counter = 0u64; + let stream = interval(Duration::from_secs(5)).map(move |_| { + counter += 1; + let res: Result<_, Infallible> = Ok((sse::event("count"), + sse::data(counter))); + res + }); + let sse = sse::keep_alive() + .interval(Duration::from_secs(3)) + .stream(stream) + .into_stream() + .on_drop(|_| info!("SSE stream dropped")); + sse::reply(sse) + }); + let routes = root .or(namechange) + .or(events) .recover(handle_no_session); warp::serve(routes).run(([127, 0, 0, 1], 8060)).await; diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..5d8241a --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,52 @@ +use std::pin::Pin; +use std::task::Poll; +use std::task::Context; + +use futures::Stream; +use pin_utils::unsafe_pinned; + +pub struct OnDrop +where F: FnMut(&mut S) { + stream: S, + drop_fn: F +} +impl Unpin for OnDrop where F: FnMut(&mut S) {} + +impl OnDrop where F: FnMut(&mut S) { + unsafe_pinned!(stream: S); +} + +impl StreamExt for T where T: Stream {} + +pub trait StreamExt: Stream { + fn on_drop(self, drop_fn: F) -> OnDrop + where + Self: Sized, + F: FnMut(&mut Self), + { + OnDrop { stream: self, drop_fn: drop_fn } + } +} + +impl Drop for OnDrop +where + F: FnMut(&mut S) { + fn drop(&mut self) { + (self.drop_fn)(&mut self.stream); + } +} + +impl Stream for OnDrop +where + S: Stream, + F: FnMut(&mut S) +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.stream().poll_next(cx) + } + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} diff --git a/static/game.html b/static/game.html new file mode 100644 index 0000000..59dd65f --- /dev/null +++ b/static/game.html @@ -0,0 +1,25 @@ + + + + + Wabi Spectrum + + + +
+

Incoming Events

+
+ + + + diff --git a/static/index.html b/static/user.html similarity index 100% rename from static/index.html rename to static/user.html