Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

In host-server mode, send messages from server to the local client #542

Merged
merged 2 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lightyear/src/channel/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::channel::stats::send::ChannelSendStats;
use crate::prelude::ChannelKind;

/// A ChannelContainer is a struct that implements the [`Channel`] trait
#[derive(Debug)]
pub struct ChannelContainer {
pub setting: ChannelSettings,
pub(crate) receiver: ChannelReceiver,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/fragment_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::prelude::Tick;
use crate::shared::time_manager::WrappedTime;

/// `FragmentReceiver` is used to reconstruct fragmented messages
#[derive(Debug)]
pub struct FragmentReceiver {
fragment_messages: HashMap<MessageId, FragmentConstructor>,
}
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub trait ChannelReceive {
}

/// This enum contains the various types of receivers available
#[derive(Debug)]
#[enum_dispatch(ChannelReceive)]
pub enum ChannelReceiver {
UnorderedUnreliable(unordered_unreliable::UnorderedUnreliableReceiver),
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/ordered_reliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub use crate::shared::time_manager::TimeManager;

/// Ordered Reliable receiver: make sure that all messages are received,
/// and return them in order
#[derive(Debug)]
pub struct OrderedReliableReceiver {
/// Next message id that we are waiting to receive
/// The channel is reliable so we should see all message ids sequentially.
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/sequenced_reliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::shared::time_manager::TimeManager;

/// Sequenced Reliable receiver: make sure that all messages are received,
/// do not return them in order, but ignore the messages that are older than the most recent one received
#[derive(Debug)]
pub struct SequencedReliableReceiver {
// TODO: optimize via ring buffer?
// TODO: actually do we even need a buffer? we might just need a buffer of 1
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/sequenced_unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const DISCARD_AFTER: chrono::Duration = chrono::Duration::milliseconds(3000);

/// Sequenced Unreliable receiver:
/// do not return messages in order, but ignore the messages that are older than the most recent one received
#[derive(Debug)]
pub struct SequencedUnreliableReceiver {
/// Buffer of the messages that we received, but haven't processed yet
recv_message_buffer: VecDeque<(Tick, Bytes)>,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/unordered_reliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::shared::time_manager::TimeManager;

/// Unordered Reliable receiver: make sure that all messages are received,
/// and return them in any order
#[derive(Debug)]
pub struct UnorderedReliableReceiver {
/// Next message id that we are waiting to receive
/// The channel is reliable so we should see all message ids.
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/receivers/unordered_unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::shared::time_manager::{TimeManager, WrappedTime};

const DISCARD_AFTER: chrono::Duration = chrono::Duration::milliseconds(3000);

#[derive(Debug)]
pub struct UnorderedUnreliableReceiver {
recv_message_buffer: VecDeque<(Tick, Bytes)>,
fragment_receiver: FragmentReceiver,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/senders/fragment_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::serialize::SerializationError;
use crate::shared::tick_manager::Tick;

/// `FragmentReceiver` is used to reconstruct fragmented messages
#[derive(Debug)]
pub(crate) struct FragmentSender {
pub(crate) fragment_size: usize,
}
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/senders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub trait ChannelSend {
}

/// Enum dispatch lets us derive ChannelSend on each enum variant
#[derive(Debug)]
#[enum_dispatch(ChannelSend)]
pub enum ChannelSender {
UnorderedUnreliableWithAcks(unordered_unreliable_with_acks::UnorderedUnreliableWithAcksSender),
Expand Down
4 changes: 4 additions & 0 deletions lightyear/src/channel/senders/reliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use crate::shared::ping::manager::PingManager;
use crate::shared::tick_manager::TickManager;
use crate::shared::time_manager::{TimeManager, WrappedTime};

#[derive(Debug)]
pub struct FragmentAck {
data: FragmentData,
acked: bool,
last_sent: Option<WrappedTime>,
}

/// A message that has not been acked yet
#[derive(Debug)]
pub enum UnackedMessage {
Single {
bytes: Bytes,
Expand All @@ -33,13 +35,15 @@ pub enum UnackedMessage {
Fragmented(Vec<FragmentAck>),
}

#[derive(Debug)]
pub struct UnackedMessageWithPriority {
pub unacked_message: UnackedMessage,
pub base_priority: f32,
pub accumulated_priority: f32,
}

/// A sender that makes sure to resend messages until it receives an ack
#[derive(Debug)]
pub struct ReliableSender {
/// Settings for reliability
reliable_settings: ReliableSettings,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/senders/sequenced_unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::shared::time_manager::TimeManager;

/// A sender that simply sends the messages without checking if they were received
/// Same as UnorderedUnreliableSender, but includes ordering information (MessageId)
#[derive(Debug)]
pub struct SequencedUnreliableSender {
/// list of single messages that we want to fit into packets and send
single_messages_to_send: VecDeque<SendMessage>,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/channel/senders/unordered_unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::shared::time_manager::TimeManager;

/// A sender that simply sends the messages without checking if they were received
/// Does not include any ordering information
#[derive(Debug)]
pub struct UnorderedUnreliableSender {
/// list of single messages that we want to fit into packets and send
single_messages_to_send: VecDeque<SendMessage>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const DISCARD_AFTER: chrono::Duration = chrono::Duration::milliseconds(3000);
/// A sender that simply sends the messages without applying any reliability or unordered
/// Same as UnorderedUnreliableSender, but includes a message id to each message,
/// Which can let us track if a message was acked
#[derive(Debug)]
pub struct UnorderedUnreliableWithAcksSender {
/// list of single messages that we want to fit into packets and send
single_messages_to_send: VecDeque<SendMessage>,
Expand Down
32 changes: 29 additions & 3 deletions lightyear/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use super::sync::SyncManager;
/// connection.send_message_to_target::<MyChannel, MyMessage>("Hello, server!", NetworkTarget::Single(2));
/// }
/// ```
#[derive(Resource)]
#[derive(Resource, Debug)]
pub struct ConnectionManager {
pub(crate) component_registry: ComponentRegistry,
pub(crate) message_registry: MessageRegistry,
Expand Down Expand Up @@ -372,7 +372,6 @@ impl ConnectionManager {
tick_manager: &TickManager,
) -> Result<(), ClientError> {
let _span = trace_span!("receive").entered();
let message_registry = world.resource::<MessageRegistry>();
self.message_manager
.channels
.iter_mut()
Expand Down Expand Up @@ -420,10 +419,11 @@ impl ConnectionManager {
let updates = EntityUpdatesMessage::from_bytes(&mut reader)?;
self.replication_receiver.recv_updates(updates, tick);
} else {
// TODO: this code is copy-pasted from self.receive_message because of borrow checker limitations
// identify the type of message
let net_id = NetId::from_bytes(&mut reader)?;
let single_data = reader.consume();
match message_registry.message_type(net_id) {
match self.message_registry.message_type(net_id) {
#[cfg(feature = "leafwing")]
MessageType::LeafwingInput => {
self.received_leafwing_input_messages
Expand Down Expand Up @@ -461,6 +461,32 @@ impl ConnectionManager {
Ok(())
}

/// Receive a message from the server
pub(crate) fn receive_message(&mut self, mut reader: Reader) -> Result<(), SerializationError> {
// identify the type of message
let net_id = NetId::from_bytes(&mut reader)?;
let single_data = reader.consume();
match self.message_registry.message_type(net_id) {
#[cfg(feature = "leafwing")]
MessageType::LeafwingInput => {
self.received_leafwing_input_messages
.entry(net_id)
.or_default()
.push(single_data);
}
MessageType::NativeInput => {
todo!()
}
MessageType::Normal => {
self.received_messages
.entry(net_id)
.or_default()
.push(single_data);
}
}
Ok(())
}

pub(crate) fn recv_packet(
&mut self,
packet: RecvPayload,
Expand Down
8 changes: 6 additions & 2 deletions lightyear/src/client/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,15 @@ impl Plugin for ClientNetworkingPlugin {
.configure_sets(
PreUpdate,
(
InternalMainSet::<ClientMarker>::Receive.in_set(MainSet::Receive),
InternalMainSet::<ClientMarker>::Receive
.in_set(MainSet::Receive)
// do not receive packets when running in host-server mode
.run_if(not(is_host_server)),
// we still want to emit events when running in host-server mode
InternalMainSet::<ClientMarker>::EmitEvents.in_set(MainSet::EmitEvents),
)
.chain()
.run_if(not(is_host_server.or_else(is_disconnected))),
.run_if(not(is_disconnected)),
)
.configure_sets(
PostUpdate,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ impl SentPacketStore {

/// In charge of syncing the client's tick/time with the server's tick/time
/// right after the connection is established
#[derive(Debug)]
pub struct SyncManager {
config: SyncConfig,
prediction_config: PredictionConfig,
Expand Down
3 changes: 2 additions & 1 deletion lightyear/src/packet/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ const MAX_NACK_SECONDS: i64 = 3;

/// Keeps track of sent and received packets to be able to write the packet headers correctly
/// For more information: [GafferOnGames](https://gafferongames.com/post/reliability_ordering_and_congestion_avoidance_over_udp/)
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PacketHeaderManager {
// Local packet id which we'll bump each time we send a new packet over the network.
// (we always increment the packet_id, even when we resend a lost packet)
Expand Down Expand Up @@ -262,6 +262,7 @@ impl PacketHeaderManager {
}

/// Data structure to keep track of the ids of the received packets
#[derive(Debug)]
pub struct ReceiveBuffer {
/// The packet id of the most recent packet received
last_recv_packet_id: Option<PacketId>,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/packet/message_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub const DEFAULT_MESSAGE_PRIORITY: f32 = 1.0;

/// Wrapper to: send/receive messages via channels to a remote address
/// By splitting the data into packets and sending them through a given transport
#[derive(Debug)]
pub struct MessageManager {
/// Handles sending/receiving packets (including acks)
packet_manager: PacketBuilder,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/packet/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub type RecvPayload = Bytes;

/// `PacketBuilder` handles the process of creating a packet (writing the header and packing the
/// messages into packets)
#[derive(Debug)]
pub(crate) struct PacketBuilder {
pub(crate) header_manager: PacketHeaderManager,
current_packet: Option<Packet>,
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/packet/priority_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl From<crate::server::config::PacketConfig> for PriorityConfig {
}
}

#[derive(Debug)]
pub(crate) struct PriorityManager {
pub(crate) config: PriorityConfig,
// TODO: can I do without this limiter?
Expand Down
3 changes: 2 additions & 1 deletion lightyear/src/packet/stats_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ pub(crate) mod packet {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
struct FinalStats {
packet_loss: f32,
}

#[derive(Debug)]
pub(crate) struct PacketStatsManager {
stats_buffer: PacketStatsBuffer,
/// sum of the stats over the stats_buffer
Expand Down
1 change: 1 addition & 0 deletions lightyear/src/serialize/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use bytes::{BufMut, Bytes, BytesMut};
use std::io::Write;

#[derive(Debug)]
pub struct Writer(bytes::buf::Writer<BytesMut>);

impl Write for Writer {
Expand Down
16 changes: 14 additions & 2 deletions lightyear/src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl ConnectionManager {
client_id: ClientId,
message: &M,
) -> Result<(), ServerError> {
self.send_message_to_target::<C, M>(message, NetworkTarget::Only(vec![client_id]))
self.send_message_to_target::<C, M>(message, NetworkTarget::Single(client_id))
}

/// Update the priority of a `ReplicationGroup` that is replicated to a given client
Expand Down Expand Up @@ -294,7 +294,16 @@ impl ConnectionManager {
.iter_mut()
.filter(|(id, _)| target.targets(id))
// NOTE: this clone is O(1), it just increments the reference count
.try_for_each(|(_, c)| c.buffer_message(message.clone(), channel))
.try_for_each(|(_, c)| {
// for local clients, we don't want to buffer messages in the MessageManager since
// there is no io
if c.is_local_client() {
c.local_messages_to_send.push(message.clone())
} else {
c.buffer_message(message.clone(), channel)?;
}
Ok::<(), ServerError>(())
})
}

pub(crate) fn erased_send_message_to_target<M: Message>(
Expand Down Expand Up @@ -410,6 +419,8 @@ pub struct Connection {
pub(crate) messages_to_rebroadcast: Vec<(Bytes, NetworkTarget, ChannelKind)>,
/// True if this connection corresponds to a local client when running in host-server mode
is_local_client: bool,
/// Messages to send to the local client (we don't buffer them in the MessageManager because there is no io)
pub(crate) local_messages_to_send: Vec<Bytes>,
}

impl Connection {
Expand Down Expand Up @@ -462,6 +473,7 @@ impl Connection {
writer: Writer::with_capacity(MAX_PACKET_SIZE),
messages_to_rebroadcast: vec![],
is_local_client: false,
local_messages_to_send: vec![],
}
}

Expand Down
Loading
Loading