Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove actix-threadpool.use actix_rt::task::spawn_blocking #1878

Merged
merged 9 commits into from
Jan 9, 2021
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@
### Removed
* Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now
exposed directly by the `middleware` module.
* Remove `actix-threadpool` as dependency. `actix_threadpool::BlockingError` error type can be imported
from `actix_web::error` module. [#1878]

[#1812]: https://github.com/actix/actix-web/pull/1812
[#1813]: https://github.com/actix/actix-web/pull/1813
[#1852]: https://github.com/actix/actix-web/pull/1852
[#1865]: https://github.com/actix/actix-web/pull/1865
[#1875]: https://github.com/actix/actix-web/pull/1875

[#1878]: https://github.com/actix/actix-web/pull/1878

## 3.3.2 - 2020-12-01
### Fixed
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ actix-rt = "2.0.0-beta.1"
actix-server = "2.0.0-beta.2"
actix-service = "2.0.0-beta.2"
actix-utils = "3.0.0-beta.1"
actix-threadpool = "0.3.1"
actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true }

actix-web-codegen = "0.4.0"
Expand Down Expand Up @@ -133,6 +132,9 @@ actix-multipart = { path = "actix-multipart" }
actix-files = { path = "actix-files" }
awc = { path = "awc" }

# TODO: remove override
actix-rt = { git = "https://github.com/actix/actix-net.git" }

robjtede marked this conversation as resolved.
Show resolved Hide resolved
[[bench]]
name = "server"
harness = false
Expand Down
48 changes: 20 additions & 28 deletions actix-files/src/chunked.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,11 @@ use std::{
};

use actix_web::{
error::{BlockingError, Error},
web,
error::{Error, ErrorInternalServerError},
rt::task::{spawn_blocking, JoinHandle},
};
use bytes::Bytes;
use futures_core::{ready, Stream};
use futures_util::future::{FutureExt, LocalBoxFuture};

use crate::handle_error;

type ChunkedBoxFuture =
LocalBoxFuture<'static, Result<(File, Bytes), BlockingError<io::Error>>>;

#[doc(hidden)]
/// A helper created from a `std::fs::File` which reads the file
Expand All @@ -27,7 +21,7 @@ pub struct ChunkedReadFile {
pub(crate) size: u64,
pub(crate) offset: u64,
pub(crate) file: Option<File>,
pub(crate) fut: Option<ChunkedBoxFuture>,
pub(crate) fut: Option<JoinHandle<Result<(File, Bytes), io::Error>>>,
pub(crate) counter: u64,
}

Expand All @@ -45,18 +39,20 @@ impl Stream for ChunkedReadFile {
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
if let Some(ref mut fut) = self.fut {
return match ready!(Pin::new(fut).poll(cx)) {
Ok((file, bytes)) => {
let res = match ready!(Pin::new(fut).poll(cx)) {
Ok(Ok((file, bytes))) => {
self.fut.take();
self.file = Some(file);

self.offset += bytes.len() as u64;
self.counter += bytes.len() as u64;

Poll::Ready(Some(Ok(bytes)))
Ok(bytes)
}
Err(e) => Poll::Ready(Some(Err(handle_error(e)))),
Ok(Err(e)) => Err(e.into()),
Err(_) => Err(ErrorInternalServerError("Unexpected error")),
};
return Poll::Ready(Some(res));
}

let size = self.size;
Expand All @@ -68,25 +64,21 @@ impl Stream for ChunkedReadFile {
} else {
let mut file = self.file.take().expect("Use after completion");

self.fut = Some(
web::block(move || {
let max_bytes =
cmp::min(size.saturating_sub(counter), 65_536) as usize;
self.fut = Some(spawn_blocking(move || {
let max_bytes = cmp::min(size.saturating_sub(counter), 65_536) as usize;

let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;
let mut buf = Vec::with_capacity(max_bytes);
file.seek(io::SeekFrom::Start(offset))?;

let n_bytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;
let n_bytes =
file.by_ref().take(max_bytes as u64).read_to_end(&mut buf)?;

if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}
if n_bytes == 0 {
return Err(io::ErrorKind::UnexpectedEof.into());
}

Ok((file, Bytes::from(buf)))
})
.boxed_local(),
);
Ok((file, Bytes::from(buf)))
}));

self.poll_next(cx)
}
Expand Down
11 changes: 1 addition & 10 deletions actix-files/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@
#![deny(rust_2018_idioms)]
#![warn(missing_docs, missing_debug_implementations)]

use std::io;

use actix_service::boxed::{BoxService, BoxServiceFactory};
use actix_web::{
dev::{ServiceRequest, ServiceResponse},
error::{BlockingError, Error, ErrorInternalServerError},
error::Error,
http::header::DispositionType,
};
use mime_guess::from_ext;
Expand Down Expand Up @@ -56,13 +54,6 @@ pub fn file_extension_to_mime(ext: &str) -> mime::Mime {
from_ext(ext).first_or_octet_stream()
}

pub(crate) fn handle_error(err: BlockingError<io::Error>) -> Error {
match err {
BlockingError::Error(err) => err.into(),
BlockingError::Canceled => ErrorInternalServerError("Unexpected error"),
}
}

type MimeOverride = dyn Fn(&mime::Name<'_>) -> DispositionType;

#[cfg(test)]
Expand Down
5 changes: 4 additions & 1 deletion actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
* Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`.
due to the removal of this type from `tokio-openssl` crate. openssl handshake
error would return as `ConnectError::SslError`. [#1813]
* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`.
Due to this change `actix_threadpool::BlockingError` type is moved into
`actix_http::error` module. [#1878]

[#1813]: https://github.com/actix/actix-web/pull/1813
[#1857]: https://github.com/actix/actix-web/pull/1857
[#1864]: https://github.com/actix/actix-web/pull/1864

[#1878]: https://github.com/actix/actix-web/pull/1878

## 2.2.0 - 2020-11-25
### Added
Expand Down
1 change: 0 additions & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ actix-service = "2.0.0-beta.2"
actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.1"
actix-threadpool = "0.3.1"
actix-tls = "3.0.0-beta.2"
actix = { version = "0.11.0-beta.1", optional = true }

Expand Down
17 changes: 11 additions & 6 deletions actix-http/src/encoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

use actix_threadpool::{run, CpuFuture};
use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliDecoder;
use bytes::Bytes;
use flate2::write::{GzDecoder, ZlibDecoder};
use futures_core::{ready, Stream};

use super::Writer;
use crate::error::PayloadError;
use crate::error::{BlockingError, PayloadError};
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};

const INPLACE: usize = 2049;
Expand All @@ -19,7 +19,7 @@ pub struct Decoder<S> {
decoder: Option<ContentDecoder>,
stream: S,
eof: bool,
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
}

impl<S> Decoder<S>
Expand Down Expand Up @@ -80,8 +80,13 @@ where
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
}
};
self.decoder = Some(decoder);
self.fut.take();
Expand All @@ -105,7 +110,7 @@ where
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
self.fut = Some(spawn_blocking(move || {
let chunk = decoder.feed_data(chunk)?;
Ok((chunk, decoder))
}));
Expand Down
18 changes: 13 additions & 5 deletions actix-http/src/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

use actix_threadpool::{run, CpuFuture};
use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
Expand All @@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode};
use crate::{Error, ResponseHead};

use super::Writer;
use crate::error::BlockingError;

const INPLACE: usize = 1024;

Expand All @@ -26,7 +27,7 @@ pub struct Encoder<B> {
#[pin]
body: EncoderBody<B>,
encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
}

impl<B: MessageBody> Encoder<B> {
Expand Down Expand Up @@ -136,8 +137,15 @@ impl<B: MessageBody> MessageBody for Encoder<B> {

if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(
BlockingError::<io::Error>::Canceled.into(),
)))
}
};
let chunk = encoder.take();
*this.encoder = Some(encoder);
Expand All @@ -160,7 +168,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
return Poll::Ready(Some(Ok(chunk)));
}
} else {
*this.fut = Some(run(move || {
*this.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?;
Ok(encoder)
}));
Expand Down
18 changes: 14 additions & 4 deletions actix-http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::string::FromUtf8Error;
use std::{fmt, io, result};

use actix_codec::{Decoder, Encoder};
pub use actix_threadpool::BlockingError;
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut;
Expand Down Expand Up @@ -186,9 +185,6 @@ impl ResponseError for DeError {
/// `InternalServerError` for `Canceled`
impl ResponseError for Canceled {}

/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}

/// Return `BAD_REQUEST` for `Utf8Error`
impl ResponseError for Utf8Error {
fn status_code(&self) -> StatusCode {
Expand Down Expand Up @@ -300,6 +296,20 @@ impl From<httparse::Error> for ParseError {
}
}

/// A set of errors that can occur running blocking tasks in thread pool.
#[derive(Debug, Display)]
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}

impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

derive_more Error would delcare E the source

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from the actix-threadpool code and I believe it's explictly added by a PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay can do it later if needed


/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}

#[derive(Display, Debug)]
/// A set of errors that can occur during payload parsing
pub enum PayloadError {
Expand Down
5 changes: 4 additions & 1 deletion src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,5 +280,8 @@ where
I: Send + 'static,
E: Send + std::fmt::Debug + 'static,
{
actix_threadpool::run(f).await
match actix_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}
}