Skip to content

Commit

Permalink
fix send/sync issue #8
Browse files Browse the repository at this point in the history
  • Loading branch information
Joylei committed Apr 16, 2022
1 parent 6c006dd commit e867870
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 80 deletions.
18 changes: 9 additions & 9 deletions cip/src/service/common_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use multiple_packet::MultipleServicePacket;
use rseip_core::codec::{Decode, Encode, SliceContainer};

/// common services
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
pub trait CommonServices: MessageService {
/// invoke the Get_Attribute_All service
#[inline]
Expand All @@ -25,7 +25,7 @@ pub trait CommonServices: MessageService {

/// invoke the Set_Attribute_All service
#[inline]
async fn set_attribute_all<D: Encode>(
async fn set_attribute_all<D: Encode + Send + Sync>(
&mut self,
path: EPath,
attrs: D,
Expand Down Expand Up @@ -65,7 +65,7 @@ pub trait CommonServices: MessageService {
attrs: D,
) -> Result<R, Self::Error>
where
D: Encode,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
send_and_extract(self, 0x04, path, attrs).await
Expand Down Expand Up @@ -93,7 +93,7 @@ pub trait CommonServices: MessageService {
#[inline]
async fn create<'de, D, R>(&mut self, path: EPath, data: D) -> Result<R, Self::Error>
where
D: Encode,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
send_and_extract(self, 0x08, path, data).await
Expand All @@ -109,7 +109,7 @@ pub trait CommonServices: MessageService {
#[inline]
async fn apply_attributes<'de, D, R>(&mut self, path: EPath, data: D) -> Result<R, Self::Error>
where
D: Encode,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
send_and_extract(self, 0x0D, path, data).await
Expand All @@ -126,7 +126,7 @@ pub trait CommonServices: MessageService {

/// invoke the Set_Attribute_Single service
#[inline]
async fn set_attribute_single<D: Encode>(
async fn set_attribute_single<D: Encode + Send + Sync>(
&mut self,
path: EPath,
data: D,
Expand Down Expand Up @@ -165,7 +165,7 @@ pub trait CommonServices: MessageService {
#[inline]
async fn set_member<'de, D, R>(&mut self, path: EPath, data: D) -> Result<R, Self::Error>
where
D: Encode,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
send_and_extract(self, 0x19, path, data).await
Expand All @@ -175,7 +175,7 @@ pub trait CommonServices: MessageService {
#[inline]
async fn insert_member<'de, D, R>(&mut self, path: EPath, data: D) -> Result<R, Self::Error>
where
D: Encode,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
send_and_extract(self, 0x1A, path, data).await
Expand Down Expand Up @@ -205,5 +205,5 @@ pub trait CommonServices: MessageService {
}
}

#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl<T: MessageService> CommonServices for T {}
10 changes: 3 additions & 7 deletions cip/src/service/common_services/multiple_packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ impl<'a, T, P, D> MultipleServicePacket<'a, T, P, D> {
impl<'a, T, P, D> MultipleServicePacket<'a, T, P, D>
where
T: MessageService,
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
{
/// append service request
pub fn push(mut self, mr: MessageRequest<P, D>) -> Self {
Expand All @@ -40,11 +40,7 @@ where
}

/// append all service requests
pub fn push_all(mut self, items: impl Iterator<Item = MessageRequest<P, D>>) -> Self
where
P: Encode + 'static,
D: Encode + 'static,
{
pub fn push_all(mut self, items: impl Iterator<Item = MessageRequest<P, D>>) -> Self {
for mr in items {
self.items.push(mr);
}
Expand Down
4 changes: 2 additions & 2 deletions cip/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

use rseip_core::Error;

#[async_trait::async_trait(?Send)]
pub trait Heartbeat {
#[async_trait::async_trait]
pub trait Heartbeat: Send + Sync {
type Error: Error;
/// send Heartbeat message to keep underline transport alive
async fn heartbeat(&mut self) -> Result<(), Self::Error>;
Expand Down
14 changes: 7 additions & 7 deletions cip/src/service/message_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ use rseip_core::{
Error,
};

#[async_trait::async_trait(?Send)]
pub trait MessageService {
#[async_trait::async_trait]
pub trait MessageService: Send + Sync {
type Error: Error;
/// send message request
async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R, Self::Error>
where
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static;

/// close underline transport
Expand All @@ -27,14 +27,14 @@ pub trait MessageService {
fn closed(&self) -> bool;
}

#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl<T: MessageService + Sized> MessageService for &mut T {
type Error = T::Error;
#[inline]
async fn send<'de, P, D, R>(&mut self, mr: MessageRequest<P, D>) -> Result<R, Self::Error>
where
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static,
{
(**self).send(mr).await
Expand Down
4 changes: 2 additions & 2 deletions cip/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub async fn send_and_extract<'de, S, P, D, R>(
) -> Result<R, S::Error>
where
S: MessageService + ?Sized,
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: Decode<'de> + 'static,
{
let mr = MessageRequest {
Expand Down
18 changes: 9 additions & 9 deletions src/adapters/eip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use rseip_core::codec::{Decode, Encode};
use rseip_eip::EipContext;
use tokio::io::{AsyncRead, AsyncWrite};

#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl<T> Service for EipContext<T, ClientError>
where
T: AsyncRead + AsyncWrite + Unpin,
T: AsyncRead + AsyncWrite + Unpin + Send + Sync,
{
/// context is open?
fn is_open(&mut self) -> bool {
Expand Down Expand Up @@ -51,9 +51,9 @@ where
request: UnconnectedSend<CP, MessageRequest<P, D>>,
) -> Result<R>
where
CP: Encode,
P: Encode,
D: Encode,
CP: Encode + Send + Sync,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static,
{
let service_code = request.data.service_code;
Expand All @@ -79,8 +79,8 @@ where
request: MessageRequest<P, D>,
) -> Result<R>
where
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static,
{
let service_code = request.service_code;
Expand All @@ -98,7 +98,7 @@ where
#[inline]
async fn forward_open<P>(&mut self, request: OpenOptions<P>) -> Result<ForwardOpenReply>
where
P: Encode,
P: Encode + Send + Sync,
{
let req: MessageRequest<&[u8], _> = MessageRequest {
service_code: SERVICE_FORWARD_OPEN,
Expand All @@ -118,7 +118,7 @@ where
request: ForwardCloseRequest<P>,
) -> Result<ForwardCloseReply>
where
P: Encode,
P: Encode + Send + Sync,
{
let req: MessageRequest<&[u8], _> = MessageRequest {
service_code: SERVICE_FORWARD_CLOSE,
Expand Down
18 changes: 9 additions & 9 deletions src/adapters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use rseip_core::codec::{Decode, Encode};

/// abstraction for basic CIP services;
/// different transport protocols derive this trait, eg EIP, DF1
#[async_trait::async_trait(?Send)]
pub trait Service {
#[async_trait::async_trait]
pub trait Service: Send + Sync {
/// context is open?
fn is_open(&mut self) -> bool;

Expand All @@ -41,9 +41,9 @@ pub trait Service {
request: UnconnectedSend<CP, MessageRequest<P, D>>,
) -> Result<R>
where
CP: Encode,
P: Encode,
D: Encode,
CP: Encode + Send + Sync,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static;

/// connected send
Expand All @@ -54,20 +54,20 @@ pub trait Service {
request: MessageRequest<P, D>,
) -> Result<R>
where
P: Encode,
D: Encode,
P: Encode + Send + Sync,
D: Encode + Send + Sync,
R: MessageReplyInterface + Decode<'de> + 'static;

/// forward open
async fn forward_open<P>(&mut self, request: OpenOptions<P>) -> Result<ForwardOpenReply>
where
P: Encode;
P: Encode + Send + Sync;

/// forward close
async fn forward_close<P>(
&mut self,
request: ForwardCloseRequest<P>,
) -> Result<ForwardCloseReply>
where
P: Encode;
P: Encode + Send + Sync;
}
2 changes: 1 addition & 1 deletion src/client/ab_eip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl Driver for AbEipDriver {
type Service = EipContext<TcpStream, ClientError>;

#[inline]
fn build_service(addr: &Self::Endpoint) -> BoxFuture<Result<Self::Service>> {
fn build_service(addr: Self::Endpoint) -> BoxFuture<'static, Result<Self::Service>> {
EipDriver::build_service(addr)
}
}
Expand Down
24 changes: 12 additions & 12 deletions src/client/ab_eip/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use bytes::{BufMut, BytesMut};
use rseip_core::codec::{Encode, Encoder};

/// AB related operations
#[async_trait::async_trait(?Send)]
pub trait AbService {
#[async_trait::async_trait]
pub trait AbService: Send + Sync {
/// Read Tag Service,
/// CIP Data Table Read
///
Expand Down Expand Up @@ -42,7 +42,7 @@ pub trait AbService {
/// ```
async fn read_tag<'de, P, R>(&mut self, req: P) -> Result<R>
where
P: Into<TagRequest>,
P: Into<TagRequest> + Send + Sync,
R: Decode<'de> + 'static;

/// Write Tag Service,
Expand All @@ -68,7 +68,7 @@ pub trait AbService {
/// ```
async fn write_tag<D>(&mut self, tag: EPath, value: D) -> Result<()>
where
D: Encode;
D: Encode + Send + Sync;

/// Read Tag Fragmented Service, enables client applications to read a tag
/// with data that does not fit into a single packet (approximately 500 bytes)
Expand All @@ -79,7 +79,7 @@ pub trait AbService {

/// Write Tag Fragmented Service, enables client applications to write to a tag
/// in the controller whose data will not fit into a single packet (approximately 500 bytes)
async fn write_tag_fragmented<D: Encode>(
async fn write_tag_fragmented<D: Encode + Send + Sync>(
&mut self,
req: WriteFragmentedRequest<D>,
) -> Result<bool>;
Expand Down Expand Up @@ -109,14 +109,14 @@ pub trait AbService {

macro_rules! impl_service {
($t:ty) => {
#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl AbService for $t {
/// Read Tag Service,
/// CIP Data Table Read
#[inline]
async fn read_tag<'de, P, R>(&mut self, req: P) -> Result<R>
where
P: Into<TagRequest>,
P: Into<TagRequest> + Send + Sync,
R: Decode<'de> + 'static,
{
let res = ab_read_tag(self, req).await?;
Expand All @@ -128,7 +128,7 @@ macro_rules! impl_service {
#[inline]
async fn write_tag<D>(&mut self, tag: EPath, value: D) -> Result<()>
where
D: Encode,
D: Encode + Send + Sync,
{
ab_write_tag(self, tag, value).await?;
Ok(())
Expand All @@ -147,7 +147,7 @@ macro_rules! impl_service {
/// Write Tag Fragmented Service, enables client applications to write to a tag
/// in the controller whose data will not fit into a single packet (approximately 500 bytes)
#[inline]
async fn write_tag_fragmented<D: Encode>(
async fn write_tag_fragmented<D: Encode + Send + Sync>(
&mut self,
req: WriteFragmentedRequest<D>,
) -> Result<bool> {
Expand Down Expand Up @@ -197,7 +197,7 @@ impl_service!(MaybeConnected<AbEipDriver>);
async fn ab_read_tag<'de, C, P, R>(client: &mut C, req: P) -> Result<R>
where
C: MessageService<Error = ClientError>,
P: Into<TagRequest>,
P: Into<TagRequest> + Send + Sync,
R: Decode<'de> + 'static,
{
let req: TagRequest = req.into();
Expand All @@ -212,7 +212,7 @@ where
async fn ab_write_tag<C, D>(client: &mut C, tag: EPath, value: D) -> Result<()>
where
C: MessageService<Error = ClientError>,
D: Encode,
D: Encode + Send + Sync,
{
let mr = MessageRequest::new(SERVICE_WRITE_TAG, tag, value);
let resp: MessageReply<()> = client.send(mr).await?;
Expand Down Expand Up @@ -247,7 +247,7 @@ async fn ab_write_tag_fragmented<C, D>(
) -> Result<bool>
where
C: MessageService<Error = ClientError>,
D: Encode,
D: Encode + Send + Sync,
{
debug_assert!(req.count >= 1);

Expand Down
4 changes: 2 additions & 2 deletions src/client/ab_eip/template.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use rseip_core::{
use smallvec::SmallVec;
use std::collections::HashMap;

#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
pub trait AbTemplateService {
/// fetch template instance for specified instance id
async fn find_template(&mut self, instance_id: u16) -> Result<Template, ClientError>;
Expand All @@ -40,7 +40,7 @@ pub trait AbTemplateService {
Self: Sized;
}

#[async_trait::async_trait(?Send)]
#[async_trait::async_trait]
impl<T: MessageService<Error = ClientError>> AbTemplateService for T {
/// fetch template instance for specified instance id
async fn find_template(&mut self, instance_id: u16) -> Result<Template, ClientError> {
Expand Down
Loading

0 comments on commit e867870

Please sign in to comment.