Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server bind accepts types impling regular service traits #403

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
6 changes: 6 additions & 0 deletions actix-server/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
# Changes

## Unreleased - 2021-xx-xx
* Rename `Server` to `ServerHandle`. [#403]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#403]
* Remove wrapper `service::ServiceFactory` trait. [#403]
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#403]
* Minimum supported Rust version (MSRV) is now 1.52.

[#403]: https://github.com/actix/actix-net/pull/403


## 2.0.0-beta.6 - 2021-10-11
* Add experimental (semver-exempt) `io-uring` feature for enabling async file I/O on linux. [#374]
Expand Down
2 changes: 1 addition & 1 deletion actix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
33 changes: 33 additions & 0 deletions actix-server/examples/startup-fail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::io;

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_factory, fn_service};
use log::info;

#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
.init();

let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);

Server::build()
.bind(
"startup-fail",
addr,
fn_factory(|| async move {
if 1 > 2 {
Ok(fn_service(move |mut _stream: TcpStream| async move {
Ok::<u32, u32>(0)
}))
} else {
Err(42)
}
}),
)?
.workers(2)
.run()
.await
}
88 changes: 50 additions & 38 deletions actix-server/examples/tcp-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ use std::{

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use actix_service::{fn_factory, fn_service, ServiceExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand All @@ -39,52 +38,65 @@ async fn main() -> io::Result<()> {
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build()
.bind("echo", addr, move || {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);

fn_service(move |mut stream: TcpStream| {
.bind("echo", addr, {
fn_factory::<_, (), _, _, _, _>(move || {
let count = Arc::clone(&count);

async move {
let num = count.fetch_add(1, Ordering::SeqCst);
let num = num + 1;

let mut size = 0;
let mut buf = BytesMut::new();

loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,

// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}
let count = Arc::clone(&count);
let count2 = Arc::clone(&count);

let svc = fn_service(move |mut stream: TcpStream| {
let count = Arc::clone(&count);

let num = count.fetch_add(1, Ordering::SeqCst) + 1;

info!(
"[{}] accepting connection from: {}",
num,
stream.peer_addr().unwrap()
);

// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
async move {
let mut size = 0;
let mut buf = BytesMut::new();

loop {
match stream.read_buf(&mut buf).await {
// end of stream; bail from loop
Ok(0) => break,

// more bytes to process
Ok(bytes_read) => {
info!("[{}] read {} bytes", num, bytes_read);
stream.write_all(&buf[size..]).await.unwrap();
size += bytes_read;
}

// stream error; bail from loop with error
Err(err) => {
error!("Stream Error: {:?}", err);
return Err(());
}
}
}

// send data down service pipeline
Ok((buf.freeze(), size))
}
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = count2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
async move { Ok(size) }
});

// send data down service pipeline
Ok((buf.freeze(), size))
Ok::<_, ()>(svc.clone())
}
})
.map_err(|err| error!("Service Error: {:?}", err))
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
})
})?
.workers(1)
.workers(2)
.run()
.await
}
12 changes: 6 additions & 6 deletions actix-server/src/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use actix_rt::{
use log::{debug, error, info};
use mio::{Interest, Poll, Token as MioToken};

use crate::server::Server;
use crate::server::ServerHandle;
use crate::socket::MioListener;
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
use crate::worker::{Conn, WorkerHandleAccept};
Expand All @@ -30,13 +30,13 @@ struct ServerSocketInfo {
///
/// It would also listen to `ServerCommand` and push interests to `WakerQueue`.
pub(crate) struct AcceptLoop {
srv: Option<Server>,
srv: Option<ServerHandle>,
poll: Option<Poll>,
waker: WakerQueue,
}

impl AcceptLoop {
pub fn new(srv: Server) -> Self {
pub fn new(srv: ServerHandle) -> Self {
let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e));
let waker = WakerQueue::new(poll.registry())
.unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e));
Expand Down Expand Up @@ -74,7 +74,7 @@ struct Accept {
poll: Poll,
waker: WakerQueue,
handles: Vec<WorkerHandleAccept>,
srv: Server,
srv: ServerHandle,
next: usize,
avail: Availability,
paused: bool,
Expand Down Expand Up @@ -153,7 +153,7 @@ impl Accept {
poll: Poll,
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
srv: Server,
srv: ServerHandle,
handles: Vec<WorkerHandleAccept>,
) {
// Accept runs in its own thread and would want to spawn additional futures to current
Expand All @@ -176,7 +176,7 @@ impl Accept {
waker: WakerQueue,
socks: Vec<(usize, MioListener)>,
handles: Vec<WorkerHandleAccept>,
srv: Server,
srv: ServerHandle,
) -> (Accept, Vec<ServerSocketInfo>) {
let sockets = socks
.into_iter()
Expand Down
Loading