Skip to content

Commit

Permalink
Use SharedError for body streaming (#4392)
Browse files Browse the repository at this point in the history
Follow up to #4329, this removes
`BodyError` (which was just a simple `String` and not an `Error`) and
switches to `SharedError`.

I've implemented a basic `PartialEq` and `Serialization` for
`SharedError` so that it doesn't infect everything that uses `Body`.
  • Loading branch information
jridgewell committed Mar 31, 2023
1 parent 8d6cd99 commit 1414a69
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 58 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 39 additions & 2 deletions crates/turbo-tasks/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use std::{
any::Provider,
error::Error as StdError,
fmt::{Debug, Display},
hash::{Hash, Hasher},
ops::Deref,
sync::Arc,
time::Duration,
};

use anyhow::Error;
use anyhow::{anyhow, Error};
use serde::{Deserialize, Deserializer, Serialize, Serializer};

pub use super::{id_factory::IdFactory, no_move_vec::NoMoveVec, once_map::*};

Expand All @@ -25,7 +27,7 @@ impl SharedError {
}
}

impl std::error::Error for SharedError {
impl StdError for SharedError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.inner.source()
}
Expand All @@ -47,6 +49,41 @@ impl From<Error> for SharedError {
}
}

impl PartialEq for SharedError {
fn eq(&self, other: &Self) -> bool {
Arc::ptr_eq(&self.inner, &other.inner)
}
}

impl Eq for SharedError {}

impl Serialize for SharedError {
fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
let mut v = vec![self.to_string()];
let mut source = self.source();
while let Some(s) = source {
v.push(s.to_string());
source = s.source();
}
Serialize::serialize(&v, serializer)
}
}

impl<'de> Deserialize<'de> for SharedError {
fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
use serde::de::Error;
let mut messages = <Vec<String>>::deserialize(deserializer)?;
let mut e = match messages.pop() {
Some(e) => anyhow!(e),
None => return Err(Error::custom("expected at least 1 error message")),
};
while let Some(message) = messages.pop() {
e = e.context(message);
}
Ok(SharedError::new(e))
}
}

pub struct FormatDuration(pub Duration);

