Skip to content

Commit

Permalink
fix server events
Browse files Browse the repository at this point in the history
  • Loading branch information
cbournhonesque-sc committed Dec 22, 2023
1 parent 7323c2d commit f71b6b3
Show file tree
Hide file tree
Showing 20 changed files with 350 additions and 222 deletions.
33 changes: 31 additions & 2 deletions NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,41 @@
- maybe we should separate Connection and MessageManager into a Send and Receive part?
- server recv and client send need MessageWithMetadata
- client recv and server send need Message

- DEBUGGING SIMPLE_BOX:
- on the client prepare spawn, another component gets added, similar to ShouldBePredicted. Or just replicate Replicate entirely, once...
-
- Server authority:
- OPTION 1
- client spawns entity, server receives it and stores the mapping client_id=remote_id, server_id=local_id.
- then server starts replicating back to the client, with a server_id
- upon reception, the client needs to know that the server_id will map to its own local client_id.
- i.e. the server's message will contain the client id
- if the server message contains client-id, instead of spawn we update the local mapping on client
- OPTION 2:
- client spawns a predicted entity (Predicted) but Confirmed is nil. id=PC1
- server receives it and spawns an entity,
- If jitter is too big, or there is packet loss? it looks like inputs keep getting sent to client 1.
- the cube goes all the way to the left and exits the screen. There is continuous rollback fails
- on interest management, we still have this problem where interpolation is stuck at the beginning and doesn't move. Probably
because start tick or end tick are not updated correctly in some edge cases.
- A: client-authoritative replication:
Client 1 spawns an entity C1, which gets replicated on the client as S1 (and can then be further replicated to other clients).
S1 doesn't get replicated back to client 1, but only to other clients. For example, we want to replicate client 1's cursor
position to other clients.


- B: client spawning a Predicted entity.
For example client 1 spawns a predicted entity P1 (a shared UI menu). Server receives that notification; spawns an
entity S1 that gets replicated to client 1. Client 1 spawns a confirmed entity C1. But instead of spawning a new
corresponding predicted entity, it re-uses the existing P1. From there on prediction with rollback can proceed as usual.

- C: client spawning a Confirmed entity.
Client 1 spawns a confirmed entity C1. It gets replicated to server, which spawns S1. Then that entity can get
replicated to other clients AND to client 1. When client 1 receives the replication of S1, it knows that it corresponds
to its confirmed entity C1. From there on it's normal replication.

I'm not actually sure that this is useful; because the entity first spawns instantly on client (i.e. is on the client timeline),
but then if we fallback to normal replication, the entity then moves to the server's timeline, so there would be a jarring update.



- add PredictionGroup and InterpolationGroup?
Expand Down
19 changes: 17 additions & 2 deletions examples/client_replication/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ impl Plugin for MyClientPlugin {
Update,
(
cursor_movement,
receive_message1,
receive_message,
send_message,
handle_predicted_spawn,
handle_interpolated_spawn,
),
Expand Down Expand Up @@ -177,12 +178,26 @@ fn window_relative_mouse_position(window: &Window) -> Option<Vec2> {
}

// System to receive messages on the client
pub(crate) fn receive_message1(mut reader: EventReader<MessageEvent<Message1>>) {
pub(crate) fn receive_message(mut reader: EventReader<MessageEvent<Message1>>) {
for event in reader.read() {
info!("Received message: {:?}", event.message());
}
}

/// Send messages from server to clients
pub(crate) fn send_message(mut client: ResMut<Client<MyProtocol>>, input: Res<Input<KeyCode>>) {
if input.pressed(KeyCode::M) {
let message = Message1(5);
info!("Send message: {:?}", message);
// the message will be re-broadcasted by the server to all clients
client
.send_message_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
}
}

// When the predicted copy of the client-owned entity is spawned, do stuff
// - assign it a different saturation
// - keep track of it in the Global resource
Expand Down
27 changes: 25 additions & 2 deletions examples/client_replication/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ impl Plugin for MyServerPlugin {
app.add_systems(Startup, init);
// the physics/FixedUpdates systems that consume inputs should be run in this set
app.add_systems(FixedUpdate, movement.in_set(FixedUpdateSet::Main));
app.add_systems(Update, (handle_connections, send_message));
app.add_systems(
Update,
(handle_connections, replicate_cursors, send_message),
);
}
}

Expand Down Expand Up @@ -121,14 +124,34 @@ pub(crate) fn movement(
}
}

pub(crate) fn replicate_cursors(
mut commands: Commands,
mut cursor_spawn_reader: EventReader<ComponentInsertEvent<CursorPosition>>,
) {
for event in cursor_spawn_reader.read() {
info!("received cursor spawn event: {:?}", event);
let client_id = event.context();
let entity = event.entity();

// for all cursors we have received, add a Replicate component so that we can start replicating it
// to other clients
if let Some(mut e) = commands.get_entity(*entity) {
e.insert(Replicate {
replication_target: NetworkTarget::AllExcept(vec![*client_id]),
..default()
});
}
}
}

