Skip to content

Commit

Permalink
Add Windows Implementation for sync server and client
Browse files Browse the repository at this point in the history
Adds the windows functionality for PipeListener, PipeConnection, and ClientConnection and does the few other changes required to build and run the example projects. This includes adding feature support to the examples so they wouldn't build the async projects (as the unix specific code hasn't been removed yet). Namedpipes are used as Containerd is one of the main use cases for this project on Windows and containerd only supports namedpipes.

Signed-off-by: James Sturtevant <jstur@microsoft.com>
  • Loading branch information
jsturtevant committed Mar 17, 2023
1 parent 914b5b5 commit e957414
Show file tree
Hide file tree
Showing 23 changed files with 521 additions and 113 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/bvt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand All @@ -22,7 +22,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
os: [ubuntu-latest, macos-latest, windows-latest]
steps:
- name: Checkout
uses: actions/checkout@v3
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,22 @@ nix = "0.23.0"
log = "0.4"
byteorder = "1.3.2"
thiserror = "1.0"

async-trait = { version = "0.1.31", optional = true }
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
futures = { version = "0.3", optional = true }

[target.'cfg(windows)'.dependencies]
windows-sys = {version = "0.45", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}

[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
tokio-vsock = { version = "0.3.1", optional = true }

[build-dependencies]
protobuf-codegen = "3.1.0"

[dev-dependencies]
assert_cmd = "2.0.7"

[features]
default = ["sync"]
async = ["async-trait", "tokio", "futures", "tokio-vsock"]
Expand Down
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@ build: debug

.PHONY: test
test:
ifeq ($OS,Windows_NT)
# async isn't enabled for windows, don't test that feature
cargo test --verbose
else
cargo test --all-features --verbose

endif

.PHONY: check
check:
cargo fmt --all -- --check
Expand Down
9 changes: 8 additions & 1 deletion example/async-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
let c = Client::connect(utils::SOCK_ADDR).unwrap();
Expand Down
13 changes: 12 additions & 1 deletion example/async-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ use std::sync::Arc;

use log::LevelFilter;

#[cfg(unix)]
use protocols::r#async::{agent, agent_ttrpc, health, health_ttrpc, types};
#[cfg(unix)]
use ttrpc::asynchronous::Server;
use ttrpc::error::{Error, Result};
use ttrpc::proto::{Code, Status};

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct HealthService;

#[cfg(unix)]
#[async_trait]
impl health_ttrpc::Health for HealthService {
async fn check(
Expand Down Expand Up @@ -58,7 +63,7 @@ impl health_ttrpc::Health for HealthService {
}

struct AgentService;

#[cfg(unix)]
#[async_trait]
impl agent_ttrpc::AgentService for AgentService {
async fn list_interfaces(
Expand All @@ -82,6 +87,12 @@ impl agent_ttrpc::AgentService for AgentService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Trace);
Expand Down
15 changes: 14 additions & 1 deletion example/async-stream-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@

mod protocols;
mod utils;

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
use ttrpc::context::{self, Context};
#[cfg(unix)]
use ttrpc::r#async::Client;

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(log::LevelFilter::Info);
Expand Down Expand Up @@ -48,6 +55,7 @@ fn default_ctx() -> Context {
ctx
}

#[cfg(unix)]
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
let echo1 = streaming::EchoPayload {
seq: 1,
Expand All @@ -59,6 +67,7 @@ async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(resp.seq, echo1.seq + 1);
}

#[cfg(unix)]
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();

Expand All @@ -81,6 +90,7 @@ async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
}

#[cfg(unix)]
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();

Expand Down Expand Up @@ -108,6 +118,7 @@ async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(ssum.num, sum.num);
}

#[cfg(unix)]
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
let expected = streaming::Sum {
sum: 392,
Expand All @@ -127,6 +138,7 @@ async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(actual.num, expected.num);
}

#[cfg(unix)]
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
let mut stream = cli.echo_null(default_ctx()).await.unwrap();

Expand All @@ -142,6 +154,7 @@ async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
assert_eq!(res, empty::Empty::new());
}

#[cfg(unix)]
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();

Expand Down
11 changes: 11 additions & 0 deletions example/async-stream-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,20 @@ use std::sync::Arc;

use log::{info, LevelFilter};

#[cfg(unix)]
use protocols::r#async::{empty, streaming, streaming_ttrpc};
#[cfg(unix)]
use ttrpc::asynchronous::Server;

#[cfg(unix)]
use async_trait::async_trait;
#[cfg(unix)]
use tokio::signal::unix::{signal, SignalKind};
use tokio::time::sleep;

struct StreamingService;

#[cfg(unix)]
#[async_trait]
impl streaming_ttrpc::Streaming for StreamingService {
async fn echo(
Expand Down Expand Up @@ -131,6 +136,12 @@ impl streaming_ttrpc::Streaming for StreamingService {
}
}

#[cfg(windows)]
fn main() {
println!("This example only works on Unix-like OSes");
}

#[cfg(unix)]
#[tokio::main(flavor = "current_thread")]
async fn main() {
simple_logging::log_to_stderr(LevelFilter::Info);
Expand Down
4 changes: 2 additions & 2 deletions example/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ fn main() {
async_all: true,
..Default::default()
})
.rust_protobuf_customize(protobuf_customized.clone())
.rust_protobuf_customize(protobuf_customized)
.run()
.expect("Gen async code failed.");

Expand Down Expand Up @@ -75,7 +75,7 @@ fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std

let new_contents = contents.replace(from, to);

let mut dst = File::create(&file_name)?;
let mut dst = File::create(file_name)?;
dst.write(new_contents.as_bytes())?;

Ok(())
Expand Down
5 changes: 3 additions & 2 deletions example/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
//
// SPDX-License-Identifier: Apache-2.0
//

#[cfg(unix)]
pub mod asynchronous;
pub mod sync;
#[cfg(unix)]
pub use asynchronous as r#async;
pub mod sync;
20 changes: 15 additions & 5 deletions example/utils.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
#![allow(dead_code)]
use std::fs;
use std::io::Result;
use std::path::Path;

pub const SOCK_ADDR: &str = "unix:///tmp/ttrpc-test";
#[cfg(unix)]
pub const SOCK_ADDR: &str = r"unix:///tmp/ttrpc-test";

#[cfg(windows)]
pub const SOCK_ADDR: &str = r"\\.\pipe\ttrpc-test";

#[cfg(unix)]
pub fn remove_if_sock_exist(sock_addr: &str) -> Result<()> {
let path = sock_addr
.strip_prefix("unix://")
.expect("socket address is not expected");

if Path::new(path).exists() {
fs::remove_file(&path)?;
if std::path::Path::new(path).exists() {
std::fs::remove_file(path)?;
}

Ok(())
}

#[cfg(windows)]
pub fn remove_if_sock_exist(_sock_addr: &str) -> Result<()> {
//todo force close file handle?

Ok(())
}
2 changes: 1 addition & 1 deletion src/asynchronous/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl ReaderDelegate for ClientReader {
};
resp_tx
.send(Err(Error::Others(format!(
"Recver got malformed packet {msg:?}"
"Receiver got malformed packet {msg:?}"
))))
.await
.unwrap_or_else(|_e| error!("The request has returned"));
Expand Down
2 changes: 1 addition & 1 deletion src/asynchronous/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ where
async fn _recv(rx: &mut ResultReceiver) -> Result<GenMessage> {
rx.recv().await.unwrap_or_else(|| {
Err(Error::Others(
"Receive packet from recver error".to_string(),
"Receive packet from Receiver error".to_string(),
))
})
}
Expand Down
23 changes: 2 additions & 21 deletions src/common.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#![cfg(not(windows))]
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

//! Common functions and macros.
//! Common functions.
use crate::error::{Error, Result};
#[cfg(any(
Expand Down Expand Up @@ -173,26 +174,6 @@ pub(crate) unsafe fn client_connect(sockaddr: &str) -> Result<RawFd> {
Ok(fd)
}

macro_rules! cfg_sync {
($($item:item)*) => {
$(
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
$item
)*
}
}

macro_rules! cfg_async {
($($item:item)*) => {
$(
#[cfg(feature = "async")]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
$item
)*
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ pub enum Error {
#[error("rpc status: {0:?}")]
RpcStatus(Status),

#[cfg(unix)]
#[error("Nix error: {0}")]
Nix(#[from] nix::Error),

#[cfg(windows)]
#[error("Windows error: {0}")]
Windows(i32),

#[error("ttrpc err: local stream closed")]
LocalClosed,

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ pub mod error;
#[macro_use]
mod common;

#[macro_use]
mod macros;

pub mod context;

pub mod proto;
Expand Down
26 changes: 26 additions & 0 deletions src/macros.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//

//! macro functions.
macro_rules! cfg_sync {
($($item:item)*) => {
$(
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(cfg(feature = "sync")))]
$item
)*
}
}

macro_rules! cfg_async {
($($item:item)*) => {
$(
#[cfg(all(feature = "async", target_family="unix"))]
#[cfg_attr(docsrs, doc(cfg(feature = "async")))]
$item
)*
}
}
Loading

0 comments on commit e957414

Please sign in to comment.