Skip to content

Commit

Permalink
use new codec signature in actix-utils
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed May 18, 2020
1 parent 46d79b8 commit 0caaf91
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 49 deletions.
94 changes: 46 additions & 48 deletions actix-utils/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,28 @@ use std::{fmt, mem};

use actix_codec::{AsyncRead, AsyncWrite, Decoder, Encoder, Framed};
use actix_service::{IntoService, Service};
use futures_util::{future::Future, FutureExt, stream::Stream};
use futures_util::{future::Future, stream::Stream, FutureExt};
use log::debug;

use crate::mpsc;

type Request<U> = <U as Decoder>::Item;
type Response<U> = <U as Encoder>::Item;

/// Framed transport errors
pub enum DispatcherError<E, U: Encoder + Decoder> {
pub enum DispatcherError<E, U: Encoder<I> + Decoder, I> {
Service(E),
Encoder(<U as Encoder>::Error),
Encoder(<U as Encoder<I>>::Error),
Decoder(<U as Decoder>::Error),
}

impl<E, U: Encoder + Decoder> From<E> for DispatcherError<E, U> {
impl<E, U: Encoder<I> + Decoder, I> From<E> for DispatcherError<E, U, I> {
fn from(err: E) -> Self {
DispatcherError::Service(err)
}
}

impl<E, U: Encoder + Decoder> fmt::Debug for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Debug for DispatcherError<E, U, I>
where
E: fmt::Debug,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -42,10 +39,10 @@ where
}
}

impl<E, U: Encoder + Decoder> fmt::Display for DispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> fmt::Display for DispatcherError<E, U, I>
where
E: fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -62,60 +59,61 @@ pub enum Message<T> {
Close,
}

/// FramedTransport - is a future that reads frames from Framed object
/// and pass then to the service.
/// Dispatcher is a future that reads frames from Framed object
/// and passes them to the service.
#[pin_project::pin_project]
pub struct Dispatcher<S, T, U>
pub struct Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Encoder + Decoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Encoder<I> + Decoder,
// I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
service: S,
state: State<S, U>,
state: State<S, U, I>,
#[pin]
framed: Framed<T, U>,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
tx: mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
tx: mpsc::Sender<Result<Message<I>, S::Error>>,
}

enum State<S: Service, U: Encoder + Decoder> {
enum State<S: Service, U: Encoder<I> + Decoder, I> {
Processing,
Error(DispatcherError<S::Error, U>),
FramedError(DispatcherError<S::Error, U>),
Error(DispatcherError<S::Error, U, I>),
FramedError(DispatcherError<S::Error, U, I>),
FlushAndStop,
Stopping,
}

impl<S: Service, U: Encoder + Decoder> State<S, U> {
fn take_error(&mut self) -> DispatcherError<S::Error, U> {
impl<S: Service, U: Encoder<I> + Decoder, I> State<S, U, I> {
fn take_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::Error(err) => err,
_ => panic!(),
}
}

fn take_framed_error(&mut self) -> DispatcherError<S::Error, U> {
fn take_framed_error(&mut self) -> DispatcherError<S::Error, U, I> {
match mem::replace(self, State::Processing) {
State::FramedError(err) => err,
_ => panic!(),
}
}
}

impl<S, T, U> Dispatcher<S, T, U>
impl<S, T, U, I> Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
// I: 'static,
<U as Decoder>::Error: std::fmt::Debug,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
pub fn new<F: IntoService<S>>(framed: Framed<T, U>, service: F) -> Self {
let (tx, rx) = mpsc::channel();
Expand All @@ -132,7 +130,7 @@ where
pub fn with_rx<F: IntoService<S>>(
framed: Framed<T, U>,
service: F,
rx: mpsc::Receiver<Result<Message<<U as Encoder>::Item>, S::Error>>,
rx: mpsc::Receiver<Result<Message<I>, S::Error>>,
) -> Self {
let tx = rx.sender();
Dispatcher {
Expand All @@ -145,7 +143,7 @@ where
}

/// Get sink
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<<U as Encoder>::Item>, S::Error>> {
pub fn get_sink(&self) -> mpsc::Sender<Result<Message<I>, S::Error>> {
self.tx.clone()
}

Expand All @@ -172,13 +170,13 @@ where

fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let this = self.as_mut().project();
Expand Down Expand Up @@ -214,13 +212,13 @@ where
/// write to framed object
fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> bool
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
{
loop {
let mut this = self.as_mut().project();
Expand Down Expand Up @@ -263,18 +261,18 @@ where
}
}

impl<S, T, U> Future for Dispatcher<S, T, U>
impl<S, T, U, I> Future for Dispatcher<S, T, U, I>
where
S: Service<Request = Request<U>, Response = Response<U>>,
S: Service<Request = <U as Decoder>::Item, Response = I>,
S::Error: 'static,
S::Future: 'static,
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
<U as Encoder>::Item: 'static,
<U as Encoder>::Error: std::fmt::Debug,
U: Decoder + Encoder<I>,
I: 'static,
<U as Encoder<I>>::Error: std::fmt::Debug,
<U as Decoder>::Error: std::fmt::Debug,
{
type Output = Result<(), DispatcherError<S::Error, U>>;
type Output = Result<(), DispatcherError<S::Error, U, I>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
Expand Down
2 changes: 1 addition & 1 deletion actix-utils/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Actix utils - various helper services
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]

mod cell;
Expand Down

0 comments on commit 0caaf91

Please sign in to comment.