/// Send messages from server to clients
pub(crate) fn send_message(mut server: ResMut<Server<MyProtocol>>, input: Res<Input<KeyCode>>) {
if input.pressed(KeyCode::M) {
// TODO: add way to send message to all
let message = Message1(5);
info!("Send message: {:?}", message);
server
.send_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.send_message_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
Expand Down
2 changes: 1 addition & 1 deletion examples/client_replication/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Plugin for SharedPlugin {

// Generate pseudo-random color from id
pub(crate) fn color_from_id(client_id: ClientId) -> Color {
let h = (((client_id * 12345) % 360) as f32) / 360.0;
let h = (((client_id * 45) % 360) as f32) / 360.0;
let s = 0.8;
let l = 0.5;
Color::hsl(h, s, l)
Expand Down
2 changes: 1 addition & 1 deletion examples/interest_management/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ pub(crate) fn send_message(mut server: ResMut<Server<MyProtocol>>, input: Res<In
let message = Message1(5);
info!("Send message: {:?}", message);
server
.send_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.send_message_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
Expand Down
2 changes: 1 addition & 1 deletion examples/simple_box/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(crate) fn send_message(mut server: ResMut<Server<MyProtocol>>, input: Res<In
let message = Message1(5);
info!("Send message: {:?}", message);
server
.send_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.send_message_to_target::<Channel1, Message1>(Message1(5), NetworkTarget::All)
.unwrap_or_else(|e| {
error!("Failed to send message: {:?}", e);
});
Expand Down
6 changes: 3 additions & 3 deletions lightyear/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,11 @@ impl<P: Protocol> Connection<P> {
// self.sync_manager.update(time_manager);
}

pub fn buffer_message(
pub(crate) fn buffer_message(
&mut self,
message: P::Message,
channel: ChannelKind,
// target: NetworkTarget,
target: NetworkTarget,
) -> Result<()> {
// TODO: i know channel names never change so i should be able to get them as static
// TODO: just have a channel registry enum as well?
Expand All @@ -103,7 +103,7 @@ impl<P: Protocol> Connection<P> {
.name(&channel)
.unwrap_or("unknown")
.to_string();
let message = ClientMessage::<P>::Message(message, NetworkTarget::None);
let message = ClientMessage::<P>::Message(message, target);
message.emit_send_logs(&channel_name);
self.message_manager.buffer_send(message, channel)?;
Ok(())
Expand Down
8 changes: 0 additions & 8 deletions lightyear/src/client/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,6 @@
use crate::connection::events::ConnectionEvents;
use crate::protocol::Protocol;

pub struct ClientEvents<P: Protocol> {
// cannot include connection/disconnection directly into ConnectionEvents, because we remove
// the connection event upon disconnection
connection: bool,
disconnection: bool,
events: ConnectionEvents<P>,
}

pub type ConnectEvent = crate::shared::events::ConnectEvent<()>;
pub type DisconnectEvent = crate::shared::events::DisconnectEvent<()>;
pub type InputEvent<I> = crate::shared::events::InputEvent<I, ()>;
Expand Down
2 changes: 1 addition & 1 deletion lightyear/src/client/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ fn prepare_input_message<P: Protocol>(mut client: ResMut<Client<P>>) {
// TODO: should we provide variants of each user-facing function, so that it pushes the error
// to the ConnectionEvents?
client
.buffer_send::<InputChannel, _>(message)
.send_message::<InputChannel, _>(message)
.unwrap_or_else(|err| {
error!("Error while sending input message: {:?}", err);
})
Expand Down
24 changes: 24 additions & 0 deletions lightyear/src/client/plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::client::prediction::plugin::{is_in_rollback, PredictionPlugin};
use crate::client::prediction::Rollback;
use crate::client::resource::{Authentication, Client};
use crate::client::systems::{is_ready_to_send, receive, send, sync_update};
use crate::prelude::ReplicationSet;
use crate::protocol::component::ComponentProtocol;
use crate::protocol::message::MessageProtocol;
use crate::protocol::Protocol;
Expand Down Expand Up @@ -102,6 +103,29 @@ impl<P: Protocol> PluginType for ClientPlugin<P> {
)
.chain(),
)
// NOTE: it's ok to run the replication systems less frequently than every frame
// because bevy's change detection detects changes since the last time the system ran (not since the last frame)
.configure_sets(
PostUpdate,
(
(
ReplicationSet::SendEntityUpdates,
ReplicationSet::SendComponentUpdates,
ReplicationSet::ReplicationSystems,
)
.in_set(ReplicationSet::All),
(
ReplicationSet::SendEntityUpdates,
ReplicationSet::SendComponentUpdates,
MainSet::SendPackets,
)
.chain()
.in_set(MainSet::Send),
// ReplicationSystems runs once per frame, so we cannot put it in the `Send` set
// which runs every send_interval
(ReplicationSet::ReplicationSystems, MainSet::SendPackets).chain(),
),
)
.configure_sets(
PostUpdate,
(MainSet::Send.run_if(is_ready_to_send::<P>), MainSet::Sync),
Expand Down
19 changes: 17 additions & 2 deletions lightyear/src/client/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,28 @@ impl<P: Protocol> Client<P> {
}
}

/// Send a message to the server, the message should be re-broadcasted according to the `target`
pub fn send_message_to_target<C: Channel, M: Message>(
&mut self,
message: M,
target: NetworkTarget,
) -> Result<()>
where
P::Message: From<M>,
{
let channel = ChannelKind::of::<C>();
self.connection
.buffer_message(message.into(), channel, target)
}

/// Send a message to the server
pub fn buffer_send<C: Channel, M: Message>(&mut self, message: M) -> Result<()>
pub fn send_message<C: Channel, M: Message>(&mut self, message: M) -> Result<()>
where
P::Message: From<M>,
{
let channel = ChannelKind::of::<C>();
self.connection.buffer_message(message.into(), channel)
self.connection
.buffer_message(message.into(), channel, NetworkTarget::None)
}

/// Receive messages from the server
Expand Down
46 changes: 21 additions & 25 deletions lightyear/src/connection/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,10 +286,10 @@ pub trait IterComponentUpdateEvent<P: Protocol, Ctx: EventContext = ()> {
where
C: IntoKind<P::ComponentKinds>;

/// Find all the updates of component C for a given entity
fn get_component_update<C: Component>(&self, entity: Entity) -> Option<Ctx>
where
C: IntoKind<P::ComponentKinds>;
// /// Find all the updates of component C for a given entity
// fn get_component_update<C: Component>(&self, entity: Entity) -> Option<Ctx>
// where
// C: IntoKind<P::ComponentKinds>;
}

impl<P: Protocol> IterComponentUpdateEvent<P> for ConnectionEvents<P> {
Expand Down Expand Up @@ -320,23 +320,23 @@ impl<P: Protocol> IterComponentUpdateEvent<P> for ConnectionEvents<P> {
// self.components_with_updates.contains(&C::into_kind())
}

// TODO: is it possible to receive multiple updates for the same component/entity?
// it shouldn't be possible for a Sequenced channel,
// maybe just take the first value that matches, then?
fn get_component_update<C: Component>(&self, entity: Entity) -> Option<()>
where
C: IntoKind<P::ComponentKinds>,
{
todo!()
// self.component_updates
// .get(&entity)
// .map(|updates| updates.get(&C::into_kind()).cloned())
// .flatten()
}
// // TODO: is it possible to receive multiple updates for the same component/entity?
// // it shouldn't be possible for a Sequenced channel,
// // maybe just take the first value that matches, then?
// fn get_component_update<C: Component>(&self, entity: Entity) -> Option<()>
// where
// C: IntoKind<P::ComponentKinds>,
// {
// todo!()
// // self.component_updates
// // .get(&entity)
// // .map(|updates| updates.get(&C::into_kind()).cloned())
// // .flatten()
// }
}

pub trait IterComponentRemoveEvent<P: Protocol, Ctx: EventContext = ()> {
fn into_iter_component_remove<C: Component>(
fn iter_component_remove<C: Component>(
&mut self,
) -> Box<dyn Iterator<Item = (Entity, Ctx)> + '_>
where
Expand All @@ -348,9 +348,7 @@ pub trait IterComponentRemoveEvent<P: Protocol, Ctx: EventContext = ()> {

// TODO: move these implementations to client?
impl<P: Protocol> IterComponentRemoveEvent<P> for ConnectionEvents<P> {
fn into_iter_component_remove<C: Component>(
&mut self,
) -> Box<dyn Iterator<Item = (Entity, ())> + '_>
fn iter_component_remove<C: Component>(&mut self) -> Box<dyn Iterator<Item = (Entity, ())> + '_>
where
C: IntoKind<P::ComponentKinds>,
{
Expand All @@ -371,7 +369,7 @@ impl<P: Protocol> IterComponentRemoveEvent<P> for ConnectionEvents<P> {
}

pub trait IterComponentInsertEvent<P: Protocol, Ctx: EventContext = ()> {
fn into_iter_component_insert<C: Component>(
fn iter_component_insert<C: Component>(
&mut self,
) -> Box<dyn Iterator<Item = (Entity, Ctx)> + '_>
where
Expand All @@ -382,9 +380,7 @@ pub trait IterComponentInsertEvent<P: Protocol, Ctx: EventContext = ()> {
}

impl<P: Protocol> IterComponentInsertEvent<P> for ConnectionEvents<P> {
fn into_iter_component_insert<C: Component>(
&mut self,
) -> Box<dyn Iterator<Item = (Entity, ())> + '_>
fn iter_component_insert<C: Component>(&mut self) -> Box<dyn Iterator<Item = (Entity, ())> + '_>
where
C: IntoKind<P::ComponentKinds>,
{
Expand Down
Loading

0 comments on commit f71b6b3

Please sign in to comment.