Skip to content

Commit

Permalink
NewTcpArchitecture
Browse files Browse the repository at this point in the history
  • Loading branch information
amigin committed Jan 16, 2024
1 parent 3e818d6 commit e100960
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 71 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ edition = "2021"

[dependencies]

my-service-bus = { tag = "1.0.4", git = "https://github.com/MyJetTools/my-service-bus-sdk.git", features = [
my-service-bus = { branch = "main", git = "https://github.com/MyJetTools/my-service-bus-sdk.git", features = [
"tcp_contracts",
"shared",

Expand All @@ -20,7 +20,7 @@ my-http-server = { tag = "0.7.0", git = "https://github.com/MyJetTools/my-http-s
] }


my-tcp-sockets = { tag = "0.1.9", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
my-tcp-sockets = { branch = "main", git = "https://github.com/MyJetTools/my-tcp-sockets.git" }
rust-extensions = { tag = "0.1.4", git = "https://github.com/MyJetTools/rust-extensions.git", features = [
"with-tokio",
"base64",
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ async fn main() {

tcp_server
.start(
Arc::new(my_service_bus::tcp_contracts::SbTcpSerializerMetadataFactory),
Arc::new(TcpServerEvents::new(app.clone())),
app.states.clone(),
my_logger::LOGGER.clone(),
Expand Down
6 changes: 3 additions & 3 deletions src/operations/delivery/delivery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ mod tests {
subscriber::TopicQueueType,
};
use my_service_bus::shared::protobuf_models::MessageProtobufModel;
use my_service_bus::tcp_contracts::{MySbSerializerMetadata, TcpContract};
use my_service_bus::tcp_contracts::{MySbSerializerMetadata, MySbTcpContract};
use rust_extensions::date_time::DateTimeAsMicroseconds;

use crate::{
Expand Down Expand Up @@ -239,7 +239,7 @@ mod tests {

let packet = result_packets.remove(0);

if let TcpContract::Raw(_) = packet {
if let MySbTcpContract::Raw(_) = packet {
} else {
panic!("Should not be here")
}
Expand Down Expand Up @@ -329,7 +329,7 @@ mod tests {
my_service_bus::tcp_contracts::tcp_serializers::convert_from_raw(packet, &meta_data)
.await;

if let TcpContract::NewMessages {
if let MySbTcpContract::NewMessages {
topic_id,
queue_id,
confirmation_id,
Expand Down
4 changes: 2 additions & 2 deletions src/operations/delivery/subscriber_package_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use my_service_bus::abstractions::SbMessageHeaders;
use my_service_bus::abstractions::{queue_with_intervals::QueueWithIntervals, MyServiceBusMessage};

use my_service_bus::tcp_contracts::{
delivery_package_builder::DeliverTcpPacketBuilder, TcpContract,
delivery_package_builder::DeliverTcpPacketBuilder, MySbTcpContract,
};
use rust_extensions::ShortString;

Expand Down Expand Up @@ -39,7 +39,7 @@ impl<'s> MyServiceBusMessage for PacketToSendWrapper<'s> {
pub enum SendNewMessagesResult {
Send {
session: Arc<MyServiceBusSession>,
tcp_contract: TcpContract,
tcp_contract: MySbTcpContract,
queue_id: ShortString,
messages_on_delivery: QueueWithIntervals,
},
Expand Down
2 changes: 1 addition & 1 deletion src/operations/send_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use super::delivery::{SendNewMessagesResult, SubscriberPackageBuilder};

pub fn send_package(
session: Arc<MyServiceBusSession>,
tcp_packet: my_service_bus::tcp_contracts::TcpContract,
tcp_packet: my_service_bus::tcp_contracts::MySbTcpContract,
) {
let _handle = tokio::spawn(async move {
match &session.connection {
Expand Down
6 changes: 3 additions & 3 deletions src/sessions/tcp_connection_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use std::sync::{
};

use my_service_bus::tcp_contracts::{
MySbSerializerMetadata, MySbTcpSerializer, PacketProtVer, TcpContract,
MySbSerializerMetadata, MySbTcpContract, MySbTcpSerializer, PacketProtVer,
};
use my_tcp_sockets::tcp_connection::TcpSocketConnection;

use crate::sessions::ConnectionMetricsSnapshot;

pub struct TcpConnectionData {
pub connection:
Arc<TcpSocketConnection<TcpContract, MySbTcpSerializer, MySbSerializerMetadata>>,
Arc<TcpSocketConnection<MySbTcpContract, MySbTcpSerializer, MySbSerializerMetadata>>,
protocol_version: i32,
delivery_packet_version: AtomicU8,
pub name: String,
Expand All @@ -23,7 +23,7 @@ pub struct TcpConnectionData {
impl TcpConnectionData {
pub fn new(
connection: Arc<
TcpSocketConnection<TcpContract, MySbTcpSerializer, MySbSerializerMetadata>,
TcpSocketConnection<MySbTcpContract, MySbTcpSerializer, MySbSerializerMetadata>,
>,
name: String,
version: Option<String>,
Expand Down
8 changes: 4 additions & 4 deletions src/sessions/test_connection_data.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use my_service_bus::tcp_contracts::TcpContract;
use my_service_bus::tcp_contracts::MySbTcpContract;
use tokio::sync::Mutex;

use super::SessionId;
Expand All @@ -7,7 +7,7 @@ pub struct TestConnectionData {
pub id: SessionId,
pub ip: String,
pub connected: std::sync::atomic::AtomicBool,
pub sent_packets: Mutex<Vec<TcpContract>>,
pub sent_packets: Mutex<Vec<MySbTcpContract>>,
pub name: String,
pub version: Option<String>,
}
Expand All @@ -24,12 +24,12 @@ impl TestConnectionData {
}
}

pub async fn send_packet(&self, tcp_contract: TcpContract) {
pub async fn send_packet(&self, tcp_contract: MySbTcpContract) {
let mut write_access = self.sent_packets.lock().await;
write_access.push(tcp_contract);
}

pub async fn get_list_of_packets_and_clear_them(&self) -> Vec<TcpContract> {
pub async fn get_list_of_packets_and_clear_them(&self) -> Vec<MySbTcpContract> {
let mut write_access = self.sent_packets.lock().await;
let mut result = Vec::new();
std::mem::swap(&mut *write_access, &mut result);
Expand Down
48 changes: 25 additions & 23 deletions src/tcp/incoming_packets.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::sync::Arc;

use my_service_bus::abstractions::queue_with_intervals::QueueWithIntervals;
use my_service_bus::tcp_contracts::{MySbSerializerMetadata, MySbTcpSerializer, TcpContract};
use my_service_bus::tcp_contracts::{MySbSerializerMetadata, MySbTcpContract, MySbTcpSerializer};
use my_tcp_sockets::tcp_connection::TcpSocketConnection;

use crate::sessions::TcpConnectionData;
Expand All @@ -11,16 +11,18 @@ use super::error::MySbSocketError;

pub async fn handle(
app: &Arc<AppContext>,
tcp_contract: TcpContract,
connection: Arc<TcpSocketConnection<TcpContract, MySbTcpSerializer, MySbSerializerMetadata>>,
tcp_contract: MySbTcpContract,
connection: &Arc<
TcpSocketConnection<MySbTcpContract, MySbTcpSerializer, MySbSerializerMetadata>,
>,
) -> Result<(), MySbSocketError> {
match tcp_contract {
TcpContract::Ping {} => {
connection.send(&TcpContract::Pong).await;
MySbTcpContract::Ping {} => {
connection.send(&MySbTcpContract::Pong).await;
Ok(())
}
TcpContract::Pong {} => Ok(()),
TcpContract::Greeting {
MySbTcpContract::Pong {} => Ok(()),
MySbTcpContract::Greeting {
name,
protocol_version,
} => {
Expand All @@ -43,7 +45,7 @@ pub async fn handle(

app.sessions
.add_tcp(TcpConnectionData::new(
connection,
connection.clone(),
connection_name.unwrap(),
version,
protocol_version,
Expand All @@ -52,7 +54,7 @@ pub async fn handle(

Ok(())
}
TcpContract::Publish {
MySbTcpContract::Publish {
topic_id,
request_id,
persist_immediately,
Expand All @@ -74,25 +76,25 @@ pub async fn handle(

if let Err(err) = result {
connection
.send(&TcpContract::Reject {
.send(&MySbTcpContract::Reject {
message: format!("{:?}", err),
})
.await;
} else {
connection
.send(&TcpContract::PublishResponse { request_id })
.send(&MySbTcpContract::PublishResponse { request_id })
.await;
}
}

Ok(())
}

TcpContract::PublishResponse { request_id: _ } => {
MySbTcpContract::PublishResponse { request_id: _ } => {
//This is a client packet
Ok(())
}
TcpContract::Subscribe {
MySbTcpContract::Subscribe {
topic_id,
queue_id,
queue_type,
Expand All @@ -106,18 +108,18 @@ pub async fn handle(

Ok(())
}
TcpContract::SubscribeResponse {
MySbTcpContract::SubscribeResponse {
topic_id: _,
queue_id: _,
} => {
//This is a client packet
Ok(())
}
TcpContract::Raw(_) => {
MySbTcpContract::Raw(_) => {
//This is a client packet
Ok(())
}
TcpContract::NewMessagesConfirmation {
MySbTcpContract::NewMessagesConfirmation {
topic_id,
queue_id,
confirmation_id,
Expand All @@ -132,7 +134,7 @@ pub async fn handle(

Ok(())
}
TcpContract::CreateTopicIfNotExists { topic_id } => {
MySbTcpContract::CreateTopicIfNotExists { topic_id } => {
if let Some(session) = app.sessions.get_by_tcp_connection_id(connection.id).await {
operations::publisher::create_topic_if_not_exists(
app,
Expand All @@ -144,7 +146,7 @@ pub async fn handle(

Ok(())
}
TcpContract::IntermediaryConfirm {
MySbTcpContract::IntermediaryConfirm {
packet_version: _,
topic_id,
queue_id,
Expand All @@ -162,7 +164,7 @@ pub async fn handle(

Ok(())
}
TcpContract::PacketVersions { packet_versions } => {
MySbTcpContract::PacketVersions { packet_versions } => {
if let Some(version) =
packet_versions.get(&my_service_bus::tcp_contracts::tcp_message_id::NEW_MESSAGES)
{
Expand All @@ -173,11 +175,11 @@ pub async fn handle(

Ok(())
}
TcpContract::Reject { message: _ } => {
MySbTcpContract::Reject { message: _ } => {
//This is a client packet
Ok(())
}
TcpContract::AllMessagesConfirmedAsFail {
MySbTcpContract::AllMessagesConfirmedAsFail {
topic_id,
queue_id,
confirmation_id,
Expand All @@ -192,7 +194,7 @@ pub async fn handle(
Ok(())
}

TcpContract::ConfirmSomeMessagesAsOk {
MySbTcpContract::ConfirmSomeMessagesAsOk {
packet_version: _,
topic_id,
queue_id,
Expand All @@ -210,7 +212,7 @@ pub async fn handle(

Ok(())
}
TcpContract::NewMessages {
MySbTcpContract::NewMessages {
topic_id: _,
queue_id: _,
confirmation_id: _,
Expand Down
57 changes: 24 additions & 33 deletions src/tcp/socket_events.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use async_trait::async_trait;
use my_logger::LogEventCtx;
use my_tcp_sockets::{ConnectionEvent, SocketEventCallback};
use my_tcp_sockets::SocketEventCallback;
use std::sync::Arc;

use my_service_bus::tcp_contracts::{MySbSerializerMetadata, MySbTcpSerializer, TcpContract};
use my_service_bus::tcp_contracts::{
MySbSerializerMetadata, MySbTcpConnection, MySbTcpContract, MySbTcpSerializer,
};

use crate::app::AppContext;

Expand All @@ -18,39 +20,28 @@ impl TcpServerEvents {
}

#[async_trait]
impl SocketEventCallback<TcpContract, MySbTcpSerializer, MySbSerializerMetadata>
impl SocketEventCallback<MySbTcpContract, MySbTcpSerializer, MySbSerializerMetadata>
for TcpServerEvents
{
async fn handle(
&self,
connection_event: ConnectionEvent<TcpContract, MySbTcpSerializer, MySbSerializerMetadata>,
) {
match connection_event {
ConnectionEvent::Connected(_) => {
self.app.prometheus.mark_new_tcp_connection();
}
ConnectionEvent::Disconnected(connection) => {
self.app.prometheus.mark_new_tcp_disconnection();
if let Some(session) = self.app.sessions.remove_tcp(connection.id).await {
crate::operations::sessions::disconnect(self.app.as_ref(), session.as_ref())
.await;
}
}
ConnectionEvent::Payload {
connection,
payload,
} => {
let connection_id = connection.id;
if let Err(err) =
super::incoming_packets::handle(&self.app, payload, connection).await
{
my_logger::LOGGER.write_error(
"Handle Tcp Payload".to_string(),
format!("{:?}", err),
LogEventCtx::new().add("connectionId", connection_id.to_string()),
);
}
}
async fn connected(&self, _connection: Arc<MySbTcpConnection>) {
self.app.prometheus.mark_new_tcp_connection();
}

async fn disconnected(&self, connection: Arc<MySbTcpConnection>) {
self.app.prometheus.mark_new_tcp_disconnection();
if let Some(session) = self.app.sessions.remove_tcp(connection.id).await {
crate::operations::sessions::disconnect(self.app.as_ref(), session.as_ref()).await;
}
}

async fn payload(&self, connection: &Arc<MySbTcpConnection>, contract: MySbTcpContract) {
let connection_id = connection.id;
if let Err(err) = super::incoming_packets::handle(&self.app, contract, connection).await {
my_logger::LOGGER.write_error(
"Handle Tcp Payload".to_string(),
format!("{:?}", err),
LogEventCtx::new().add("connectionId", connection_id.to_string()),
);
}
}
}

0 comments on commit e100960

Please sign in to comment.