Skip to content

Commit

Permalink
improve error handling in update stream (vercel/turborepo#4705)
Browse files Browse the repository at this point in the history
### Description

Handle fatal errors in update stream, by sending an not found update
with issues.

---------

Co-authored-by: Alex Kirszenberg <alex.kirszenberg@vercel.com>
  • Loading branch information
sokra and alexkirsz authored Apr 26, 2023
1 parent e56edd9 commit dfbb1e6
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 68 deletions.
39 changes: 18 additions & 21 deletions crates/turbopack-dev-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,27 @@ enum GetFromSourceResult {
async fn get_from_source(
source: ContentSourceVc,
request: TransientInstance<SourceRequest>,
issue_repoter: IssueReporterVc,
) -> Result<GetFromSourceResultVc> {
Ok(
match &*resolve_source_request(source, request, issue_repoter).await? {
ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
let static_content = static_content_vc.await?;
if let AssetContent::File(file) = &*static_content.content.content().await? {
GetFromSourceResult::Static {
content: file.await?,
status_code: static_content.status_code,
headers: static_content.headers.await?,
header_overwrites: header_overwrites.await?,
}
} else {
GetFromSourceResult::NotFound
Ok(match &*resolve_source_request(source, request).await? {
ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
let static_content = static_content_vc.await?;
if let AssetContent::File(file) = &*static_content.content.content().await? {
GetFromSourceResult::Static {
content: file.await?,
status_code: static_content.status_code,
headers: static_content.headers.await?,
header_overwrites: header_overwrites.await?,
}
} else {
GetFromSourceResult::NotFound
}
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
.cell(),
)
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
.cell())
}

/// Processes an HTTP request within a given content source and returns the
Expand All @@ -77,7 +74,7 @@ pub async fn process_request_with_content_source(
) -> Result<Response<hyper::Body>> {
let original_path = request.uri().path().to_string();
let request = http_request_to_source_request(request).await?;
let result = get_from_source(source, TransientInstance::new(request), issue_reporter);
let result = get_from_source(source, TransientInstance::new(request));
handle_issues(result, &original_path, "get_from_source", issue_reporter).await?;
match &*result.strongly_consistent().await? {
GetFromSourceResult::Static {
Expand Down
15 changes: 1 addition & 14 deletions crates/turbopack-dev-server/src/source/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anyhow::{bail, Result};
use hyper::Uri;
use turbo_tasks::{TransientInstance, Value};
use turbopack_core::issue::IssueReporterVc;

use super::{
headers::{HeaderValue, Headers},
Expand All @@ -15,10 +14,7 @@ use super::{
ContentSourceContent, ContentSourceDataVary, ContentSourceResult, ContentSourceVc,
HeaderListVc, ProxyResultVc, StaticContentVc,
};
use crate::{
handle_issues,
source::{ContentSource, ContentSourceData, GetContentSourceContent},
};
use crate::source::{ContentSource, ContentSourceData, GetContentSourceContent};

/// The result of [`resolve_source_request`]. Similar to a
/// `ContentSourceContent`, but without the `Rewrite` variant as this is taken
Expand All @@ -36,7 +32,6 @@ pub enum ResolveSourceRequestResult {
pub async fn resolve_source_request(
source: ContentSourceVc,
request: TransientInstance<SourceRequest>,
issue_reporter: IssueReporterVc,
) -> Result<ResolveSourceRequestResultVc> {
let mut data = ContentSourceData::default();
let mut current_source = source;
Expand All @@ -47,14 +42,6 @@ pub async fn resolve_source_request(
let mut response_header_overwrites = Vec::new();
loop {
let result = current_source.get(&current_asset_path, Value::new(data));
handle_issues(
result,
&original_path,
"get content from source",
issue_reporter,
)
.await?;

match &*result.strongly_consistent().await? {
ContentSourceResult::NotFound => break Ok(ResolveSourceRequestResult::NotFound.cell()),
ContentSourceResult::NeedData(needed) => {
Expand Down
14 changes: 10 additions & 4 deletions crates/turbopack-dev-server/src/update/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
let source = source_provider.get_source();
resolve_source_request(
source,
TransientInstance::new(request),
self.issue_reporter
TransientInstance::new(request)
)
}
};
match UpdateStream::new(TransientInstance::new(Box::new(get_content))).await {
match UpdateStream::new(resource.to_string(), TransientInstance::new(Box::new(get_content))).await {
Ok(stream) => {
streams.insert(resource, stream);
}
Expand All @@ -94,7 +93,14 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
}
}
Some((resource, update)) = streams.next() => {
Self::send_update(&mut client, &mut streams, resource, &update).await?;
match update {
Ok(update) => {
Self::send_update(&mut client, &mut streams, resource, &update).await?;
}
Err(err) => {
eprintln!("Failed to get update for {resource}: {}", PrettyPrintError(&err));
}
}
}
else => break
}
Expand Down
136 changes: 107 additions & 29 deletions crates/turbopack-dev-server/src/update/stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use std::pin::Pin;

use anyhow::{bail, Result};
use anyhow::Result;
use futures::{prelude::*, Stream};
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use turbo_tasks::{CollectiblesSource, IntoTraitRef, State, TraitRef, TransientInstance};
use turbo_tasks::{
primitives::StringVc, CollectiblesSource, IntoTraitRef, State, TraitRef, TransientInstance,
};
use turbo_tasks_fs::{FileSystem, FileSystemPathVc};
use turbopack_core::{
issue::{IssueVc, PlainIssueReadRef},
error::PrettyPrintError,
issue::{
Issue, IssueSeverity, IssueSeverityVc, IssueVc, OptionIssueProcessingPathItemsVc,
PlainIssueReadRef,
},
server_fs::ServerFileSystemVc,
version::{
NotFoundVersionVc, PartialUpdate, TotalUpdate, Update, UpdateReadRef, VersionVc,
VersionedContent,
Expand Down Expand Up @@ -38,12 +46,43 @@ fn extend_issues(issues: &mut Vec<PlainIssueReadRef>, new_issues: Vec<PlainIssue

#[turbo_tasks::function]
async fn get_update_stream_item(
resource: &str,
from: VersionStateVc,
get_content: TransientInstance<GetContentFn>,
) -> Result<UpdateStreamItemVc> {
let content = get_content();
let mut plain_issues = peek_issues(content).await?;

let content_value = match content.await {
Ok(content) => content,
Err(e) => {
plain_issues.push(
FatalStreamIssue {
resource: resource.to_string(),
description: StringVc::cell(format!("{}", PrettyPrintError(&e))),
}
.cell()
.as_issue()
.into_plain(OptionIssueProcessingPathItemsVc::none())
.await?,
);

let update = Update::Total(TotalUpdate {
to: NotFoundVersionVc::new()
.as_version()
.into_trait_ref()
.await?,
})
.cell();
return Ok(UpdateStreamItem::Found {
update: update.await?,
issues: plain_issues,
}
.cell());
}
};

match *content.await? {
match *content_value {
ResolveSourceRequestResult::Static(static_content_vc, _) => {
let static_content = static_content_vc.await?;

Expand All @@ -56,8 +95,7 @@ async fn get_update_stream_item(
let from = from.get();
let update = resolved_content.update(from);

let mut plain_issues = peek_issues(update).await?;
extend_issues(&mut plain_issues, peek_issues(content).await?);
extend_issues(&mut plain_issues, peek_issues(update).await?);

let update = update.await?;

Expand All @@ -74,7 +112,7 @@ async fn get_update_stream_item(
return Ok(UpdateStreamItem::NotFound.cell());
}

let plain_issues = peek_issues(proxy_result).await?;
extend_issues(&mut plain_issues, peek_issues(proxy_result).await?);

let from = from.get();
if let Some(from) = ProxyResultVc::resolve_from(from).await? {
Expand All @@ -98,8 +136,6 @@ async fn get_update_stream_item(
.cell())
}
_ => {
let plain_issues = peek_issues(content).await?;

let update = if plain_issues.is_empty() {
// Client requested a non-existing asset
// It might be removed in meantime, reload client
Expand Down Expand Up @@ -127,19 +163,17 @@ async fn get_update_stream_item(

#[turbo_tasks::function]
async fn compute_update_stream(
resource: &str,
from: VersionStateVc,
get_content: TransientInstance<GetContentFn>,
sender: TransientInstance<Sender<UpdateStreamItemReadRef>>,
) -> Result<()> {
let item = get_update_stream_item(from, get_content)
sender: TransientInstance<Sender<Result<UpdateStreamItemReadRef>>>,
) {
let item = get_update_stream_item(resource, from, get_content)
.strongly_consistent()
.await?;

if sender.send(item).await.is_err() {
bail!("channel closed");
}
.await;

Ok(())
// Send update. Ignore channel closed error.
let _ = sender.send(item).await;
}

#[turbo_tasks::value]
Expand Down Expand Up @@ -172,10 +206,15 @@ impl VersionStateVc {
}
}

pub(super) struct UpdateStream(Pin<Box<dyn Stream<Item = UpdateStreamItemReadRef> + Send + Sync>>);
pub(super) struct UpdateStream(
Pin<Box<dyn Stream<Item = Result<UpdateStreamItemReadRef>> + Send + Sync>>,
);

impl UpdateStream {
pub async fn new(get_content: TransientInstance<GetContentFn>) -> Result<UpdateStream> {
pub async fn new(
resource: String,
get_content: TransientInstance<GetContentFn>,
) -> Result<UpdateStream> {
let (sx, rx) = tokio::sync::mpsc::channel(32);

let content = get_content();
Expand All @@ -190,13 +229,18 @@ impl UpdateStream {
};
let version_state = VersionStateVc::new(version.into_trait_ref().await?).await?;

compute_update_stream(version_state, get_content, TransientInstance::new(sx));
compute_update_stream(
&resource,
version_state,
get_content,
TransientInstance::new(sx),
);

let mut last_had_issues = false;

let stream = ReceiverStream::new(rx).filter_map(move |item| {
let (has_issues, issues_changed) =
if let UpdateStreamItem::Found { issues, .. } = &*item {
if let Some(UpdateStreamItem::Found { issues, .. }) = item.as_deref().ok() {
let has_issues = !issues.is_empty();
let issues_changed = has_issues != last_had_issues;
last_had_issues = has_issues;
Expand All @@ -206,12 +250,8 @@ impl UpdateStream {
};

async move {
match &*item {
UpdateStreamItem::NotFound => {
// Propagate not found updates so we can drop this update stream.
Some(item)
}
UpdateStreamItem::Found { update, .. } => {
match item.as_deref() {
Ok(UpdateStreamItem::Found { update, .. }) => {
match &**update {
Update::Partial(PartialUpdate { to, .. })
| Update::Total(TotalUpdate { to }) => {
Expand All @@ -232,6 +272,10 @@ impl UpdateStream {
}
}
}
_ => {
// Propagate other updates
Some(item)
}
}
}
});
Expand All @@ -241,7 +285,7 @@ impl UpdateStream {
}

impl Stream for UpdateStream {
type Item = UpdateStreamItemReadRef;
type Item = Result<UpdateStreamItemReadRef>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -260,3 +304,37 @@ pub enum UpdateStreamItem {
issues: Vec<PlainIssueReadRef>,
},
}

#[turbo_tasks::value(serialization = "none")]
struct FatalStreamIssue {
description: StringVc,
resource: String,
}

#[turbo_tasks::value_impl]
impl Issue for FatalStreamIssue {
#[turbo_tasks::function]
fn severity(&self) -> IssueSeverityVc {
IssueSeverity::Fatal.into()
}

#[turbo_tasks::function]
fn context(&self) -> FileSystemPathVc {
ServerFileSystemVc::new().root().join(&self.resource)
}

#[turbo_tasks::function]
fn category(&self) -> StringVc {
StringVc::cell("websocket".to_string())
}

#[turbo_tasks::function]
fn title(&self) -> StringVc {
StringVc::cell("Fatal error while getting content to stream".to_string())
}

#[turbo_tasks::function]
fn description(&self) -> StringVc {
self.description
}
}

0 comments on commit dfbb1e6

Please sign in to comment.