Skip to content

Commit

Permalink
Merge pull request #717 from benesch/tokio-10
Browse files Browse the repository at this point in the history
deps: upgrade to tokio v1.0 ecosystem
  • Loading branch information
sfackler authored Dec 25, 2020
2 parents c395b97 + f1729e4 commit 31de758
Show file tree
Hide file tree
Showing 12 changed files with 63 additions and 30 deletions.
6 changes: 3 additions & 3 deletions postgres-native-tls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
native-tls = "0.2"
tokio = "0.3"
tokio-native-tls = "0.2"
tokio = "1.0"
tokio-native-tls = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }
6 changes: 3 additions & 3 deletions postgres-openssl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ runtime = ["tokio-postgres/runtime"]
[dependencies]
futures = "0.3"
openssl = "0.10"
tokio = "0.3"
tokio-openssl = "0.5"
tokio = "1.0"
tokio-openssl = "0.6"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres", default-features = false }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
postgres = { version = "0.18.0", path = "../postgres" }
50 changes: 42 additions & 8 deletions postgres-openssl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,18 @@ use openssl::hash::MessageDigest;
use openssl::nid::Nid;
#[cfg(feature = "runtime")]
use openssl::ssl::SslConnector;
use openssl::ssl::{ConnectConfiguration, SslRef};
use std::fmt::Debug;
use openssl::ssl::{self, ConnectConfiguration, SslRef};
use openssl::x509::X509VerifyResult;
use std::error::Error;
use std::fmt::{self, Debug};
use std::future::Future;
use std::io;
use std::pin::Pin;
#[cfg(feature = "runtime")]
use std::sync::Arc;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio_openssl::{HandshakeError, SslStream};
use tokio_openssl::SslStream;
use tokio_postgres::tls;
#[cfg(feature = "runtime")]
use tokio_postgres::tls::MakeTlsConnect;
Expand Down Expand Up @@ -131,23 +133,55 @@ impl TlsConnector {

impl<S> TlsConnect<S> for TlsConnector
where
S: AsyncRead + AsyncWrite + Unpin + Debug + 'static + Sync + Send,
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
type Stream = TlsStream<S>;
type Error = HandshakeError<S>;
type Error = Box<dyn Error + Send + Sync>;
#[allow(clippy::type_complexity)]
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, HandshakeError<S>>> + Send>>;
type Future = Pin<Box<dyn Future<Output = Result<TlsStream<S>, Self::Error>> + Send>>;

fn connect(self, stream: S) -> Self::Future {
let future = async move {
let stream = tokio_openssl::connect(self.ssl, &self.domain, stream).await?;
Ok(TlsStream(stream))
let ssl = self.ssl.into_ssl(&self.domain)?;
let mut stream = SslStream::new(ssl, stream)?;
match Pin::new(&mut stream).connect().await {
Ok(()) => Ok(TlsStream(stream)),
Err(error) => Err(Box::new(ConnectError {
error,
verify_result: stream.ssl().verify_result(),
}) as _),
}
};

Box::pin(future)
}
}

#[derive(Debug)]
struct ConnectError {
error: ssl::Error,
verify_result: X509VerifyResult,
}

impl fmt::Display for ConnectError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(&self.error, fmt)?;

if self.verify_result != X509VerifyResult::OK {
fmt.write_str(": ")?;
fmt::Display::fmt(&self.verify_result, fmt)?;
}

Ok(())
}
}

impl Error for ConnectError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
Some(&self.error)
}
}

/// The stream returned by `TlsConnector`.
pub struct TlsStream<S>(SslStream<S>);

Expand Down
2 changes: 1 addition & 1 deletion postgres-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ readme = "../README.md"
[dependencies]
base64 = "0.13"
byteorder = "1.0"
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
hmac = "0.10"
md5 = "0.7"
Expand Down
2 changes: 1 addition & 1 deletion postgres-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ with-uuid-0_8 = ["uuid-08"]
with-time-0_2 = ["time-02"]

