Skip to content

Commit

Permalink
convert Server::bind to accept a normal service factory
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Oct 23, 2021
1 parent 81421c2 commit d43a0c4
Show file tree
Hide file tree
Showing 16 changed files with 336 additions and 124 deletions.
2 changes: 2 additions & 0 deletions actix-server/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## Unreleased - 2021-xx-xx
* Rename `Server` to `ServerHandle`. [#???]
* Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#???]
* Remove wrapper `service::ServiceFactory` trait. [#???]
* `Server::bind` and related methods now take a regular `ServiceFactory` (from actix-service crate). [#???]
* Minimum supported Rust version (MSRV) is now 1.52.

[#???]: https://github.com/actix/actix-net/pull/???
Expand Down
2 changes: 1 addition & 1 deletion actix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,4 @@ actix-rt = "2.0.0"
bytes = "1"
env_logger = "0.9"
futures-util = { version = "0.3.7", default-features = false, features = ["sink"] }
tokio = { version = "1.5.1", features = ["io-util"] }
tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] }
25 changes: 25 additions & 0 deletions actix-server/examples/startup-fail.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use std::io;

use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::fn_service;
use log::info;

#[actix_rt::main]
async fn main() -> io::Result<()> {
env_logger::Builder::from_env(env_logger::Env::new().default_filter_or("trace,mio=info"))
.init();

let addr = ("127.0.0.1", 8080);
info!("starting server on port: {}", &addr.0);

Server::build()
.bind(
"startup-fail",
addr,
fn_service(move |mut _stream: TcpStream| async move { Ok::<u32, u32>(42) }),
)?
.workers(2)
.run()
.await
}
5 changes: 2 additions & 3 deletions actix-server/examples/tcp-echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use actix_rt::net::TcpStream;
use actix_server::Server;
use actix_service::{fn_service, ServiceFactoryExt as _};
use bytes::BytesMut;
use futures_util::future::ok;
use log::{error, info};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand All @@ -39,7 +38,7 @@ async fn main() -> io::Result<()> {
// logical CPU cores as the worker count. For this reason, the closure passed to bind needs
// to return a service *factory*; so it can be created once per worker.
Server::build()
.bind("echo", addr, move || {
.bind("echo", addr, {
let count = Arc::clone(&count);
let num2 = Arc::clone(&count);

Expand Down Expand Up @@ -81,7 +80,7 @@ async fn main() -> io::Result<()> {
.and_then(move |(_, size)| {
let num = num2.load(Ordering::SeqCst);
info!("[{}] total bytes read: {}", num, size);
ok(size)
async move { Ok(size) }
})
})?
.workers(1)
Expand Down
80 changes: 48 additions & 32 deletions actix-server/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::{
fmt,
future::Future,
io, mem,
pin::Pin,
Expand All @@ -7,32 +8,25 @@ use std::{
};

use actix_rt::{self as rt, net::TcpStream, time::sleep, System};
use actix_service::ServiceFactory;
use log::{error, info};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedReceiver},
oneshot,
};

use crate::accept::AcceptLoop;
use crate::join_all;
use crate::server::{ServerCommand, ServerHandle};
use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService};
use crate::signals::{Signal, Signals};
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
use crate::socket::{MioTcpListener, MioTcpSocket};
use crate::waker_queue::{WakerInterest, WakerQueue};
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer};

#[derive(Debug)]
#[non_exhaustive]
pub struct Server;

impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}
use crate::{
accept::AcceptLoop,
join_all,
server::{ServerCommand, ServerHandle},
service::{InternalServiceFactory, StreamNewService},
signals::{Signal, Signals},
socket::{
MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs,
},
waker_queue::{WakerInterest, WakerQueue},
worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer},
};

/// Server builder
pub struct ServerBuilder {
Expand Down Expand Up @@ -169,38 +163,48 @@ impl ServerBuilder {
/// Binds to all network interface addresses that resolve from the `addr` argument.
/// Eg. using `localhost` might bind to both IPv4 and IPv6 addresses. Bind to multiple distinct
/// interfaces at the same time by passing a list of socket addresses.
pub fn bind<F, U, N: AsRef<str>>(mut self, name: N, addr: U, factory: F) -> io::Result<Self>
pub fn bind<F, U, InitErr>(
mut self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
U: ToSocketAddrs,
{
let sockets = bind_addr(addr, self.backlog)?;

for lst in sockets {
let token = self.next_token();

self.services.push(StreamNewService::create(
name.as_ref().to_string(),
token,
factory.clone(),
lst.local_addr()?,
));

self.sockets
.push((token, name.as_ref().to_string(), MioListener::Tcp(lst)));
}

Ok(self)
}

/// Bind server to existing TCP listener.
///
/// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen<F, N: AsRef<str>>(
pub fn listen<F, InitErr>(
mut self,
name: N,
name: impl AsRef<str>,
lst: StdTcpListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<TcpStream>,
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
lst.set_nonblocking(true)?;

Expand Down Expand Up @@ -259,7 +263,7 @@ impl ServerBuilder {
Signals::start(self.server.clone());
}

// start http server actor
// start http server
let server = self.server.clone();
rt::spawn(self);
server
Expand Down Expand Up @@ -402,11 +406,19 @@ impl ServerBuilder {
#[cfg(unix)]
impl ServerBuilder {
/// Add new unix domain service to the server.
pub fn bind_uds<F, U, N>(self, name: N, addr: U, factory: F) -> io::Result<Self>
pub fn bind_uds<F, U, InitErr>(
self,
name: impl AsRef<str>,
addr: U,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
N: AsRef<str>,
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
U: AsRef<std::path::Path>,
InitErr: fmt::Debug + Send + 'static,
{
// The path must not exist when we try to bind.
// Try to remove it to avoid bind error.
Expand All @@ -424,14 +436,18 @@ impl ServerBuilder {
/// Add new unix domain service to the server.
///
/// Useful when running as a systemd service and a socket FD can be passed to the process.
pub fn listen_uds<F, N: AsRef<str>>(
pub fn listen_uds<F, InitErr>(
mut self,
name: N,
name: impl AsRef<str>,
lst: crate::socket::StdUnixListener,
factory: F,
) -> io::Result<Self>
where
F: ServiceFactory<actix_rt::net::UnixStream>,
F: ServiceFactory<actix_rt::net::UnixStream, Config = (), InitError = InitErr>
+ Send
+ Clone
+ 'static,
InitErr: fmt::Debug + Send + 'static,
{
use std::net::{IpAddr, Ipv4Addr};

Expand Down
5 changes: 2 additions & 3 deletions actix-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ mod test_server;
mod waker_queue;
mod worker;

pub use self::builder::{Server, ServerBuilder};
pub use self::server::{ServerHandle};
pub use self::service::ServiceFactory;
pub use self::builder::ServerBuilder;
pub use self::server::{Server, ServerHandle};
pub use self::test_server::TestServer;

#[doc(hidden)]
Expand Down
13 changes: 12 additions & 1 deletion actix-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,18 @@ use std::task::{Context, Poll};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::signals::Signal;
use crate::{signals::Signal, ServerBuilder};

#[derive(Debug)]
#[non_exhaustive]
pub struct Server;

impl Server {
/// Start server building process.
pub fn build() -> ServerBuilder {
ServerBuilder::default()
}
}

#[derive(Debug)]
pub(crate) enum ServerCommand {
Expand Down
59 changes: 25 additions & 34 deletions actix-server/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::task::{Context, Poll};

use actix_service::{Service, ServiceFactory as BaseServiceFactory};
use std::{
fmt,
marker::PhantomData,
net::SocketAddr,
task::{Context, Poll},
};

use actix_service::{Service, ServiceFactory};
use actix_utils::future::{ready, Ready};
use futures_core::future::LocalBoxFuture;
use log::error;

use crate::socket::{FromStream, MioStream};
use crate::worker::WorkerCounterGuard;

pub trait ServiceFactory<Stream: FromStream>: Send + Clone + 'static {
type Factory: BaseServiceFactory<Stream, Config = ()>;

fn create(&self) -> Self::Factory;
}
use crate::{
socket::{FromStream, MioStream},
worker::WorkerCounterGuard,
};

pub(crate) trait InternalServiceFactory: Send {
fn name(&self, token: usize) -> &str;
Expand Down Expand Up @@ -80,17 +79,18 @@ where
}
}

pub(crate) struct StreamNewService<F: ServiceFactory<Io>, Io: FromStream> {
pub(crate) struct StreamNewService<F, Io, InitErr> {
name: String,
inner: F,
token: usize,
addr: SocketAddr,
_t: PhantomData<Io>,
_t: PhantomData<(Io, InitErr)>,
}

impl<F, Io> StreamNewService<F, Io>
impl<F, Io, InitErr> StreamNewService<F, Io, InitErr>
where
F: ServiceFactory<Io>,
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static,
{
pub(crate) fn create(
Expand All @@ -109,9 +109,10 @@ where
}
}

impl<F, Io> InternalServiceFactory for StreamNewService<F, Io>
impl<F, Io, InitErr> InternalServiceFactory for StreamNewService<F, Io, InitErr>
where
F: ServiceFactory<Io>,
F: ServiceFactory<Io, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
Io: FromStream + Send + 'static,
{
fn name(&self, _: usize) -> &str {
Expand All @@ -130,28 +131,18 @@ where

fn create(&self) -> LocalBoxFuture<'static, Result<(usize, BoxedServerService), ()>> {
let token = self.token;
let fut = self.inner.create().new_service(());
let fut = self.inner.new_service(());
Box::pin(async move {
match fut.await {
Ok(inner) => {
let service = Box::new(StreamService::new(inner)) as _;
Ok((token, service))
}
Err(_) => Err(()),
Err(err) => {
error!("{:?}", err);
Err(())
}
}
})
}
}

impl<F, T, I> ServiceFactory<I> for F
where
F: Fn() -> T + Send + Clone + 'static,
T: BaseServiceFactory<I, Config = ()>,
I: FromStream,
{
type Factory = T;

fn create(&self) -> T {
(self)()
}
}
11 changes: 8 additions & 3 deletions actix-server/src/test_server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::mpsc;
use std::{net, thread};
use std::{fmt, net, thread};

use actix_rt::{net::TcpStream, System};
use actix_service::ServiceFactory;

use crate::{Server, ServerBuilder, ServiceFactory};
use crate::{Server, ServerBuilder};

/// A testing server.
///
Expand Down Expand Up @@ -64,7 +65,11 @@ impl TestServer {
}

/// Start new test server with application factory.
pub fn with<F: ServiceFactory<TcpStream>>(factory: F) -> TestServerRuntime {
pub fn with<F, InitErr>(factory: F) -> TestServerRuntime
where
F: ServiceFactory<TcpStream, Config = (), InitError = InitErr> + Send + Clone + 'static,
InitErr: fmt::Debug + Send + 'static,
{
let (tx, rx) = mpsc::channel();

// run server in separate thread
Expand Down
Loading

0 comments on commit d43a0c4

Please sign in to comment.