scope Arena to each request

This commit is contained in:
Greg Johnston 2024-03-23 20:53:46 -04:00
parent f5935c6333
commit ea3790d91c
7 changed files with 216 additions and 41 deletions

View File

@ -20,6 +20,7 @@ leptos = { workspace = true, features = ["nonce", "hydration"] }
server_fn = { workspace = true, features = ["axum-no-default"] }
leptos_macro = { workspace = true, features = ["axum"] }
leptos_meta = { workspace = true }
reactive_graph = { workspace = true, features = ["sandboxed-arenas"] }
routing = { workspace = true }
#leptos_integration_utils = { workspace = true }
parking_lot = "0.12"

View File

@ -58,6 +58,7 @@ use leptos::{
use leptos_meta::{MetaContext, ServerMetaContext};
use once_cell::sync::OnceCell;
use parking_lot::RwLock;
use reactive_graph::owner::Sandboxed;
use routing::{
location::RequestUrl, PathSegment, RouteList, RouteListing, SsrMode,
StaticDataMap, StaticMode,
@ -711,13 +712,12 @@ where
let res_options2 = default_res_options.clone();
let res_options3 = default_res_options.clone();
Box::pin(async move {
let owner = Owner::new_root(Arc::new(SsrSharedContext::new()));
let owner = Owner::new_root(Some(Arc::new(SsrSharedContext::new())));
Box::pin(Sandboxed::new(async move {
let meta_context = ServerMetaContext::new();
let stream = owner.with(|| {
// Need to get the path and query string of the Request
// For reasons that escape me, if the incoming URI protocol is https, it provides the absolute URI
// if http, it returns a relative path. Adding .path() seems to make it explicitly return the relative uri
let path = req.uri().path_and_query().unwrap().as_str();
let full_path = format!("http://leptos.dev{path}");
@ -748,7 +748,7 @@ where
let stream = meta_context.inject_meta_context(stream).await;
Html(Body::from_stream(
Html(Body::from_stream(Sandboxed::new(
stream
.map(|chunk| Ok(chunk) as Result<String, std::io::Error>)
// drop the owner, cleaning up the reactive runtime,
@ -757,9 +757,9 @@ where
drop(owner);
Ok(Default::default())
})),
))
)))
.into_response()
})
}))
}
}
@ -1427,7 +1427,7 @@ where
{
init_executor();
let owner = Owner::new();
let owner = Owner::new_root(None);
let routes = owner
.with(|| {
// stub out a path for now

View File

@ -39,7 +39,7 @@ where
Executor::init_wasm_bindgen();
// create a new reactive owner and use it as the root node to run the app
let owner = Owner::new_root(Arc::new(HydrateSharedContext::new()));
let owner = Owner::new_root(Some(Arc::new(HydrateSharedContext::new())));
let mountable = owner.with(move || {
let view = f().into_view();
view.hydrate::<true>(

View File

@ -25,6 +25,7 @@ nightly = []
serde = ["dep:serde"]
tracing = ["dep:tracing"]
hydration = ["dep:hydration_context"]
sandboxed-arenas = []
[package.metadata.docs.rs]
all-features = true

View File

@ -2,6 +2,8 @@ use super::{
inner::ArcAsyncDerivedInner, AsyncDerivedReadyFuture, AsyncState,
ScopedFuture,
};
#[cfg(feature = "sandboxed-arenas")]
use crate::owner::Sandboxed;
use crate::{
channel::channel,
graph::{
@ -118,7 +120,7 @@ macro_rules! spawn_derived {
let value = Arc::downgrade(&this.value);
let inner = Arc::downgrade(&this.inner);
let wakers = Arc::downgrade(&this.wakers);
async move {
let fut = async move {
while rx.next().await.is_some() {
match (value.upgrade(), inner.upgrade(), wakers.upgrade()) {
(Some(value), Some(inner), Some(wakers)) => {
@ -128,6 +130,8 @@ macro_rules! spawn_derived {
any_subscriber
.with_observer(|| ScopedFuture::new($fun()))
});
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
// update state from Complete to Reloading
{
@ -162,7 +166,12 @@ macro_rules! spawn_derived {
_ => break,
}
}
}
};
#[cfg(feature = "sandboxed-arenas")]
let fut = Sandboxed::new(fut);
fut
});
this

View File

@ -12,6 +12,9 @@ use std::{
mod arena;
mod context;
use self::arena::Arena;
#[cfg(feature = "sandboxed-arenas")]
pub use arena::sandboxed::Sandboxed;
use arena::NodeId;
pub use arena::{StoredData, StoredValue};
pub use context::*;
@ -58,8 +61,10 @@ impl Owner {
#[cfg(feature = "hydration")]
pub fn new_root(
shared_context: Arc<dyn SharedContext + Send + Sync>,
shared_context: Option<Arc<dyn SharedContext + Send + Sync>>,
) -> Self {
Arena::enter_new();
Self {
inner: Arc::new(RwLock::new(OwnerInner {
parent: None,
@ -68,7 +73,7 @@ impl Owner {
cleanups: Default::default(),
})),
#[cfg(feature = "hydration")]
shared_context: Some(shared_context),
shared_context,
}
}
@ -108,8 +113,12 @@ impl Owner {
cleanup();
}
for node in nodes {
_ = arena::map().write().or_poisoned().remove(node);
if !nodes.is_empty() {
Arena::with_mut(|arena| {
for node in nodes {
_ = arena.remove(node);
}
});
}
self.with(fun)
@ -170,8 +179,13 @@ impl Drop for OwnerInner {
cleanup();
}
for node in mem::take(&mut self.nodes) {
_ = arena::map().write().or_poisoned().remove(node);
let nodes = mem::take(&mut self.nodes);
if !nodes.is_empty() {
Arena::with_mut(|arena| {
for node in nodes {
_ = arena.remove(node);
}
});
}
}
}

View File

@ -1,21 +1,172 @@
use super::OWNER;
use or_poisoned::OrPoisoned;
use slotmap::{new_key_type, SlotMap};
use std::{
any::Any,
hash::Hash,
marker::PhantomData,
sync::{OnceLock, RwLock},
};
#[cfg(not(feature = "sandboxed-arenas"))]
use std::sync::OnceLock;
use std::{any::Any, hash::Hash, marker::PhantomData, sync::RwLock};
#[cfg(feature = "sandboxed-arenas")]
use std::{cell::RefCell, sync::Arc};
new_key_type! { pub(crate) struct NodeId; }
static MAP: OnceLock<RwLock<SlotMap<NodeId, Box<dyn Any + Send + Sync>>>> =
OnceLock::new();
pub(crate) struct Arena;
pub(crate) fn map(
) -> &'static RwLock<SlotMap<NodeId, Box<dyn Any + Send + Sync>>> {
MAP.get_or_init(Default::default)
type ArenaMap = SlotMap<NodeId, Box<dyn Any + Send + Sync>>;
#[cfg(not(feature = "sandboxed-arenas"))]
static MAP: OnceLock<RwLock<ArenaMap>> = OnceLock::new();
#[cfg(feature = "sandboxed-arenas")]
thread_local! {
pub(crate) static MAP: RefCell<Option<Arc<RwLock<ArenaMap>>>> = RefCell::new(None);
}
impl Arena {
#[inline(always)]
pub fn enter_new() {
#[cfg(feature = "sandboxed-arenas")]
MAP.with_borrow_mut(|arena| {
*arena =
Some(Arc::new(RwLock::new(SlotMap::with_capacity_and_key(32))))
})
}
#[track_caller]
pub fn with<U>(fun: impl FnOnce(&ArenaMap) -> U) -> U {
#[cfg(not(feature = "sandboxed-arenas"))]
{
fun(&MAP.get_or_init(Default::default).read().or_poisoned())
}
#[cfg(feature = "sandboxed-arenas")]
{
MAP.with_borrow(|arena| {
fun(&arena
.as_ref()
.unwrap_or_else(|| {
panic!(
"at {}, the `sandboxed-arenas` feature is active, \
but no Arena is active",
std::panic::Location::caller()
)
})
.read()
.or_poisoned())
})
}
}
#[track_caller]
pub fn with_mut<U>(fun: impl FnOnce(&mut ArenaMap) -> U) -> U {
#[cfg(not(feature = "sandboxed-arenas"))]
{
fun(&mut MAP.get_or_init(Default::default).write().or_poisoned())
}
#[cfg(feature = "sandboxed-arenas")]
{
let caller = std::panic::Location::caller();
MAP.with_borrow(|arena| {
fun(&mut arena
.as_ref()
.unwrap_or_else(|| {
panic!(
"at {}, the `sandboxed-arenas` feature is active, \
but no Arena is active",
caller
)
})
.write()
.or_poisoned())
})
}
}
}
#[cfg(feature = "sandboxed-arenas")]
pub mod sandboxed {
use super::{Arena, ArenaMap, MAP};
use futures::Stream;
use pin_project_lite::pin_project;
use std::{
future::Future,
mem,
pin::Pin,
sync::{Arc, RwLock},
task::{Context, Poll},
};
impl Arena {
fn set(new_arena: Arc<RwLock<ArenaMap>>) -> UnsetArenaOnDrop {
MAP.with_borrow_mut(|arena| {
UnsetArenaOnDrop(mem::replace(arena, Some(new_arena)))
})
}
}
pin_project! {
pub struct Sandboxed<T> {
arena: Arc<RwLock<ArenaMap>>,
#[pin]
inner: T,
}
}
impl<T> Sandboxed<T> {
pub fn new(inner: T) -> Self {
let arena = MAP.with_borrow(|current| {
Arc::clone(current.as_ref().expect(
"the `sandboxed-arenas` feature is active, but no Arena \
is active",
))
});
Self { arena, inner }
}
}
impl<Fut> Future for Sandboxed<Fut>
where
Fut: Future,
{
type Output = Fut::Output;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
let unset = Arena::set(Arc::clone(&self.arena));
let this = self.project();
let res = this.inner.poll(cx);
drop(unset);
res
}
}
impl<T> Stream for Sandboxed<T>
where
T: Stream,
{
type Item = T::Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let unset = Arena::set(Arc::clone(&self.arena));
let this = self.project();
let res = this.inner.poll_next(cx);
drop(unset);
res
}
}
#[derive(Debug)]
struct UnsetArenaOnDrop(Option<Arc<RwLock<ArenaMap>>>);
impl Drop for UnsetArenaOnDrop {
fn drop(&mut self) {
if let Some(inner) = self.0.take() {
MAP.with_borrow_mut(|current_map| *current_map = Some(inner));
}
}
}
}
#[derive(Debug)]
@ -54,10 +205,9 @@ where
#[track_caller]
pub fn new(value: T) -> Self {
let node = {
map()
.write()
.or_poisoned()
.insert(Box::new(value) as Box<dyn Any + Send + Sync>)
Arena::with_mut(|arena| {
arena.insert(Box::new(value) as Box<dyn Any + Send + Sync>)
})
};
OWNER.with(|o| {
if let Some(owner) = &*o.borrow() {
@ -72,17 +222,17 @@ where
}
pub fn with_value<U>(&self, fun: impl FnOnce(&T) -> U) -> Option<U> {
let m = map().read().or_poisoned();
let m = m.get(self.node);
m.and_then(|n| n.downcast_ref::<T>()).map(fun)
Arena::with(|arena| {
let m = arena.get(self.node);
m.and_then(|n| n.downcast_ref::<T>()).map(fun)
})
}
pub fn update_value<U>(&self, fun: impl FnOnce(&mut T) -> U) -> Option<U> {
let mut m = map().write().or_poisoned();
let m = m.get_mut(self.node);
m.and_then(|n| n.downcast_mut::<T>()).map(fun)
Arena::with_mut(|arena| {
let m = arena.get_mut(self.node);
m.and_then(|n| n.downcast_mut::<T>()).map(fun)
})
}
pub fn get(&self) -> Option<T>
@ -100,11 +250,11 @@ where
where
T: Clone,
{
map().read().or_poisoned().contains_key(self.node)
Arena::with(|arena| arena.contains_key(self.node))
}
pub fn dispose(&self) {
map().write().or_poisoned().remove(self.node);
Arena::with_mut(|arena| arena.remove(self.node));
}
}