Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove actix-threadpool.use actix_rt::task::spawn_blocking #1878

Merged
merged 9 commits into from
Jan 9, 2021
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
### Removed
* Public modules `middleware::{normalize, err_handlers}`. All necessary middleware structs are now
exposed directly by the `middleware` module.
* Remove `actix-threadpool` as dependency. `actix_threadpool::BlockingError` error type can be imported
from `actix_web::error` module. [#1878]

[#1812]: https://github.com/actix/actix-web/pull/1812
[#1813]: https://github.com/actix/actix-web/pull/1813
[#1852]: https://github.com/actix/actix-web/pull/1852
[#1865]: https://github.com/actix/actix-web/pull/1865
[#1875]: https://github.com/actix/actix-web/pull/1875

[#1878]: https://github.com/actix/actix-web/pull/1878

## 3.3.2 - 2020-12-01
### Fixed
Expand Down
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ actix-rt = "2.0.0-beta.1"
actix-server = "2.0.0-beta.2"
actix-service = "2.0.0-beta.2"
actix-utils = "3.0.0-beta.1"
actix-threadpool = "0.3.1"
actix-tls = { version = "3.0.0-beta.2", default-features = false, optional = true }

actix-web-codegen = "0.4.0"
Expand Down Expand Up @@ -131,6 +130,9 @@ actix-multipart = { path = "actix-multipart" }
actix-files = { path = "actix-files" }
awc = { path = "awc" }

# TODO: remove override
actix-rt = { git = "https://github.com/actix/actix-net.git" }

robjtede marked this conversation as resolved.
Show resolved Hide resolved
[[bench]]
name = "server"
harness = false
Expand Down
5 changes: 4 additions & 1 deletion actix-http/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
* Remove `ConnectError::SslHandshakeError` and re-export of `HandshakeError`.
due to the removal of this type from `tokio-openssl` crate. openssl handshake
error would return as `ConnectError::SslError`. [#1813]
* Remove `actix-threadpool` dependency. Use `actix_rt::task::spawn_blocking`.
Due to this change `actix_threadpool::BlockingError` type is moved into
`actix_http::error` module. [#1878]

[#1813]: https://github.com/actix/actix-web/pull/1813
[#1857]: https://github.com/actix/actix-web/pull/1857
[#1864]: https://github.com/actix/actix-web/pull/1864

[#1878]: https://github.com/actix/actix-web/pull/1878

## 2.2.0 - 2020-11-25
### Added
Expand Down
1 change: 0 additions & 1 deletion actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ actix-service = "2.0.0-beta.2"
actix-codec = "0.4.0-beta.1"
actix-utils = "3.0.0-beta.1"
actix-rt = "2.0.0-beta.1"
actix-threadpool = "0.3.1"
actix-tls = "3.0.0-beta.2"
actix = { version = "0.11.0-beta.1", optional = true }

Expand Down
17 changes: 11 additions & 6 deletions actix-http/src/encoding/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

use actix_threadpool::{run, CpuFuture};
use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliDecoder;
use bytes::Bytes;
use flate2::write::{GzDecoder, ZlibDecoder};
use futures_core::{ready, Stream};

use super::Writer;
use crate::error::PayloadError;
use crate::error::{BlockingError, PayloadError};
use crate::http::header::{ContentEncoding, HeaderMap, CONTENT_ENCODING};

const INPLACE: usize = 2049;
Expand All @@ -19,7 +19,7 @@ pub struct Decoder<S> {
decoder: Option<ContentDecoder>,
stream: S,
eof: bool,
fut: Option<CpuFuture<(Option<Bytes>, ContentDecoder), io::Error>>,
fut: Option<JoinHandle<Result<(Option<Bytes>, ContentDecoder), io::Error>>>,
}

impl<S> Decoder<S>
Expand Down Expand Up @@ -80,8 +80,13 @@ where
loop {
if let Some(ref mut fut) = self.fut {
let (chunk, decoder) = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(BlockingError::Canceled.into())))
}
};
self.decoder = Some(decoder);
self.fut.take();
Expand All @@ -105,7 +110,7 @@ where
return Poll::Ready(Some(Ok(chunk)));
}
} else {
self.fut = Some(run(move || {
self.fut = Some(spawn_blocking(move || {
let chunk = decoder.feed_data(chunk)?;
Ok((chunk, decoder))
}));
Expand Down
18 changes: 13 additions & 5 deletions actix-http/src/encoding/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::pin::Pin;
use std::task::{Context, Poll};

use actix_threadpool::{run, CpuFuture};
use actix_rt::task::{spawn_blocking, JoinHandle};
use brotli2::write::BrotliEncoder;
use bytes::Bytes;
use flate2::write::{GzEncoder, ZlibEncoder};
Expand All @@ -17,6 +17,7 @@ use crate::http::{HeaderValue, StatusCode};
use crate::{Error, ResponseHead};

use super::Writer;
use crate::error::BlockingError;

const INPLACE: usize = 1024;

Expand All @@ -26,7 +27,7 @@ pub struct Encoder<B> {
#[pin]
body: EncoderBody<B>,
encoder: Option<ContentEncoder>,
fut: Option<CpuFuture<ContentEncoder, io::Error>>,
fut: Option<JoinHandle<Result<ContentEncoder, io::Error>>>,
}

impl<B: MessageBody> Encoder<B> {
Expand Down Expand Up @@ -136,8 +137,15 @@ impl<B: MessageBody> MessageBody for Encoder<B> {

if let Some(ref mut fut) = this.fut {
let mut encoder = match ready!(Pin::new(fut).poll(cx)) {
Ok(item) => item,
Err(e) => return Poll::Ready(Some(Err(e.into()))),
Ok(Ok(item)) => item,
Ok(Err(e)) => {
return Poll::Ready(Some(Err(BlockingError::Error(e).into())))
}
Err(_) => {
return Poll::Ready(Some(Err(
BlockingError::<io::Error>::Canceled.into(),
)))
}
};
let chunk = encoder.take();
*this.encoder = Some(encoder);
Expand All @@ -160,7 +168,7 @@ impl<B: MessageBody> MessageBody for Encoder<B> {
return Poll::Ready(Some(Ok(chunk)));
}
} else {
*this.fut = Some(run(move || {
*this.fut = Some(spawn_blocking(move || {
encoder.write(&chunk)?;
Ok(encoder)
}));
Expand Down
18 changes: 14 additions & 4 deletions actix-http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use std::string::FromUtf8Error;
use std::{fmt, io, result};

use actix_codec::{Decoder, Encoder};
pub use actix_threadpool::BlockingError;
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut;
Expand Down Expand Up @@ -190,9 +189,6 @@ impl ResponseError for DeError {
/// `InternalServerError` for `Canceled`
impl ResponseError for Canceled {}

/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}

/// Return `BAD_REQUEST` for `Utf8Error`
impl ResponseError for Utf8Error {
fn status_code(&self) -> StatusCode {
Expand Down Expand Up @@ -304,6 +300,20 @@ impl From<httparse::Error> for ParseError {
}
}

#[derive(Debug, Display)]
/// A set of errors that can occur running blocking tasks in thread pool.
fakeshadow marked this conversation as resolved.
Show resolved Hide resolved
pub enum BlockingError<E: fmt::Debug> {
#[display(fmt = "{:?}", _0)]
Error(E),
#[display(fmt = "Thread pool is gone")]
Canceled,
}

impl<E: fmt::Debug> std::error::Error for BlockingError<E> {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

derive_more Error would delcare E the source

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is copied from the actix-threadpool code and I believe it's explictly added by a PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay can do it later if needed


/// `InternalServerError` for `BlockingError`
impl<E: fmt::Debug> ResponseError for BlockingError<E> {}

#[derive(Display, Debug)]
/// A set of errors that can occur during payload parsing
pub enum PayloadError {
Expand Down
8 changes: 7 additions & 1 deletion src/web.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,5 +280,11 @@ where
I: Send + 'static,
E: Send + std::fmt::Debug + 'static,
{
actix_threadpool::run(f).await
// map error to BlockingError. this is for not breaking the existing api.
// preferably this should just return actix_rt::task::JoinHandle for a more
// flexible control of when the task is awaited and/or the abort(if needed).
robjtede marked this conversation as resolved.
Show resolved Hide resolved
match actix_rt::task::spawn_blocking(f).await {
Ok(res) => res.map_err(BlockingError::Error),
Err(_) => Err(BlockingError::Canceled),
}
}