feat: automatically replace Suspense blocks if they are still waiting to be flushed, without JS (replaces PartiallyBlocked)

This commit is contained in:
Greg Johnston 2024-07-01 22:43:40 -04:00
parent d7ca969848
commit a68653b385
3 changed files with 193 additions and 106 deletions

View File

@ -94,6 +94,10 @@ pub fn App() -> impl IntoView {
view=Post
ssr=SsrMode::InOrder
/>
<Route
path=(StaticSegment("post_partially_blocked"), ParamSegment("id"))
view=Post
/>
<ProtectedRoute
path=StaticSegment("admin")
view=Admin
@ -135,7 +139,9 @@ fn HomePage() -> impl IntoView {
<li>
<a href=format!("/post/{}", post.id)>{post.title.clone()}</a>
"|"
<a href=format!("/post_in_order/{}", post.id)>{post.title} "(in order)"</a>
<a href=format!("/post_in_order/{}", post.id)>{post.title.clone()} "(in order)"</a>
"|"
<a href=format!("/post_partially_blocked/{}", post.id)>{post.title} "(partially blocked)"</a>
</li>
</For>
</ul>
@ -158,7 +164,7 @@ fn Post() -> impl IntoView {
.map_err(|_| PostError::InvalidId)
})
};
let post_resource = Resource::new(id, |id| async move {
let post_resource = Resource::new_blocking(id, |id| async move {
match id {
Err(e) => Err(e),
Ok(id) => get_post(id)
@ -167,10 +173,19 @@ fn Post() -> impl IntoView {
.map_err(|_| PostError::ServerError),
}
});
let comments_resource = Resource::new(id, |id| async move {
match id {
Err(e) => Err(e),
Ok(id) => {
get_comments(id).await.map_err(|_| PostError::ServerError)
}
}
});
let post_view = Suspend(async move {
match post_resource.await.to_owned() {
Ok(Ok(post)) => Ok(view! {
match post_resource.await {
Ok(Ok(post)) => {
Ok(view! {
<h1>{post.title.clone()}</h1>
<p>{post.content.clone()}</p>
@ -179,6 +194,23 @@ fn Post() -> impl IntoView {
// when it's first served
<Title text=post.title/>
<Meta name="description" content=post.content/>
})
}
_ => Err(PostError::ServerError),
}
});
let comments_view = Suspend(async move {
match comments_resource.await {
Ok(comments) => Ok(view! {
<h1>"Comments"</h1>
<ul>
{comments.into_iter()
.map(|comment| view! {
<li>{comment}</li>
})
.collect_view()
}
</ul>
}),
_ => Err(PostError::ServerError),
}
@ -205,6 +237,9 @@ fn Post() -> impl IntoView {
}
}>{post_view}</ErrorBoundary>
</Suspense>
<Suspense fallback=move || view! { <p>"Loading comments..."</p> }>
{comments_view}
</Suspense>
}
}
@ -276,3 +311,10 @@ pub async fn get_post(id: usize) -> Result<Option<Post>, ServerFnError> {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
Ok(POSTS.iter().find(|post| post.id == id).cloned())
}
#[server]
pub async fn get_comments(id: usize) -> Result<Vec<String>, ServerFnError> {
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
_ = id;
Ok(vec!["Some comment".into(), "Some other comment".into()])
}

View File

@ -239,13 +239,9 @@ where
if OUT_OF_ORDER {
let mut fallback_position = *position;
buf.push_fallback(self.fallback, &mut fallback_position);
buf.push_async_out_of_order(
false, /* TODO should_block */ fut, position,
);
buf.push_async_out_of_order(fut, position);
} else {
buf.push_async(
false, // TODO should_block
{
buf.push_async({
let mut position = *position;
async move {
let value = fut.await;
@ -258,8 +254,7 @@ where
);
builder.finish().take_chunks()
}
},
);
});
*position = Position::NextChild;
}
}

View File

@ -17,7 +17,7 @@ pub struct StreamBuilder {
sync_buf: String,
pub chunks: VecDeque<StreamChunk>,
pending: Option<ChunkFuture>,
pending_ooo: VecDeque<ChunkFuture>,
pending_ooo: VecDeque<PinnedFuture<OooChunk>>,
id: Option<Vec<u16>>,
}
@ -47,7 +47,6 @@ impl StreamBuilder {
pub fn push_async(
&mut self,
should_block: bool,
fut: impl Future<Output = VecDeque<StreamChunk>> + Send + 'static,
) {
// flush sync chunk
@ -57,7 +56,6 @@ impl StreamBuilder {
}
self.chunks.push_back(StreamChunk::Async {
chunks: Box::pin(fut) as PinnedFuture<VecDeque<StreamChunk>>,
should_block,
});
}
@ -140,7 +138,6 @@ impl StreamBuilder {
pub fn push_async_out_of_order<View, Rndr>(
&mut self,
should_block: bool,
view: impl Future<Output = View> + Send + 'static,
position: &mut Position,
) where
@ -153,7 +150,6 @@ impl StreamBuilder {
let mut position = *position;
self.chunks.push_back(StreamChunk::OutOfOrder {
should_block,
chunks: Box::pin(async move {
let view = view.await;
@ -165,12 +161,6 @@ impl StreamBuilder {
}
}
subbuilder.sync_buf.reserve(591 + id.len()); // TODO size
subbuilder.sync_buf.push_str("<template id=\"");
subbuilder.sync_buf.push_str(&id);
subbuilder.sync_buf.push('f');
subbuilder.sync_buf.push_str("\">");
if let Some(id) = subbuilder.id.as_mut() {
id.push(0);
}
@ -179,30 +169,9 @@ impl StreamBuilder {
&mut position,
true,
);
let chunks = subbuilder.finish().take_chunks();
subbuilder.sync_buf.push_str("</template>");
// TODO nonce
subbuilder.sync_buf.push_str("<script");
subbuilder.sync_buf.push_str(r#">(function() { let id = ""#);
subbuilder.sync_buf.push_str(&id);
subbuilder.sync_buf.push_str(
"\";let open = undefined;let close = undefined;let walker \
= document.createTreeWalker(document.body, \
NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \
{if(walker.currentNode.textContent == `s-${id}o`){ \
open=walker.currentNode; } else \
if(walker.currentNode.textContent == `s-${id}c`) { close \
= walker.currentNode;}}let range = new Range(); \
range.setStartBefore(open); range.setEndBefore(close); \
range.deleteContents(); let tpl = \
document.getElementById(`${id}f`); \
close.parentNode.insertBefore(tpl.content.\
cloneNode(true), close);close.remove();})()",
);
subbuilder.sync_buf.push_str("</script>");
subbuilder.finish().take_chunks()
OooChunk { id, chunks }
}),
});
}
@ -222,31 +191,65 @@ pub enum StreamChunk {
Sync(String),
Async {
chunks: PinnedFuture<VecDeque<StreamChunk>>,
should_block: bool,
},
OutOfOrder {
chunks: PinnedFuture<VecDeque<StreamChunk>>,
should_block: bool,
chunks: PinnedFuture<OooChunk>,
},
}
#[derive(Debug)]
struct OooChunk {
id: String,
chunks: VecDeque<StreamChunk>,
}
impl OooChunk {
pub fn push_start(id: &str, buf: &mut String) {
buf.push_str("<template id=\"");
buf.push_str(id);
buf.push('f');
buf.push_str("\">");
}
pub fn push_end(id: &str, buf: &mut String) {
buf.push_str("</template>");
// TODO nonce
buf.push_str("<script");
buf.push_str(r#">(function() { let id = ""#);
buf.push_str(id);
buf.push_str(
"\";let open = undefined;let close = undefined;let walker \
= document.createTreeWalker(document.body, \
NodeFilter.SHOW_COMMENT);while(walker.nextNode()) \
{if(walker.currentNode.textContent == `s-${id}o`){ \
open=walker.currentNode; } else \
if(walker.currentNode.textContent == `s-${id}c`) { close \
= walker.currentNode;}}let range = new Range(); \
range.setStartBefore(open); range.setEndBefore(close); \
range.deleteContents(); let tpl = \
document.getElementById(`${id}f`); \
close.parentNode.insertBefore(tpl.content.\
cloneNode(true), close);close.remove();})()",
);
buf.push_str("</script>");
}
}
impl Debug for StreamChunk {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Sync(arg0) => f.debug_tuple("Sync").field(arg0).finish(),
Self::Async { should_block, .. } => f
.debug_struct("Async")
.field("should_block", should_block)
.finish_non_exhaustive(),
Self::OutOfOrder { should_block, .. } => f
.debug_struct("OutOfOrder")
.field("should_block", should_block)
.finish_non_exhaustive(),
Self::Async { .. } => {
f.debug_struct("Async").finish_non_exhaustive()
}
Self::OutOfOrder { .. } => {
f.debug_struct("OutOfOrder").finish_non_exhaustive()
}
}
}
}
// TODO handle should_block
impl Stream for StreamBuilder {
type Item = String;
@ -273,42 +276,83 @@ impl Stream for StreamBuilder {
let next_chunk = this.chunks.pop_front();
match next_chunk {
None => {
let sync_buf = mem::take(&mut this.sync_buf);
if sync_buf.is_empty() {
// now, handle out-of-order chunks
if let Some(mut pending) = this.pending_ooo.pop_front()
{
if let Some(mut pending) = this.pending_ooo.pop_front() {
match pending.as_mut().poll(cx) {
Poll::Ready(chunks) => {
for chunk in chunks.into_iter().rev() {
Poll::Ready(OooChunk { id, chunks }) => {
let opening = format!("<!--s-{id}o-->");
let placeholder_at =
this.sync_buf.find(&opening);
if let Some(start) = placeholder_at {
let closing = format!("<!--s-{id}c-->");
let end =
this.sync_buf.find(&closing).unwrap();
let chunks_iter = chunks.into_iter().rev();
// TODO can probably make this more efficient
let (before, replaced) =
this.sync_buf.split_at(start);
let (_, after) = replaced
.split_at(end - start + closing.len());
let mut buf = String::new();
buf.push_str(before);
let mut held_chunks = VecDeque::new();
for chunk in chunks_iter {
if let StreamChunk::Sync(ready) = chunk
{
buf.push_str(&ready);
} else {
held_chunks.push_front(chunk);
}
}
buf.push_str(after);
this.sync_buf = buf;
for chunk in held_chunks {
this.chunks.push_front(chunk);
}
} else {
OooChunk::push_start(
&id,
&mut this.sync_buf,
);
for chunk in chunks.into_iter().rev() {
if let StreamChunk::Sync(ready) = chunk
{
this.sync_buf.push_str(&ready);
} else {
this.chunks.push_front(chunk);
}
}
OooChunk::push_end(&id, &mut this.sync_buf);
}
self.poll_next(cx)
}
Poll::Pending => {
this.pending_ooo.push_back(pending);
if this.sync_buf.is_empty() {
Poll::Pending
}
}
} else {
Poll::Ready(Some(mem::take(
&mut this.sync_buf,
)))
}
}
}
} else if this.sync_buf.is_empty() {
Poll::Ready(None)
}
} else {
Poll::Ready(Some(sync_buf))
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
Some(StreamChunk::Sync(mut value)) => {
Some(StreamChunk::Sync(value)) => {
this.sync_buf.push_str(&value);
loop {
match this.chunks.pop_front() {
None => break,
Some(StreamChunk::Async {
chunks,
should_block,
}) => {
this.chunks.push_front(StreamChunk::Async {
chunks,
should_block,
});
Some(StreamChunk::Async { chunks }) => {
this.chunks
.push_front(StreamChunk::Async { chunks });
break;
}
Some(StreamChunk::OutOfOrder {
@ -318,22 +362,28 @@ impl Stream for StreamBuilder {
break;
}
Some(StreamChunk::Sync(next)) => {
value.push_str(&next);
this.sync_buf.push_str(&next);
}
}
}
let sync_buf = mem::take(&mut this.sync_buf);
value.push_str(&sync_buf);
Poll::Ready(Some(value))
this.poll_next(cx)
}
Some(StreamChunk::Async { chunks, .. }) => {
this.pending = Some(chunks);
if this.sync_buf.is_empty() {
self.poll_next(cx)
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
Some(StreamChunk::OutOfOrder { chunks, .. }) => {
this.pending_ooo.push_back(chunks);
if this.sync_buf.is_empty() {
self.poll_next(cx)
} else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
}
}
}