[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-derive = { version = "0.4.0", optional = true, path = "../postgres-derive" }
Expand Down
1 change: 0 additions & 1 deletion postgres-types/src/serde_json_1.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::{FromSql, IsNull, ToSql, Type};
use bytes::buf::BufMutExt;
use bytes::{BufMut, BytesMut};
use serde_1::{Deserialize, Serialize};
use serde_json_1::Value;
Expand Down
4 changes: 2 additions & 2 deletions postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
with-time-0_2 = ["tokio-postgres/with-time-0_2"]

[dependencies]
bytes = "0.5"
bytes = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
tokio-postgres = { version = "0.6.0", path = "../tokio-postgres" }

tokio = { version = "0.3", features = ["rt", "time"] }
tokio = { version = "1.0", features = ["rt", "time"] }
log = "0.4"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion postgres/src/copy_out_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl BufRead for CopyOutReader<'_> {
};
}

Ok(self.cur.bytes())
Ok(&self.cur)
}

fn consume(&mut self, amt: usize) {
Expand Down
9 changes: 5 additions & 4 deletions postgres/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::connection::ConnectionRef;
use crate::{Error, Notification};
use fallible_iterator::FallibleIterator;
use futures::{ready, FutureExt};
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;
use tokio::time::{self, Instant, Sleep};
Expand Down Expand Up @@ -64,7 +65,7 @@ impl<'a> Notifications<'a> {
/// This iterator may start returning `Some` after previously returning `None` if more notifications are received.
pub fn timeout_iter(&mut self, timeout: Duration) -> TimeoutIter<'_> {
TimeoutIter {
delay: self.connection.enter(|| time::sleep(timeout)),
delay: Box::pin(self.connection.enter(|| time::sleep(timeout))),
timeout,
connection: self.connection.as_ref(),
}
Expand Down Expand Up @@ -124,7 +125,7 @@ impl<'a> FallibleIterator for BlockingIter<'a> {
/// A time-limited blocking iterator over pending notifications.
pub struct TimeoutIter<'a> {
connection: ConnectionRef<'a>,
delay: Sleep,
delay: Pin<Box<Sleep>>,
timeout: Duration,
}

Expand All @@ -134,7 +135,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {

fn next(&mut self) -> Result<Option<Self::Item>, Self::Error> {
if let Some(notification) = self.connection.notifications_mut().pop_front() {
self.delay.reset(Instant::now() + self.timeout);
self.delay.as_mut().reset(Instant::now() + self.timeout);
return Ok(Some(notification));
}

Expand All @@ -143,7 +144,7 @@ impl<'a> FallibleIterator for TimeoutIter<'a> {
self.connection.poll_block_on(|cx, notifications, done| {
match notifications.pop_front() {
Some(notification) => {
delay.reset(Instant::now() + timeout);
delay.as_mut().reset(Instant::now() + timeout);
return Poll::Ready(Ok(Some(notification)));
}
None if done => return Poll::Ready(Ok(None)),
Expand Down
8 changes: 4 additions & 4 deletions tokio-postgres/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ with-time-0_2 = ["postgres-types/with-time-0_2"]

[dependencies]
async-trait = "0.1"
bytes = "0.5"
bytes = "1.0"
byteorder = "1.0"
fallible-iterator = "0.2"
futures = "0.3"
Expand All @@ -50,11 +50,11 @@ phf = "0.8"
postgres-protocol = { version = "0.5.0", path = "../postgres-protocol" }
postgres-types = { version = "0.1.2", path = "../postgres-types" }
socket2 = "0.3"
tokio = { version = "0.3", features = ["io-util"] }
tokio-util = { version = "0.4", features = ["codec"] }
tokio = { version = "1.0", features = ["io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.3", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
env_logger = "0.8"
criterion = "0.3"

Expand Down
2 changes: 1 addition & 1 deletion tokio-postgres/src/binary_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Stream for BinaryCopyOutStream {
Some(header) => header.has_oids,
None => {
check_remaining(&chunk, HEADER_LEN)?;
if &chunk.bytes()[..MAGIC.len()] != MAGIC {
if !chunk.chunk().starts_with(MAGIC) {
return Poll::Ready(Some(Err(Error::parse(io::Error::new(
io::ErrorKind::InvalidData,
"invalid magic value",
Expand Down
1 change: 0 additions & 1 deletion tokio-postgres/src/copy_in.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses};
use crate::codec::FrontendMessage;
use crate::connection::RequestMessages;
use crate::{query, slice_iter, Error, Statement};
use bytes::buf::BufExt;
use bytes::{Buf, BufMut, BytesMut};
use futures::channel::mpsc;
use futures::future;
Expand Down

0 comments on commit 31de758

Please sign in to comment.