simple counting http server-side-events stream
This commit is contained in:
@@ -9,6 +9,7 @@ cookie = "0.13"
|
|||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
nanoid = "0.3"
|
nanoid = "0.3"
|
||||||
|
pin-utils = "0.1.0-alpha.4"
|
||||||
pretty_env_logger = "0.4"
|
pretty_env_logger = "0.4"
|
||||||
sessions = { version = "0.0.2", features = ["fs-store", "tokio"] }
|
sessions = { version = "0.0.2", features = ["fs-store", "tokio"] }
|
||||||
time = "0.2"
|
time = "0.2"
|
||||||
|
|||||||
35
src/main.rs
35
src/main.rs
@@ -1,14 +1,21 @@
|
|||||||
|
use std::convert::Infallible;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use cookie::{Cookie, SameSite};
|
use cookie::{Cookie, SameSite};
|
||||||
|
use futures::{StreamExt, TryStreamExt};
|
||||||
use log::{debug, error, info, trace};
|
use log::{debug, error, info, trace};
|
||||||
use sessions::Session;
|
use sessions::Session;
|
||||||
|
use tokio::time::interval;
|
||||||
use warp::{Filter, Rejection, Reply};
|
use warp::{Filter, Rejection, Reply};
|
||||||
use warp::http::{StatusCode, Uri};
|
use warp::http::{StatusCode, Uri};
|
||||||
|
use warp::sse;
|
||||||
|
|
||||||
mod session;
|
mod session;
|
||||||
|
mod stream;
|
||||||
use crate::session::{NoSession, SESSION_HEADER, SESSION_NAME, SessionPolicy, with_session};
|
use crate::session::{NoSession, SESSION_HEADER, SESSION_NAME, SessionPolicy, with_session};
|
||||||
|
use crate::stream::StreamExt as NoDropStreamExt;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
@@ -20,9 +27,8 @@ async fn main() {
|
|||||||
.and(warp::get())
|
.and(warp::get())
|
||||||
.map(|| info!("GET /")).untuple_one()
|
.map(|| info!("GET /")).untuple_one()
|
||||||
.and(with_session(store.clone(), SessionPolicy::Existing))
|
.and(with_session(store.clone(), SessionPolicy::Existing))
|
||||||
.map(|session: Session| format!("Hello, {} (user #{})!",
|
.and(warp::fs::file("./static/game.html"))
|
||||||
session.get::<String>(SESSION_NAME).unwrap().unwrap(),
|
.map(|_session, file| file);
|
||||||
session.id().unwrap()));
|
|
||||||
|
|
||||||
// GET/POST /user name={name}
|
// GET/POST /user name={name}
|
||||||
let namechange = warp::path!("user")
|
let namechange = warp::path!("user")
|
||||||
@@ -30,7 +36,7 @@ async fn main() {
|
|||||||
.map(|| info!("GET /user")).untuple_one()
|
.map(|| info!("GET /user")).untuple_one()
|
||||||
.and(with_session(store.clone(), SessionPolicy::AllowNew))
|
.and(with_session(store.clone(), SessionPolicy::AllowNew))
|
||||||
// .recover(|err| todo!("prevent redir loop")) // reply::with_status(..)
|
// .recover(|err| todo!("prevent redir loop")) // reply::with_status(..)
|
||||||
.and(warp::fs::file("./static/index.html"))
|
.and(warp::fs::file("./static/user.html"))
|
||||||
.map(|_session, file| file)
|
.map(|_session, file| file)
|
||||||
.or(warp::post()
|
.or(warp::post()
|
||||||
.and(with_session(store.clone(), SessionPolicy::AllowNew))
|
.and(with_session(store.clone(), SessionPolicy::AllowNew))
|
||||||
@@ -55,8 +61,29 @@ async fn main() {
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// GET /events
|
||||||
|
let events = warp::path!("events")
|
||||||
|
.and(warp::get())
|
||||||
|
.map(|| info!("GET /events")).untuple_one()
|
||||||
|
.map(|| {
|
||||||
|
let mut counter = 0u64;
|
||||||
|
let stream = interval(Duration::from_secs(5)).map(move |_| {
|
||||||
|
counter += 1;
|
||||||
|
let res: Result<_, Infallible> = Ok((sse::event("count"),
|
||||||
|
sse::data(counter)));
|
||||||
|
res
|
||||||
|
});
|
||||||
|
let sse = sse::keep_alive()
|
||||||
|
.interval(Duration::from_secs(3))
|
||||||
|
.stream(stream)
|
||||||
|
.into_stream()
|
||||||
|
.on_drop(|_| info!("SSE stream dropped"));
|
||||||
|
sse::reply(sse)
|
||||||
|
});
|
||||||
|
|
||||||
let routes = root
|
let routes = root
|
||||||
.or(namechange)
|
.or(namechange)
|
||||||
|
.or(events)
|
||||||
.recover(handle_no_session);
|
.recover(handle_no_session);
|
||||||
|
|
||||||
warp::serve(routes).run(([127, 0, 0, 1], 8060)).await;
|
warp::serve(routes).run(([127, 0, 0, 1], 8060)).await;
|
||||||
|
|||||||
52
src/stream.rs
Normal file
52
src/stream.rs
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
use std::pin::Pin;
|
||||||
|
use std::task::Poll;
|
||||||
|
use std::task::Context;
|
||||||
|
|
||||||
|
use futures::Stream;
|
||||||
|
use pin_utils::unsafe_pinned;
|
||||||
|
|
||||||
|
pub struct OnDrop<S, F>
|
||||||
|
where F: FnMut(&mut S) {
|
||||||
|
stream: S,
|
||||||
|
drop_fn: F
|
||||||
|
}
|
||||||
|
impl<S: Unpin, F> Unpin for OnDrop<S, F> where F: FnMut(&mut S) {}
|
||||||
|
|
||||||
|
impl<S, F> OnDrop<S, F> where F: FnMut(&mut S) {
|
||||||
|
unsafe_pinned!(stream: S);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: ?Sized> StreamExt for T where T: Stream {}
|
||||||
|
|
||||||
|
pub trait StreamExt: Stream {
|
||||||
|
fn on_drop<F>(self, drop_fn: F) -> OnDrop<Self, F>
|
||||||
|
where
|
||||||
|
Self: Sized,
|
||||||
|
F: FnMut(&mut Self),
|
||||||
|
{
|
||||||
|
OnDrop { stream: self, drop_fn: drop_fn }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, F> Drop for OnDrop<S, F>
|
||||||
|
where
|
||||||
|
F: FnMut(&mut S) {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
(self.drop_fn)(&mut self.stream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S, F> Stream for OnDrop<S, F>
|
||||||
|
where
|
||||||
|
S: Stream,
|
||||||
|
F: FnMut(&mut S)
|
||||||
|
{
|
||||||
|
type Item = S::Item;
|
||||||
|
|
||||||
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||||
|
self.stream().poll_next(cx)
|
||||||
|
}
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
self.stream.size_hint()
|
||||||
|
}
|
||||||
|
}
|
||||||
25
static/game.html
Normal file
25
static/game.html
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
<!DOCTYPE html>
|
||||||
|
<html lang="en">
|
||||||
|
<head>
|
||||||
|
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
|
||||||
|
<title>Wabi Spectrum</title>
|
||||||
|
<script src="https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js"></script>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<section id="log">
|
||||||
|
<h1>Incoming Events</h1>
|
||||||
|
</section>
|
||||||
|
<template id="logline">
|
||||||
|
<p>Count updated: <span></span>
|
||||||
|
</template>
|
||||||
|
<script>
|
||||||
|
"use strict";
|
||||||
|
let sse = new EventSource('events');
|
||||||
|
sse.addEventListener('count', msg => {
|
||||||
|
const template = document.querySelector('#logline').content.cloneNode(true);
|
||||||
|
template.querySelector('span').textContent = msg.data;
|
||||||
|
document.querySelector('#log').appendChild(template);
|
||||||
|
});
|
||||||
|
</script>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
Reference in New Issue
Block a user