Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve error handling in update stream #4705

Merged
merged 3 commits into from
Apr 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 18 additions & 21 deletions crates/turbopack-dev-server/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,30 +42,27 @@ enum GetFromSourceResult {
async fn get_from_source(
source: ContentSourceVc,
request: TransientInstance<SourceRequest>,
issue_repoter: IssueReporterVc,
) -> Result<GetFromSourceResultVc> {
Ok(
match &*resolve_source_request(source, request, issue_repoter).await? {
ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
let static_content = static_content_vc.await?;
if let AssetContent::File(file) = &*static_content.content.content().await? {
GetFromSourceResult::Static {
content: file.await?,
status_code: static_content.status_code,
headers: static_content.headers.await?,
header_overwrites: header_overwrites.await?,
}
} else {
GetFromSourceResult::NotFound
Ok(match &*resolve_source_request(source, request).await? {
ResolveSourceRequestResult::Static(static_content_vc, header_overwrites) => {
let static_content = static_content_vc.await?;
if let AssetContent::File(file) = &*static_content.content.content().await? {
GetFromSourceResult::Static {
content: file.await?,
status_code: static_content.status_code,
headers: static_content.headers.await?,
header_overwrites: header_overwrites.await?,
}
} else {
GetFromSourceResult::NotFound
}
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
.cell(),
)
ResolveSourceRequestResult::HttpProxy(proxy) => {
GetFromSourceResult::HttpProxy(proxy.await?)
}
ResolveSourceRequestResult::NotFound => GetFromSourceResult::NotFound,
}
.cell())
}

/// Processes an HTTP request within a given content source and returns the
Expand All @@ -77,7 +74,7 @@ pub async fn process_request_with_content_source(
) -> Result<Response<hyper::Body>> {
let original_path = request.uri().path().to_string();
let request = http_request_to_source_request(request).await?;
let result = get_from_source(source, TransientInstance::new(request), issue_reporter);
let result = get_from_source(source, TransientInstance::new(request));
handle_issues(result, &original_path, "get_from_source", issue_reporter).await?;
match &*result.strongly_consistent().await? {
GetFromSourceResult::Static {
Expand Down
15 changes: 1 addition & 14 deletions crates/turbopack-dev-server/src/source/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
use anyhow::{bail, Result};
use hyper::Uri;
use turbo_tasks::{TransientInstance, Value};
use turbopack_core::issue::IssueReporterVc;

use super::{
headers::{HeaderValue, Headers},
Expand All @@ -15,10 +14,7 @@ use super::{
ContentSourceContent, ContentSourceDataVary, ContentSourceResult, ContentSourceVc,
HeaderListVc, ProxyResultVc, StaticContentVc,
};
use crate::{
handle_issues,
source::{ContentSource, ContentSourceData, GetContentSourceContent},
};
use crate::source::{ContentSource, ContentSourceData, GetContentSourceContent};

/// The result of [`resolve_source_request`]. Similar to a
/// `ContentSourceContent`, but without the `Rewrite` variant as this is taken
Expand All @@ -36,7 +32,6 @@ pub enum ResolveSourceRequestResult {
pub async fn resolve_source_request(
source: ContentSourceVc,
request: TransientInstance<SourceRequest>,
issue_reporter: IssueReporterVc,
) -> Result<ResolveSourceRequestResultVc> {
let mut data = ContentSourceData::default();
let mut current_source = source;
Expand All @@ -47,14 +42,6 @@ pub async fn resolve_source_request(
let mut response_header_overwrites = Vec::new();
loop {
let result = current_source.get(&current_asset_path, Value::new(data));
handle_issues(
result,
&original_path,
"get content from source",
issue_reporter,
)
.await?;

match &*result.strongly_consistent().await? {
ContentSourceResult::NotFound => break Ok(ResolveSourceRequestResult::NotFound.cell()),
ContentSourceResult::NeedData(needed) => {
Expand Down
14 changes: 10 additions & 4 deletions crates/turbopack-dev-server/src/update/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,11 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
let source = source_provider.get_source();
resolve_source_request(
source,
TransientInstance::new(request),
self.issue_reporter
TransientInstance::new(request)
)
}
};
match UpdateStream::new(TransientInstance::new(Box::new(get_content))).await {
match UpdateStream::new(format!("{resource}"), TransientInstance::new(Box::new(get_content))).await {
sokra marked this conversation as resolved.
Show resolved Hide resolved
Ok(stream) => {
streams.insert(resource, stream);
}
Expand All @@ -94,7 +93,14 @@ impl<P: SourceProvider + Clone + Send + Sync> UpdateServer<P> {
}
}
Some((resource, update)) = streams.next() => {
Self::send_update(&mut client, &mut streams, resource, &update).await?;
match update {
Ok(update) => {
Self::send_update(&mut client, &mut streams, resource, &update).await?;
}
Err(err) => {
eprintln!("Failed to get update for {resource}: {}", PrettyPrintError(&err));
}
}
}
else => break
}
Expand Down
136 changes: 107 additions & 29 deletions crates/turbopack-dev-server/src/update/stream.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use std::pin::Pin;

use anyhow::{bail, Result};
use anyhow::Result;
use futures::{prelude::*, Stream};
use tokio::sync::mpsc::Sender;
use tokio_stream::wrappers::ReceiverStream;
use turbo_tasks::{CollectiblesSource, IntoTraitRef, State, TraitRef, TransientInstance};
use turbo_tasks::{
primitives::StringVc, CollectiblesSource, IntoTraitRef, State, TraitRef, TransientInstance,
};
use turbo_tasks_fs::{FileSystem, FileSystemPathVc};
use turbopack_core::{
issue::{IssueVc, PlainIssueReadRef},
error::PrettyPrintError,
issue::{
Issue, IssueSeverity, IssueSeverityVc, IssueVc, OptionIssueProcessingPathItemsVc,
PlainIssueReadRef,
},
server_fs::ServerFileSystemVc,
version::{
NotFoundVersionVc, PartialUpdate, TotalUpdate, Update, UpdateReadRef, VersionVc,
VersionedContent,
Expand Down Expand Up @@ -38,12 +46,43 @@ fn extend_issues(issues: &mut Vec<PlainIssueReadRef>, new_issues: Vec<PlainIssue

#[turbo_tasks::function]
async fn get_update_stream_item(
resource: &str,
from: VersionStateVc,
get_content: TransientInstance<GetContentFn>,
) -> Result<UpdateStreamItemVc> {
let content = get_content();
let mut plain_issues = peek_issues(content).await?;

let content_value = match content.await {
Ok(content) => content,
Err(e) => {
plain_issues.push(
FatalStreamIssue {
resource: resource.to_string(),
description: StringVc::cell(format!("{}", PrettyPrintError(&e))),
}
.cell()
.as_issue()
.into_plain(OptionIssueProcessingPathItemsVc::none())
.await?,
);

let update = Update::Total(TotalUpdate {
to: NotFoundVersionVc::new()
.as_version()
.into_trait_ref()
.await?,
})
.cell();
return Ok(UpdateStreamItem::Found {
update: update.await?,
issues: plain_issues,
}
.cell());
}
};

match *content.await? {
match *content_value {
ResolveSourceRequestResult::Static(static_content_vc, _) => {
let static_content = static_content_vc.await?;

Expand All @@ -56,8 +95,7 @@ async fn get_update_stream_item(
let from = from.get();
let update = resolved_content.update(from);

let mut plain_issues = peek_issues(update).await?;
extend_issues(&mut plain_issues, peek_issues(content).await?);
extend_issues(&mut plain_issues, peek_issues(update).await?);

let update = update.await?;

Expand All @@ -74,7 +112,7 @@ async fn get_update_stream_item(
return Ok(UpdateStreamItem::NotFound.cell());
}

let plain_issues = peek_issues(proxy_result).await?;
extend_issues(&mut plain_issues, peek_issues(proxy_result).await?);

let from = from.get();
if let Some(from) = ProxyResultVc::resolve_from(from).await? {
Expand All @@ -98,8 +136,6 @@ async fn get_update_stream_item(
.cell())
}
_ => {
let plain_issues = peek_issues(content).await?;

let update = if plain_issues.is_empty() {
// Client requested a non-existing asset
// It might be removed in meantime, reload client
Expand Down Expand Up @@ -127,19 +163,17 @@ async fn get_update_stream_item(

#[turbo_tasks::function]
async fn compute_update_stream(
resource: &str,
from: VersionStateVc,
get_content: TransientInstance<GetContentFn>,
sender: TransientInstance<Sender<UpdateStreamItemReadRef>>,
) -> Result<()> {
let item = get_update_stream_item(from, get_content)
sender: TransientInstance<Sender<Result<UpdateStreamItemReadRef>>>,
) {
let item = get_update_stream_item(resource, from, get_content)
.strongly_consistent()
.await?;

if sender.send(item).await.is_err() {
bail!("channel closed");
}
.await;

Ok(())
// Send update. Ignore channel closed error.
let _ = sender.send(item).await;
}

#[turbo_tasks::value]
Expand Down Expand Up @@ -172,10 +206,15 @@ impl VersionStateVc {
}
}

pub(super) struct UpdateStream(Pin<Box<dyn Stream<Item = UpdateStreamItemReadRef> + Send + Sync>>);
pub(super) struct UpdateStream(
Pin<Box<dyn Stream<Item = Result<UpdateStreamItemReadRef>> + Send + Sync>>,
);

impl UpdateStream {
pub async fn new(get_content: TransientInstance<GetContentFn>) -> Result<UpdateStream> {
pub async fn new(
resource: String,
get_content: TransientInstance<GetContentFn>,
) -> Result<UpdateStream> {
let (sx, rx) = tokio::sync::mpsc::channel(32);

let content = get_content();
Expand All @@ -190,13 +229,18 @@ impl UpdateStream {
};
let version_state = VersionStateVc::new(version.into_trait_ref().await?).await?;

compute_update_stream(version_state, get_content, TransientInstance::new(sx));
compute_update_stream(
&resource,
version_state,
get_content,
TransientInstance::new(sx),
);

let mut last_had_issues = false;

let stream = ReceiverStream::new(rx).filter_map(move |item| {
let (has_issues, issues_changed) =
if let UpdateStreamItem::Found { issues, .. } = &*item {
if let Some(UpdateStreamItem::Found { issues, .. }) = item.as_deref().ok() {
let has_issues = !issues.is_empty();
let issues_changed = has_issues != last_had_issues;
last_had_issues = has_issues;
Expand All @@ -206,12 +250,8 @@ impl UpdateStream {
};

async move {
match &*item {
UpdateStreamItem::NotFound => {
// Propagate not found updates so we can drop this update stream.
Some(item)
}
UpdateStreamItem::Found { update, .. } => {
match item.as_deref() {
Ok(UpdateStreamItem::Found { update, .. }) => {
match &**update {
Update::Partial(PartialUpdate { to, .. })
| Update::Total(TotalUpdate { to }) => {
Expand All @@ -232,6 +272,10 @@ impl UpdateStream {
}
}
}
_ => {
// Propagate other updates
Some(item)
}
}
}
});
Expand All @@ -241,7 +285,7 @@ impl UpdateStream {
}

impl Stream for UpdateStream {
type Item = UpdateStreamItemReadRef;
type Item = Result<UpdateStreamItemReadRef>;

fn poll_next(
self: Pin<&mut Self>,
Expand All @@ -260,3 +304,37 @@ pub enum UpdateStreamItem {
issues: Vec<PlainIssueReadRef>,
},
}

#[turbo_tasks::value(serialization = "none")]
struct FatalStreamIssue {
description: StringVc,
resource: String,
}

#[turbo_tasks::value_impl]
impl Issue for FatalStreamIssue {
#[turbo_tasks::function]
fn severity(&self) -> IssueSeverityVc {
IssueSeverity::Fatal.into()
}

#[turbo_tasks::function]
fn context(&self) -> FileSystemPathVc {
ServerFileSystemVc::new().root().join(&self.resource)
}

#[turbo_tasks::function]
fn category(&self) -> StringVc {
StringVc::cell("websocket".to_string())
}

#[turbo_tasks::function]
fn title(&self) -> StringVc {
StringVc::cell("Fatal error while getting content to stream".to_string())
}

#[turbo_tasks::function]
fn description(&self) -> StringVc {
self.description
}
}