impl Display for FormatDuration {
Expand Down
1 change: 0 additions & 1 deletion crates/turbopack-dev-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ pin-project-lite = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_qs = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-stream = "0.1.9"
tokio-util = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/turbopack-dev-server/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::io::{Error, ErrorKind};

use anyhow::Result;
use anyhow::{anyhow, Result};
use futures::{StreamExt, TryStreamExt};
use hyper::{
header::{HeaderName, CONTENT_ENCODING, CONTENT_LENGTH},
Expand All @@ -10,15 +10,15 @@ use hyper::{
use mime::Mime;
use mime_guess::mime;
use tokio_util::io::{ReaderStream, StreamReader};
use turbo_tasks::TransientInstance;
use turbo_tasks::{util::SharedError, TransientInstance};
use turbo_tasks_bytes::Bytes;
use turbo_tasks_fs::{FileContent, FileContentReadRef};
use turbopack_core::{asset::AssetContent, issue::IssueReporterVc, version::VersionedContent};

use crate::source::{
request::SourceRequest,
resolve::{resolve_source_request, ResolveSourceRequestResult},
Body, BodyError, ContentSourceVc, HeaderListReadRef, ProxyResultReadRef,
Body, ContentSourceVc, HeaderListReadRef, ProxyResultReadRef,
};

#[turbo_tasks::value(serialization = "none")]
Expand Down Expand Up @@ -199,7 +199,7 @@ async fn http_request_to_source_request(request: Request<hyper::Body>) -> Result
let bytes: Vec<_> = body
.map(|bytes| {
bytes.map_or_else(
|e| Err(BodyError::new(e.to_string())),
|e| Err(SharedError::new(anyhow!(e))),
// The outer Ok is consumed by try_collect, but the Body type requires a Result, so
// we need to double wrap.
|b| Ok(Ok(Bytes::from(b))),
Expand Down
38 changes: 3 additions & 35 deletions crates/turbopack-dev-server/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ use std::collections::BTreeSet;
use anyhow::Result;
use futures::stream::Stream as StreamTrait;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use turbo_tasks::{trace::TraceRawVcs, Value};
use turbo_tasks::{trace::TraceRawVcs, util::SharedError, Value};
use turbo_tasks_bytes::{Bytes, Stream, StreamRead};
use turbo_tasks_fs::FileSystemPathVc;
use turbopack_core::version::VersionedContentVc;
Expand All @@ -40,38 +39,6 @@ pub struct ProxyResult {
pub body: Body,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Error)]
#[error("{err}")]
pub struct BodyError {
err: String,
}

impl BodyError {
pub fn new(err: String) -> Self {
BodyError { err }
}
}

impl From<&str> for BodyError {
fn from(err: &str) -> Self {
BodyError {
err: err.to_string(),
}
}
}

impl From<String> for BodyError {
fn from(err: String) -> Self {
BodyError { err }
}
}

impl From<anyhow::Error> for BodyError {
fn from(value: anyhow::Error) -> Self {
value.to_string().into()
}
}

/// The return value of a content source when getting a path. A specificity is
/// attached and when combining results this specificity should be used to order
/// results.
Expand Down Expand Up @@ -266,7 +233,8 @@ pub struct ContentSourceData {
pub cache_buster: u64,
}

pub type BodyChunk = Result<Bytes, BodyError>;
pub type BodyChunk = Result<Bytes, SharedError>;

/// A request body.
#[turbo_tasks::value(shared)]
#[derive(Default, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion crates/turbopack-node/src/evaluate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ struct JavaScriptStreamSender {
get: Box<dyn Fn() -> UnboundedSender<Result<Bytes, SharedError>> + Send + Sync>,
}

#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")]
#[turbo_tasks::value(transparent)]
#[derive(Clone, Debug)]
pub struct JavaScriptEvaluation(#[turbo_tasks(trace_ignore)] JavaScriptStream);

Expand Down
14 changes: 7 additions & 7 deletions crates/turbopack-node/src/render/render_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use turbo_tasks_bytes::{Bytes, Stream};
use turbo_tasks_env::ProcessEnvVc;
use turbo_tasks_fs::FileSystemPathVc;
use turbopack_core::{chunk::ChunkingContextVc, error::PrettyPrintError};
use turbopack_dev_server::source::{Body, BodyError, BodyVc, ProxyResult, ProxyResultVc};
use turbopack_dev_server::source::{Body, BodyVc, ProxyResult, ProxyResultVc};
use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc};

use super::{
Expand Down Expand Up @@ -68,11 +68,11 @@ pub async fn render_proxy(

let body = Body::from_stream(stream.map(|item| match item {
Ok(RenderItem::BodyChunk(b)) => Ok(b),
Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))),
Err(e) => Err(BodyError::new(format!(
"error streaming proxied contents: {}",
e
Ok(v) => Err(SharedError::new(anyhow!(
"unexpected render item: {:#?}",
v
))),
Err(e) => Err(e),
}));
let result = ProxyResult {
status: data.status,
Expand Down Expand Up @@ -131,12 +131,12 @@ enum RenderItem {
type RenderItemResult = Result<RenderItem, SharedError>;

#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
pub struct RenderStreamSender {
struct RenderStreamSender {
#[turbo_tasks(trace_ignore, debug_ignore)]
get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}

#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")]
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);

#[turbo_tasks::function]
Expand Down
14 changes: 7 additions & 7 deletions crates/turbopack-node/src/render/render_static.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use turbopack_core::{
};
use turbopack_dev_server::{
html::DevHtmlAssetVc,
source::{Body, BodyError, HeaderListVc, RewriteBuilder, RewriteVc},
source::{Body, HeaderListVc, RewriteBuilder, RewriteVc},
};
use turbopack_ecmascript::{chunk::EcmascriptChunkPlaceablesVc, EcmascriptModuleAssetVc};

Expand Down Expand Up @@ -107,11 +107,11 @@ pub async fn render_static(
RenderItem::Headers(data) => {
let body = stream.map(|item| match item {
Ok(RenderItem::BodyChunk(b)) => Ok(b),
Ok(v) => Err(BodyError::new(format!("unexpected render item: {:#?}", v))),
Err(e) => Err(BodyError::new(format!(
"error streaming proxied contents: {}",
e
Ok(v) => Err(SharedError::new(anyhow!(
"unexpected render item: {:#?}",
v
))),
Err(e) => Err(e),
});
StaticResult::StreamedContent {
status: data.status,
Expand Down Expand Up @@ -180,12 +180,12 @@ enum RenderItem {
type RenderItemResult = Result<RenderItem, SharedError>;

#[turbo_tasks::value(eq = "manual", cell = "new", serialization = "none")]
pub struct RenderStreamSender {
struct RenderStreamSender {
#[turbo_tasks(trace_ignore, debug_ignore)]
get: Box<dyn Fn() -> UnboundedSender<RenderItemResult> + Send + Sync>,
}

#[turbo_tasks::value(transparent, eq = "manual", cell = "new", serialization = "none")]
#[turbo_tasks::value(transparent)]
struct RenderStream(#[turbo_tasks(trace_ignore)] Stream<RenderItemResult>);

#[turbo_tasks::function]
Expand Down

0 comments on commit 1414a69

Please sign in to comment.