diff --git a/NOTES.md b/NOTES.md index 283055b2e..05441b2f1 100644 --- a/NOTES.md +++ b/NOTES.md @@ -10,15 +10,57 @@ - the cube goes all the way to the left and exits the screen. There is continuous rollback fails - on interest management, we still have this problem where interpolation is stuck at the beginning and doesn't move. Probably because start tick or end tick are not updated correctly in some edge cases. - - -- TODO: - - implement Message for common bevy transforms - - maybe have a ClientReplicate component to transfer all the replication data that is useful to clients? (group, prediciton, interpolation, etc.) - - +- add PredictionGroup and InterpolationGroup. + - on top of ReplicationGroup? + - or do we just re-use the replication group id (that usually will have a remote entity id) and use it to see the prediction/interpolation group? + - then we add the prediction group id on the Confirmed or Predicted components? +- Then we don't really need the Confirmed/Predicted components anymore, we could just have resources on the Prediction or Interpolation plugin +- The resource needs: + - confirmed<->predicted mapping + - for a given prediction-group, the dependency graph of the entities (using confirmed entities?) +- The prediction systems will: + - iterate through the dependency graph of the prediction group + - for each entity, fetch the confirmed/predicted entity + - do entity mapping if needed +- users can add their own entities in the prediction group (even if thre ) + + + +- DEBUGGING REPLICATION BOX: + - the sync from confirmed to predict might not only be for replicated components, but also components that were + spawned on confirmed world directly. 
+ - which means it's not enough to apply topological sort during replication; we need to apply it on prediction level directly + - create a 'PredictionGroup': all predicted entities must be in the same group, and we apply topological sort on that group + - we actually could have different prediction groups, for entities that we know are not interacting at all! + - each group has a dependency graph as well + - maybe maintain a topological sort for each predicted replication group? + - what about adding new entities to the prediction group? because that's the main problem, otherwise if all the entities + are known at the beginning we are good! + - maybe don't need topological sort but can just use the vec from the replication to have the order + - but then how do we iterate through the entities in that order? + - the components during prediction sync need to be mapped! + - do we need to introduce the concept of a PredictionGroup, which is a super-set of a replicationGroup? (because some of the entities + might not come from replication?) + - how to get smooth interpolation when there can be diagonal movements? + - allow custom interpolation, so that we can make sure that interpolation respects corners as well. The interpolation follows the path + - WEIRD: when we just do normal interpolation for player heads, and just use 'interp=start' for tails, it actually looks really smooth! + - TODO: tried to make my custom interpolation logic work, but there still seem to be edge cases that are not handled well. + - there's weird panics now, and sometimes the interpolated entity doesn't move at all + - INTERP TIME is sometimes too late; i.e. we receive updates that are way after interp time. + - SYNC: + - seems to not work well at the beginning.. + - PREDICTION; rollback is weird and broken + - looks like sending pings more frequently fixed the issue? to investigate.. + - is it that we don't receive the inputs on time at the client? 
+ - imagine we lost some packets and server didn't receive the inputs... then when server receives a later packet it should receive the past 15 inputs. + server should then use this to rollback? + - server should ask client to speed up (via a message), when we have lost inputs (to increase the buffer size) + - it should re-use the previous input as a best guess estimate + - it looks like our input buffer on server is too big; it contains like 15 ticks worth of inputs, even though the client messages should arrive right before. + is it just because of the margin we took? + - applied a best guess estimate for lost inputs that uses the last input sent as fallback, works well! - FINAL CHOICE: - send all actions per group on an reliable unordered channel @@ -362,6 +404,10 @@ ROUGH EDGES: TODO: +- Inputs: + - instead of sending the last 15 inputs, send all inputs until the last acked input message (with a max) + - also remember to keep around inputs that we might need for prediction! + - Serialization: - have a NetworkMessage macro that all network messages must derive (Input, Message, Component) - DONE: all network messages derive Message diff --git a/book/src/concepts/advanced_replication/interpolation.md b/book/src/concepts/advanced_replication/interpolation.md index e69de29bb..37e008098 100644 --- a/book/src/concepts/advanced_replication/interpolation.md +++ b/book/src/concepts/advanced_replication/interpolation.md @@ -0,0 +1,84 @@ +# Interpolation + +## Introduction +Interpolation means that we will store replicated entities in a buffer, and then interpolate between the last two states to get a smoother movement. + +See this excellent explanation from Valve: [link](https://developer.valvesoftware.com/wiki/Source_Multiplayer_Networking) +or this one from Gabriel Gambetta: [link](https://www.gabrielgambetta.com/entity-interpolation.html) + + +## Implementation + +In lightyear, interpolation can be automatically managed for you. 
+ +Every replicated entity can specify to which clients it should be interpolated to: +```rust,noplayground +Replicate { + interpolation_target: NetworkTarget::AllExcept(vec![id]), + ..default() +}, +``` + +This means that all clients except for the one with id `id` will interpolate this entity. +In practice, it means that they will store in a buffer the history for all components that are enabled for Interpolation. + + +## Component Sync Mode + +Not all components in the protocol are necessarily interpolated. +Each component can implement a `ComponentSyncMode` that defines how it gets handled for the `Predicted` and `Interpolated` entities. + +Only components that have `ComponentSyncMode::Full` will be interpolated. + + +## Interpolation function + +By default, the implementation function for a given component will be linear interpolation. +It is also possible to override this behaviour by implementing a custom interpolation function. + +Here is an example: + +```rust,noplayground + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] + pub struct Component1(pub f32); + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] + pub struct Component2(pub f32); + + #[component_protocol(protocol = "MyProtocol")] + pub enum MyComponentProtocol { + #[sync(full)] + Component1(Component1), + #[sync(full, lerp = "MyCustomInterpFn")] + Component2(Component2), + } + + // custom interpolation logic + pub struct MyCustomInterpFn; + impl InterpFn for MyCustomInterpFn { + fn lerp(start: C, _other: C, _t: f32) -> C { + start + } + } +``` + +You will have to add the attribute `lerp = "TYPE_NAME"` to the component. +The `TYPE_NAME` must be a type that implements the `InterpFn` trait. +```rust,noplayground +pub trait InterpFn { + fn lerp(start: C, other: C, t: f32) -> C; +} +``` + + +## Complex interpolation + +In some cases, the interpolation logic can be more complex than a simple linear interpolation. 
+For example, we might want to have different interpolation functions for different entities, even if they have the same component type. +Or we might want to do interpolation based on multiple components (applying some cubic spline interpolation that relies not only on the position, +but also on the velocity and acceleration). + +In those cases, you can disable the default per-component interpolation logic and provide your own custom logic. +```rust,noplayground``` + + + diff --git a/examples/Cargo.toml b/examples/Cargo.toml index b4105161a..ac559e01b 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -17,6 +17,10 @@ publish = false name = "interest_management" path = "interest_management/main.rs" +[[example]] +name = "replication_groups" +path = "replication_groups/main.rs" + [[example]] name = "simple_box" path = "simple_box/main.rs" diff --git a/examples/interest_management/protocol.rs b/examples/interest_management/protocol.rs index 87aa7a014..befc8eb78 100644 --- a/examples/interest_management/protocol.rs +++ b/examples/interest_management/protocol.rs @@ -1,8 +1,10 @@ use bevy::prelude::{default, Bundle, Color, Component, Deref, DerefMut, Entity, Vec2}; +use bevy::utils::EntityHashSet; use derive_more::{Add, Mul}; use lightyear::prelude::*; use lightyear::shared::replication::components::ReplicationMode; use serde::{Deserialize, Serialize}; +use tracing::info; // Player #[derive(Bundle)] @@ -56,9 +58,15 @@ pub struct Circle; #[message(custom_map)] pub struct PlayerParent(Entity); -impl MapEntities for PlayerParent { - fn map_entities(&mut self, entity_map: &EntityMap) { - self.0.map_entities(entity_map); +impl<'a> MapEntities<'a> for PlayerParent { + fn map_entities(&mut self, entity_mapper: Box) { + info!("mapping parent entity {:?}", self.0); + self.0.map_entities(entity_mapper); + info!("After mapping: {:?}", self.0); + } + + fn entities(&self) -> EntityHashSet { + EntityHashSet::from_iter(vec![self.0]) + } } @@ -84,7 +92,7 @@ pub struct Channel1; 
#[derive(Message, Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Message1(pub usize); -#[message_protocol(protocol = "MyProtocol", derive(Debug))] +#[message_protocol(protocol = "MyProtocol")] pub enum Messages { Message1(Message1), } diff --git a/examples/replication_groups/README.md b/examples/replication_groups/README.md new file mode 100644 index 000000000..c732e2206 --- /dev/null +++ b/examples/replication_groups/README.md @@ -0,0 +1,24 @@ +# Replication groups + +This is an example that shows how to make Lightyear replicate multiple entities in a single message, +to make sure that they are always in a consistent state (i.e. that entities in a group are all replicated on the same tick). + +Without a replication group, it is possible that one entity is replicated with the server's tick 10, and another entity +is replicated with the server's tick 11. This is not a problem if the entities are independent, but if they depend on each other (for example +for client prediction) it could cause issues. + +This is especially useful if you have an entity that depends on another entity (e.g. a player and its weapon), +the weapon might have a component `Parent(owner: Entity)` which references the player entity. +In which case we **need** the player entity to be spawned before the weapon entity, otherwise `Parent` component +will reference an entity that does not exist. 
+ + +## Running the example + +To start the server, run `cargo run --example replication_groups server` + +Then you can launch multiple clients with the commands: + +- `cargo run --example replication_groups client -c 1` + +- `cargo run --example replication_groups client -c 2 --client-port 2000` diff --git a/examples/replication_groups/client.rs b/examples/replication_groups/client.rs new file mode 100644 index 000000000..7a6ff17fc --- /dev/null +++ b/examples/replication_groups/client.rs @@ -0,0 +1,427 @@ +use crate::protocol::Direction; +use crate::protocol::*; +use crate::shared::{shared_config, shared_movement_behaviour, shared_tail_behaviour}; +use crate::{Transports, KEY, PROTOCOL_ID}; +use bevy::prelude::*; +use lightyear::client::interpolation::plugin::InterpolationSet; +use lightyear::client::prediction::plugin::PredictionSet; +use lightyear::inputs::input_buffer::InputBuffer; +use lightyear::prelude::client::*; +use lightyear::prelude::*; +use std::collections::VecDeque; +use std::net::{Ipv4Addr, SocketAddr}; +use std::str::FromStr; +use std::time::Duration; + +#[derive(Resource, Clone, Copy)] +pub struct MyClientPlugin { + pub(crate) client_id: ClientId, + pub(crate) client_port: u16, + pub(crate) server_port: u16, + pub(crate) transport: Transports, +} + +impl Plugin for MyClientPlugin { + fn build(&self, app: &mut App) { + let server_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.server_port); + let auth = Authentication::Manual { + server_addr, + client_id: self.client_id, + private_key: KEY, + protocol_id: PROTOCOL_ID, + }; + let client_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.client_port); + let link_conditioner = LinkConditionerConfig { + incoming_latency: Duration::from_millis(200), + incoming_jitter: Duration::from_millis(40), + incoming_loss: 0.05, + }; + // let link_conditioner = LinkConditionerConfig { + // incoming_latency: Duration::from_millis(0), + // incoming_jitter: Duration::from_millis(0), + // incoming_loss: 
0.00, + // }; + let transport = match self.transport { + Transports::Udp => TransportConfig::UdpSocket(client_addr), + Transports::Webtransport => TransportConfig::WebTransportClient { + client_addr, + server_addr, + }, + }; + let io = Io::from_config( + &IoConfig::from_transport(transport).with_conditioner(link_conditioner), + ); + let config = ClientConfig { + shared: shared_config().clone(), + input: InputConfig::default(), + netcode: Default::default(), + ping: PingConfig::default(), + sync: SyncConfig::default(), + prediction: PredictionConfig::default(), + // we are sending updates every frame (60fps), let's add a delay of 6 network-ticks + interpolation: InterpolationConfig { + delay: InterpolationDelay::default().with_send_interval_ratio(2.0), + // let's us completely override the interpolation logic + custom_interpolation_logic: true, + }, + }; + let plugin_config = PluginConfig::new(config, io, protocol(), auth); + app.add_plugins(ClientPlugin::new(plugin_config)); + app.add_plugins(crate::shared::SharedPlugin); + app.insert_resource(self.clone()); + app.add_systems(Startup, init); + // app.add_systems( + // PostUpdate, + // debug_interpolate + // .before(InterpolationSet::PrepareInterpolation) + // .after(InterpolationSet::DespawnFlush), + // ); + // app.add_systems( + // PreUpdate, + // debug_prediction_pre_rollback + // .after(PredictionSet::SpawnHistoryFlush) + // .before(PredictionSet::CheckRollback), + // ); + // app.add_systems( + // PreUpdate, + // debug_prediction_post_rollback + // .after(PredictionSet::CheckRollback) + // .before(PredictionSet::Rollback), + // ); + app.add_systems( + PostUpdate, + interpolate.in_set(InterpolationSet::Interpolate), + ); + app.add_systems( + FixedUpdate, + buffer_input.in_set(InputSystemSet::BufferInputs), + ); + app.add_systems( + FixedUpdate, + (movement, shared_tail_behaviour) + .chain() + .in_set(FixedUpdateSet::Main), + ); + app.add_systems(Update, (handle_predicted_spawn, handle_interpolated_spawn)); + } +} 
+ +// Startup system for the client +pub(crate) fn init( + mut commands: Commands, + mut client: ResMut>, + plugin: Res, +) { + commands.spawn(Camera2dBundle::default()); + commands.spawn(TextBundle::from_section( + format!("Client {}", plugin.client_id), + TextStyle { + font_size: 30.0, + color: Color::WHITE, + ..default() + }, + )); + client.connect(); + // client.set_base_relative_speed(0.001); +} + +// System that reads from peripherals and adds inputs to the buffer +pub(crate) fn buffer_input(mut client: ResMut>, keypress: Res>) { + if keypress.pressed(KeyCode::W) || keypress.pressed(KeyCode::Up) { + return client.add_input(Inputs::Direction(Direction::Up)); + } + if keypress.pressed(KeyCode::S) || keypress.pressed(KeyCode::Down) { + return client.add_input(Inputs::Direction(Direction::Down)); + } + if keypress.pressed(KeyCode::A) || keypress.pressed(KeyCode::Left) { + return client.add_input(Inputs::Direction(Direction::Left)); + } + if keypress.pressed(KeyCode::D) || keypress.pressed(KeyCode::Right) { + return client.add_input(Inputs::Direction(Direction::Right)); + } + if keypress.pressed(KeyCode::Delete) { + // currently, inputs is an enum and we can only add one input per tick + return client.add_input(Inputs::Delete); + } + if keypress.pressed(KeyCode::Space) { + return client.add_input(Inputs::Spawn); + } + return client.add_input(Inputs::None); +} + +// The client input only gets applied to predicted entities that we own +// This works because we only predict the user's controlled entity. +// If we were predicting more entities, we would have to only apply movement to the player owned one. +pub(crate) fn movement( + // TODO: maybe make prediction mode a separate component!!! 
+ mut position_query: Query<&mut PlayerPosition, With>, + mut input_reader: EventReader>, +) { + if PlayerPosition::mode() != ComponentSyncMode::Full { + return; + } + for input in input_reader.read() { + if let Some(input) = input.input() { + for mut position in position_query.iter_mut() { + shared_movement_behaviour(&mut position, input); + } + } + } +} + +// When the predicted copy of the client-owned entity is spawned, do stuff +// - assign it a different saturation +// - keep track of it in the Global resource +pub(crate) fn handle_predicted_spawn(mut predicted: Query<&mut PlayerColor, Added>) { + for mut color in predicted.iter_mut() { + color.0.set_s(0.3); + } +} + +// When the predicted copy of the client-owned entity is spawned, do stuff +// - assign it a different saturation +// - keep track of it in the Global resource +pub(crate) fn handle_interpolated_spawn( + mut interpolated: Query<&mut PlayerColor, Added>, +) { + for mut color in interpolated.iter_mut() { + color.0.set_s(0.1); + } +} + +pub(crate) fn debug_prediction_pre_rollback( + client: Res>, + parent_query: Query<&PredictionHistory>, + tail_query: Query<(&PlayerParent, &PredictionHistory)>, +) { + info!(tick = ?client.tick(), + inputs = ?client.get_input_buffer(), + "prediction pre rollback debug"); + for (parent, tail_history) in tail_query.iter() { + let parent_history = parent_query + .get(parent.0) + .expect("Tail entity has no parent entity!"); + info!(?parent_history, "parent"); + info!(?tail_history, "tail"); + } +} + +pub(crate) fn debug_prediction_post_rollback( + client: Res>, + parent_query: Query<&PredictionHistory>, + tail_query: Query<(&PlayerParent, &PredictionHistory)>, +) { + info!(tick = ?client.tick(), "prediction post rollback debug"); + for (parent, tail_history) in tail_query.iter() { + let parent_history = parent_query + .get(parent.0) + .expect("Tail entity has no parent entity!"); + info!(?parent_history, "parent"); + info!(?tail_history, "tail"); + } +} + +pub(crate) 
fn debug_interpolate( + client: Res>, + parent_query: Query<( + &InterpolateStatus, + &ConfirmedHistory, + )>, + tail_query: Query<( + &PlayerParent, + &InterpolateStatus, + &ConfirmedHistory, + )>, +) { + info!(tick = ?client.tick(), "interpolation debug"); + for (parent, tail_status, tail_history) in tail_query.iter() { + let (parent_status, parent_history) = parent_query + .get(parent.0) + .expect("Tail entity has no parent entity!"); + info!(?parent_status, ?parent_history, "parent"); + info!(?tail_status, ?tail_history, "tail"); + } +} + +// Here, we want to have a custom interpolation logic, because we need to query two components +// at once to do the interpolation correctly. +// We want the interpolated entity to stay on the tail path of the confirmed entity at all times. +// The `InterpolateStatus` provides the start and end tick + component value, making it easy to perform interpolation. +pub(crate) fn interpolate( + mut parent_query: Query<(&mut PlayerPosition, &InterpolateStatus)>, + mut tail_query: Query<( + &PlayerParent, + &TailLength, + &mut TailPoints, + &InterpolateStatus, + )>, +) { + 'outer: for (parent, tail_length, mut tail, tail_status) in tail_query.iter_mut() { + let (mut parent_position, parent_status) = parent_query + .get_mut(parent.0) + .expect("Tail entity has no parent entity!"); + info!( + ?parent_position, + ?tail, + ?parent_status, + ?tail_status, + "interpolate situation" + ); + // the ticks should be the same for both components + if let Some((start_tick, tail_start_value)) = &tail_status.start { + if parent_status.start.is_none() { + // the parent component has not been confirmed yet, so we can't interpolate + continue; + } + let pos_start = &parent_status.start.as_ref().unwrap().1; + if tail_status.current == *start_tick { + assert_eq!(tail_status.current, parent_status.current); + *tail = tail_start_value.clone(); + *parent_position = pos_start.clone(); + info!( + ?tail, + ?parent_position, + "after interpolation; CURRENT = 
START" + ); + continue; + } + + if let Some((end_tick, tail_end_value)) = &tail_status.end { + if parent_status.end.is_none() { + // the parent component has not been confirmed yet, so we can't interpolate + continue; + } + let pos_end = &parent_status.end.as_ref().unwrap().1; + if tail_status.current == *end_tick { + assert_eq!(tail_status.current, parent_status.current); + *tail = tail_end_value.clone(); + *parent_position = pos_end.clone(); + info!( + ?tail, + ?parent_position, + "after interpolation; CURRENT = END" + ); + continue; + } + if start_tick != end_tick { + // the new tail will be similar to the old tail, with some added points at the front + *tail = tail_start_value.clone(); + *parent_position = pos_start.clone(); + + // interpolation ratio + let t = (tail_status.current - *start_tick) as f32 + / (*end_tick - *start_tick) as f32; + + let mut tail_diff_length = 0.0; + // find in which end tail segment the previous head_position is + + // deal with the first segment separately + if let Some(ratio) = + pos_start.is_between(tail_end_value.0.front().unwrap().0, pos_end.0) + { + // we might need to add a new point to the tail + if tail_end_value.0.front().unwrap().0 + != tail_start_value.0.front().unwrap().0 + { + tail.0.push_front(tail_end_value.0.front().unwrap().clone()); + info!("ADD POINT"); + } + // the path is straight! just move the head and adjust the tail + *parent_position = + PlayerPosition::lerp(pos_start.clone(), pos_end.clone(), t); + tail.shorten_back(parent_position.0, tail_length.0); + info!( + ?tail, + ?parent_position, + "after interpolation; FIRST SEGMENT" + ); + continue; + } + + // else, the final head position is not on the first segment + tail_diff_length += + segment_length(pos_end.0, tail_end_value.0.front().unwrap().0); + + // amount of distance we need to move the player by, while remaining on the path + let mut pos_distance_to_do = 0.0; + // segment [segment_idx-1, segment_idx] is the segment where the starting pos is. 
+ let mut segment_idx = 0; + // else, keep trying to find in the remaining segments + for i in 1..tail_end_value.0.len() { + let segment_length = + segment_length(tail_end_value.0[i].0, tail_end_value.0[i - 1].0); + if let Some(ratio) = + pos_start.is_between(tail_end_value.0[i].0, tail_end_value.0[i - 1].0) + { + if ratio == 0.0 { + // need to add a new point + tail.0.push_front(tail_end_value.0[i].clone()); + } + // we found the segment where the starting pos is. + // let's find the total amount that the tail moved + tail_diff_length += (1.0 - ratio) * segment_length; + pos_distance_to_do = t * tail_diff_length; + segment_idx = i; + break; + } else { + tail_diff_length += segment_length; + } + } + + // now move the head by `pos_distance_to_do` while remaining on the tail path + for i in (0..segment_idx).rev() { + let dist = segment_length(parent_position.0, tail_end_value.0[i].0); + info!( + ?i, + ?dist, + ?pos_distance_to_do, + ?tail_diff_length, + ?segment_idx, + "in other segments" + ); + if pos_distance_to_do < 1000.0 * f32::EPSILON { + info!(?tail, ?parent_position, "after interpolation; ON POINT"); + // no need to change anything + continue 'outer; + } + // if (dist - pos_distance_to_do) < 1000.0 * f32::EPSILON { + // // the head is on a point (do not add a new point yet) + // parent_position.0 = tail_end_value.0[i].0; + // pos_distance_to_do -= dist; + // tail.shorten_back(parent_position.0, tail_length.0); + // info!(?tail, ?parent_position, "after interpolation; ON POINT"); + // continue; + // } else if dist > pos_distance_to_do { + if dist < pos_distance_to_do { + // the head must go through this tail point + parent_position.0 = tail_end_value.0[i].0; + tail.0.push_front(tail_end_value.0[i]); + pos_distance_to_do -= dist; + } else { + // we found the final segment where the head will be + parent_position.0 = tail + .0 + .front() + .unwrap() + .1 + .get_tail(tail_end_value.0[i].0, dist - pos_distance_to_do); + tail.shorten_back(parent_position.0, 
tail_length.0); + info!(?tail, ?parent_position, "after interpolation; ELSE"); + continue 'outer; + } + } + // the final position is on the first segment + let dist = segment_length(pos_end.0, tail_end_value.0.front().unwrap().0); + parent_position.0 = tail + .0 + .front() + .unwrap() + .1 + .get_tail(pos_end.0, dist - pos_distance_to_do); + tail.shorten_back(parent_position.0, tail_length.0); + info!(?tail, ?parent_position, "after interpolation; ELSE FIRST"); + } + } + } + } +} diff --git a/examples/replication_groups/main.rs b/examples/replication_groups/main.rs new file mode 100644 index 000000000..7b2f11a7d --- /dev/null +++ b/examples/replication_groups/main.rs @@ -0,0 +1,96 @@ +#![allow(unused_imports)] +#![allow(unused_variables)] +#![allow(dead_code)] + +//! Run with +//! - `cargo run --example bevy_cli server` +//! - `cargo run --example bevy_cli client` +mod client; +mod protocol; +mod server; +mod shared; + +use std::str::FromStr; + +use bevy::log::LogPlugin; +use bevy::prelude::*; +use bevy::DefaultPlugins; +use clap::{Parser, ValueEnum}; +use serde::{Deserialize, Serialize}; +use tracing_subscriber::fmt::format::FmtSpan; + +use crate::client::MyClientPlugin; +use crate::server::MyServerPlugin; +use lightyear::netcode::{ClientId, Key}; +use lightyear::prelude::TransportConfig; + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + let mut app = App::new(); + app.add_plugins(DefaultPlugins.build().disable::()); + setup(&mut app, cli); + + app.run(); +} + +pub const CLIENT_PORT: u16 = 6000; +pub const SERVER_PORT: u16 = 5000; +pub const PROTOCOL_ID: u64 = 0; + +pub const KEY: Key = [0; 32]; + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +pub enum Transports { + Udp, + Webtransport, +} + +#[derive(Parser, PartialEq, Debug)] +enum Cli { + SinglePlayer, + Server { + #[arg(short, long, default_value_t = SERVER_PORT)] + port: u16, + + #[arg(short, long, value_enum, default_value_t = Transports::Udp)] + transport: 
Transports, + }, + Client { + #[arg(short, long, default_value_t = ClientId::default())] + client_id: ClientId, + + #[arg(long, default_value_t = CLIENT_PORT)] + client_port: u16, + + #[arg(short, long, default_value_t = SERVER_PORT)] + server_port: u16, + + #[arg(short, long, value_enum, default_value_t = Transports::Udp)] + transport: Transports, + }, +} + +fn setup(app: &mut App, cli: Cli) { + match cli { + Cli::SinglePlayer => {} + Cli::Server { port, transport } => { + let server_plugin = MyServerPlugin { port, transport }; + app.add_plugins(server_plugin); + } + Cli::Client { + client_id, + client_port, + server_port, + transport, + } => { + let client_plugin = MyClientPlugin { + client_id, + client_port, + server_port, + transport, + }; + app.add_plugins(client_plugin); + } + } +} diff --git a/examples/replication_groups/protocol.rs b/examples/replication_groups/protocol.rs new file mode 100644 index 000000000..30ddb0d55 --- /dev/null +++ b/examples/replication_groups/protocol.rs @@ -0,0 +1,307 @@ +use bevy::prelude::{default, Bundle, Color, Component, Deref, DerefMut, Entity, Reflect, Vec2}; +use bevy::utils::EntityHashSet; +use derive_more::{Add, Mul}; +use lightyear::prelude::client::InterpFn; +use lightyear::prelude::*; +use lightyear::shared::replication::components::ReplicationGroup; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use tracing::{info, trace}; + +// Player +#[derive(Bundle)] +pub(crate) struct PlayerBundle { + id: PlayerId, + position: PlayerPosition, + color: PlayerColor, + replicate: Replicate, +} + +// Tail +#[derive(Bundle)] +pub(crate) struct TailBundle { + parent: PlayerParent, + points: TailPoints, + length: TailLength, + replicate: Replicate, +} + +impl PlayerBundle { + pub(crate) fn new(id: ClientId, position: Vec2, color: Color) -> Self { + Self { + id: PlayerId(id), + position: PlayerPosition(position), + color: PlayerColor(color), + replicate: Replicate { + // prediction_target: NetworkTarget::None, + 
prediction_target: NetworkTarget::Only(vec![id]), + // interpolation_target: NetworkTarget::None, + interpolation_target: NetworkTarget::AllExcept(vec![id]), + // this is the default: the replication group id is a u64 value generated from the entity (`entity.to_bits()`) + replication_group: ReplicationGroup::FromEntity, + ..default() + }, + } + } +} + +impl TailBundle { + pub(crate) fn new(id: ClientId, parent: Entity, parent_position: Vec2, length: f32) -> Self { + let default_direction = Direction::default(); + let tail = default_direction.get_tail(parent_position, length); + let mut points = VecDeque::new(); + points.push_front((tail, default_direction)); + Self { + parent: PlayerParent(parent), + points: TailPoints(points), + length: TailLength(length), + replicate: Replicate { + // prediction_target: NetworkTarget::None, + prediction_target: NetworkTarget::Only(vec![id]), + // interpolation_target: NetworkTarget::None, + interpolation_target: NetworkTarget::AllExcept(vec![id]), + // replicate this entity within the same replication group as the parent + replication_group: ReplicationGroup::Group(parent.to_bits()), + ..default() + }, + } + } +} + +// Components + +#[derive(Component, Message, Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct PlayerId(ClientId); + +#[derive( + Component, Message, Serialize, Deserialize, Clone, Debug, PartialEq, Deref, DerefMut, Add, Mul, +)] +pub struct PlayerPosition(pub(crate) Vec2); + +impl PlayerPosition { + /// Checks if the position is between two other positions. 
+ /// (the positions must have the same x or y) + /// Will return None if it's not in between, otherwise will return where it is between a and b + pub(crate) fn is_between(&self, a: Vec2, b: Vec2) -> Option { + if a.x == b.x { + if self.x != a.x { + return None; + } + if a.y < b.y { + if a.y <= self.y && self.y <= b.y { + return Some((self.y - a.y) / (b.y - a.y)); + } else { + return None; + } + } else { + if b.y <= self.y && self.y <= a.y { + return Some((a.y - self.y) / (a.y - b.y)); + } else { + return None; + } + } + } else if a.y == b.y { + if self.y != a.y { + return None; + } + if a.x < b.x { + if a.x <= self.x && self.x <= b.x { + return Some((self.x - a.x) / (b.x - a.x)); + } else { + return None; + } + } else { + if b.x <= self.x && self.x <= a.x { + return Some((a.x - self.x) / (a.x - b.x)); + } else { + return None; + } + } + } + unreachable!("a and b should be on the same x or y") + } +} + +#[derive(Component, Message, Deserialize, Serialize, Clone, Debug, PartialEq)] +pub struct PlayerColor(pub(crate) Color); + +#[derive(Component, Message, Deserialize, Serialize, Clone, Debug, PartialEq)] +pub struct TailLength(pub(crate) f32); + +#[derive(Component, Message, Deserialize, Serialize, Clone, Debug, PartialEq)] +// tail inflection points, from front (point closest to the head) to back (tail end point) +pub struct TailPoints(pub(crate) VecDeque<(Vec2, Direction)>); + +pub fn segment_length(from: Vec2, to: Vec2) -> f32 { + (from - to).length() +} +impl TailPoints { + /// Make sure that the tail is exactly `length` long + pub(crate) fn shorten_back(&mut self, head: Vec2, length: f32) { + // find the index of the first point to modify (all points after that needs to be discarded) + + // treat the first point separately + let mut current_length = segment_length(head, self.0.front().unwrap().0); + if current_length >= length { + trace!("shortening first segment"); + let direction = self.0.front().unwrap().1; + let new_point = direction.get_tail(head, length); 
+ self.0 = VecDeque::new(); + self.0.push_front((new_point, direction)); + return; + } + for i in 1..self.0.len() { + let segment_length = segment_length(self.0[i - 1].0, self.0[i].0); + current_length += segment_length; + if current_length > length { + trace!("shortening tail"); + let direction = self.0[i].1; + let new_segment_length = segment_length - (current_length - length); + + // shorten the segment, and drop the rest + if new_segment_length > 0.0 { + let new_point = direction + .get_tail(self.0[i - 1].0, segment_length - (current_length - length)); + // drop all elements from [i, ..[ + let _ = self.0.split_off(i); + self.0.push_back((new_point, direction)); + } else { + // drop all elements from [i, ..[ + let _ = self.0.split_off(i); + } + trace!("new tail: {:?}", self.0); + return; + } + } + } +} + +// Example of a component that contains an entity. +// This component, when replicated, needs to have the inner entity mapped from the Server world +// to the client World. +// This can be done by adding a `#[message(custom_map)]` attribute to the component, and then +// deriving the `MapEntities` trait for the component. 
+#[derive(Component, Message, Deserialize, Serialize, Clone, Debug, PartialEq)] +#[message(custom_map)] +pub struct PlayerParent(pub(crate) Entity); + +impl<'a> MapEntities<'a> for PlayerParent { + fn map_entities(&mut self, entity_mapper: Box) { + info!("mapping parent entity {:?}", self.0); + self.0.map_entities(entity_mapper); + info!("After mapping: {:?}", self.0); + } + + fn entities(&self) -> EntityHashSet { + EntityHashSet::from_iter(vec![self.0]) + } +} + +#[component_protocol(protocol = "MyProtocol")] +pub enum Components { + #[sync(once)] + PlayerId(PlayerId), + #[sync(full)] + PlayerPosition(PlayerPosition), + #[sync(once)] + PlayerColor(PlayerColor), + #[sync(once)] + TailLength(TailLength), + // we set the interpolation function to NoInterpolation because we are using our own custom interpolation logic + // (by default it would use LinearInterpolation, which requires Add and Mul bounds on this component) + #[sync(full, lerp = "NoInterpolation")] + TailPoints(TailPoints), + #[sync(once)] + PlayerParent(PlayerParent), +} + +// Channels + +#[derive(Channel)] +pub struct Channel1; + +// Messages + +#[derive(Message, Serialize, Deserialize, Clone, Debug, PartialEq)] +pub struct Message1(pub usize); + +#[message_protocol(protocol = "MyProtocol")] +pub enum Messages { + Message1(Message1), +} + +// Inputs + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone, Copy, Default)] +// To simplify, we only allow one direction at a time +pub enum Direction { + #[default] + Up, + Down, + Left, + Right, +} + +impl Direction { + // Get the direction from `from` to `to` (doesn't handle diagonals) + pub fn from_points(from: Vec2, to: Vec2) -> Option { + if from.x != to.x && from.y != to.y { + trace!(?from, ?to, "diagonal"); + return None; + } + if from.y < to.y { + return Some(Self::Up); + } + if from.y > to.y { + return Some(Self::Down); + } + if from.x > to.x { + return Some(Self::Left); + } + if from.x < to.x { + return Some(Self::Right); + } + return None; + 
} + + // Get the position of the point that would become `head` if we applied `length` * `self` + pub fn get_tail(&self, head: Vec2, length: f32) -> Vec2 { + match self { + Direction::Up => Vec2::new(head.x, head.y - length), + Direction::Down => Vec2::new(head.x, head.y + length), + Direction::Left => Vec2::new(head.x + length, head.y), + Direction::Right => Vec2::new(head.x - length, head.y), + } + } +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] +pub enum Inputs { + Direction(Direction), + Delete, + Spawn, + // NOTE: the server MUST be able to distinguish between an input saying "the user is not doing any actions" and + // "we haven't received the input for this tick", which means that the client must send inputs every tick + // even if the user is not doing anything. + None, +} + +impl UserInput for Inputs {} + +// Protocol + +protocolize! { + Self = MyProtocol, + Message = Messages, + Component = Components, + Input = Inputs, +} + +pub(crate) fn protocol() -> MyProtocol { + let mut protocol = MyProtocol::default(); + protocol.add_channel::(ChannelSettings { + mode: ChannelMode::OrderedReliable(ReliableSettings::default()), + direction: ChannelDirection::Bidirectional, + }); + protocol +} diff --git a/examples/replication_groups/server.rs b/examples/replication_groups/server.rs new file mode 100644 index 000000000..b23526461 --- /dev/null +++ b/examples/replication_groups/server.rs @@ -0,0 +1,146 @@ +use crate::protocol::*; +use crate::shared::{shared_config, shared_movement_behaviour, shared_tail_behaviour}; +use crate::{shared, Transports, KEY, PROTOCOL_ID}; +use bevy::prelude::*; +use lightyear::inputs::input_buffer::InputBuffer; +use lightyear::prelude::server::*; +use lightyear::prelude::*; +use std::collections::HashMap; +use std::net::{Ipv4Addr, SocketAddr}; +use std::time::Duration; + +pub struct MyServerPlugin { + pub(crate) port: u16, + pub(crate) transport: Transports, +} + +impl Plugin for MyServerPlugin { + fn build(&self, 
app: &mut App) { + let server_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.port); + let netcode_config = NetcodeConfig::default() + .with_protocol_id(PROTOCOL_ID) + .with_key(KEY); + let link_conditioner = LinkConditionerConfig { + incoming_latency: Duration::from_millis(200), + incoming_jitter: Duration::from_millis(40), + incoming_loss: 0.05, + }; + let transport = match self.transport { + Transports::Udp => TransportConfig::UdpSocket(server_addr), + Transports::Webtransport => TransportConfig::WebTransportServer { + server_addr, + certificate: Certificate::self_signed(&["localhost"]), + }, + }; + let io = Io::from_config( + &IoConfig::from_transport(transport).with_conditioner(link_conditioner), + ); + let config = ServerConfig { + shared: shared_config().clone(), + netcode: netcode_config, + ping: PingConfig::default(), + }; + let plugin_config = PluginConfig::new(config, io, protocol()); + app.add_plugins(server::ServerPlugin::new(plugin_config)); + app.add_plugins(shared::SharedPlugin); + app.init_resource::(); + app.add_systems(Startup, init); + // the physics/FixedUpdates systems that consume inputs should be run in this set + app.add_systems( + FixedUpdate, + (movement, shared_tail_behaviour) + .chain() + .in_set(FixedUpdateSet::Main), + ); + app.add_systems(Update, handle_connections); + // app.add_systems(Update, debug_inputs); + } +} + +#[derive(Resource, Default)] +pub(crate) struct Global { + pub client_id_to_entity_id: HashMap, +} + +pub(crate) fn init(mut commands: Commands) { + commands.spawn(Camera2dBundle::default()); + commands.spawn(TextBundle::from_section( + "Server", + TextStyle { + font_size: 30.0, + color: Color::WHITE, + ..default() + }, + )); +} + +/// Server connection system, create a player upon connection +pub(crate) fn handle_connections( + mut connections: EventReader, + mut disconnections: EventReader, + mut global: ResMut, + mut commands: Commands, +) { + for connection in connections.read() { + let client_id = 
connection.context(); + // Generate pseudo random color from client id. + let h = (((client_id * 30) % 360) as f32) / 360.0; + let s = 0.8; + let l = 0.5; + let player_position = Vec2::ZERO; + let player_entity = commands + .spawn(PlayerBundle::new( + *client_id, + player_position, + Color::hsl(h, s, l), + )) + .id(); + let tail_length = 300.0; + let tail_entity = commands.spawn(TailBundle::new( + *client_id, + player_entity, + player_position, + tail_length, + )); + // Add a mapping from client id to entity id + global + .client_id_to_entity_id + .insert(*client_id, player_entity); + } + for disconnection in disconnections.read() { + let client_id = disconnection.context(); + if let Some(entity) = global.client_id_to_entity_id.remove(client_id) { + // TODO: also despawn tail, maybe by emitting an event? + commands.entity(entity).despawn(); + } + } +} + +/// Read client inputs and move players +pub(crate) fn movement( + mut position_query: Query<&mut PlayerPosition>, + mut input_reader: EventReader>, + global: Res, + server: Res>, +) { + for input in input_reader.read() { + let client_id = input.context(); + if let Some(input) = input.input() { + debug!( + "Receiving input: {:?} from client: {:?} on tick: {:?}", + input, + client_id, + server.tick() + ); + if let Some(player_entity) = global.client_id_to_entity_id.get(client_id) { + if let Ok(mut position) = position_query.get_mut(*player_entity) { + shared_movement_behaviour(&mut position, input); + } + } + } + } +} + +pub(crate) fn debug_inputs(server: Res>) { + info!(tick = ?server.tick(), inputs = ?server.get_input_buffer(1), "debug"); +} diff --git a/examples/replication_groups/shared.rs b/examples/replication_groups/shared.rs new file mode 100644 index 000000000..9eb08ab02 --- /dev/null +++ b/examples/replication_groups/shared.rs @@ -0,0 +1,134 @@ +use crate::protocol::Direction; +use crate::protocol::*; +use bevy::prelude::*; +use bevy_inspector_egui::quick::WorldInspectorPlugin; +use 
lightyear::prelude::client::{Confirmed, Interpolated}; +use lightyear::prelude::*; +use std::time::Duration; +use tracing::Level; + +pub fn shared_config() -> SharedConfig { + SharedConfig { + enable_replication: true, + client_send_interval: Duration::default(), + // server_send_interval: Duration::from_millis(40), + server_send_interval: Duration::from_millis(100), + tick: TickConfig { + tick_duration: Duration::from_secs_f64(1.0 / 64.0), + }, + log: LogConfig { + level: Level::WARN, + filter: "wgpu=error,wgpu_hal=error,naga=warn,bevy_app=info,bevy_render=warn,quinn=warn" + .to_string(), + }, + } +} + +pub struct SharedPlugin; + +impl Plugin for SharedPlugin { + fn build(&self, app: &mut App) { + app.add_plugins(WorldInspectorPlugin::new()); + app.add_systems(Update, draw_snakes); + } +} + +// head +// snake + +// This system defines how we update the player's positions when we receive an input +pub(crate) fn shared_movement_behaviour(position: &mut PlayerPosition, input: &Inputs) { + const MOVE_SPEED: f32 = 10.0; + match input { + Inputs::Direction(direction) => match direction { + Direction::Up => position.y += MOVE_SPEED, + Direction::Down => position.y -= MOVE_SPEED, + Direction::Left => position.x -= MOVE_SPEED, + Direction::Right => position.x += MOVE_SPEED, + }, + _ => {} + } +} + +// This system defines how we update the player's tails when the head is updated +// Note: we only apply logic for the Predicted entity on the client (Interpolated is updated +// during interpolation, and Confirmed is just replicated from Server) +pub(crate) fn shared_tail_behaviour( + player_position: Query, (Without, Without)>, + mut tails: Query< + (&mut TailPoints, &PlayerParent, &TailLength), + (Without, Without), + >, +) { + for (mut points, parent, length) in tails.iter_mut() { + let Ok(parent_position) = player_position.get(parent.0) else { + error!("Tail entity has no parent entity!"); + continue; + }; + // if the parent position didn't change, we don't need to update 
the tail + // (also makes sure we don't trigger change detection for the tail! which would mean we add + // new elements to the tail's history buffer) + if !parent_position.is_changed() { + continue; + } + // Update the front if the head turned + let (front_pos, front_dir) = points.0.front().unwrap().clone(); + // NOTE: we do not deal with diagonal directions in this example + let front_direction = Direction::from_points(front_pos, parent_position.0); + // if the head is going in a new direction, add a new point to the front + if front_direction.map_or(true, |dir| dir != front_dir) { + trace!( + old_front_dir = ?front_dir, + new_front_dir = ?front_direction, + "creating new inflection point"); + let inflection_pos = match front_dir { + Direction::Up | Direction::Down => Vec2::new(front_pos.x, parent_position.y), + Direction::Left | Direction::Right => Vec2::new(parent_position.x, front_pos.y), + }; + let new_front_dir = Direction::from_points(inflection_pos, parent_position.0).unwrap(); + points.0.push_front((inflection_pos, new_front_dir)); + trace!(?points, "new points"); + } + + // Update the back + // remove the back points that are above the length + points.shorten_back(parent_position.0, length.0); + } +} + +/// System that draws the boxed of the player positions. 
+/// The components should be replicated from the server to the client +pub(crate) fn draw_snakes( + mut gizmos: Gizmos, + players: Query<(&PlayerPosition, &PlayerColor), Without>, + tails: Query<(&PlayerParent, &TailPoints), Without>, +) { + for (parent, points) in tails.iter() { + debug!("drawing snake with parent: {:?}", parent.0); + let Ok((position, color)) = players.get(parent.0) else { + error!("Tail entity has no parent entity!"); + continue; + }; + // draw the head + gizmos.rect( + Vec3::new(position.x, position.y, 0.0), + Quat::IDENTITY, + Vec2::ONE * 20.0, + color.0, + ); + // draw the first line + gizmos.line_2d(position.0, points.0.front().unwrap().0, color.0); + if position.0.x != points.0.front().unwrap().0.x + && position.0.y != points.0.front().unwrap().0.y + { + info!("DIAGONAL"); + } + // draw the rest of the lines + for (start, end) in points.0.iter().zip(points.0.iter().skip(1)) { + gizmos.line_2d(start.0, end.0, color.0); + if start.0.x != end.0.x && start.0.y != end.0.y { + info!("DIAGONAL"); + } + } + } +} diff --git a/examples/simple_box/protocol.rs b/examples/simple_box/protocol.rs index f8dd1b757..21917a711 100644 --- a/examples/simple_box/protocol.rs +++ b/examples/simple_box/protocol.rs @@ -1,4 +1,5 @@ use bevy::prelude::{default, Bundle, Color, Component, Deref, DerefMut, Entity, Vec2}; +use bevy::utils::EntityHashSet; use derive_more::{Add, Mul}; use lightyear::prelude::*; use serde::{Deserialize, Serialize}; @@ -50,13 +51,16 @@ pub struct PlayerColor(pub(crate) Color); #[message(custom_map)] pub struct PlayerParent(Entity); -impl MapEntities for PlayerParent { - fn map_entities(&mut self, entity_map: &EntityMap) { - self.0.map_entities(entity_map); +impl<'a> MapEntities<'a> for PlayerParent { + fn map_entities(&mut self, entity_mapper: Box) { + self.0.map_entities(entity_mapper); + } + + fn entities(&self) -> EntityHashSet { + EntityHashSet::from_iter(vec![self.0]) } } -// #[component_protocol(protocol = "MyProtocol", derive(Debug))] 
#[component_protocol(protocol = "MyProtocol")] pub enum Components { #[sync(once)] @@ -77,7 +81,7 @@ pub struct Channel1; #[derive(Message, Serialize, Deserialize, Clone, Debug, PartialEq)] pub struct Message1(pub usize); -#[message_protocol(protocol = "MyProtocol", derive(Debug))] +#[message_protocol(protocol = "MyProtocol")] pub enum Messages { Message1(Message1), } diff --git a/examples/src/protocol.rs b/examples/src/protocol.rs index 360b0d06b..be21a62e5 100644 --- a/examples/src/protocol.rs +++ b/examples/src/protocol.rs @@ -11,7 +11,7 @@ pub struct Message1(pub String); #[derive(Message, Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Message2(pub u32); -#[message_protocol(protocol = "MyProtocol", derive(Debug))] +#[message_protocol(protocol = "MyProtocol")] pub enum MyMessageProtocol { Message1(Message1), Message2(Message2), @@ -27,7 +27,6 @@ pub struct Component2(pub f32); #[derive(Component, Message, Serialize, Deserialize, Clone, Debug, PartialEq, Add, Mul)] pub struct Component3(pub f32); -// #[component_protocol(protocol = "MyProtocol", derive(Debug))] #[component_protocol(protocol = "MyProtocol")] pub enum MyComponentsProtocol { #[sync(full)] diff --git a/lightyear/Cargo.toml b/lightyear/Cargo.toml index 425aa30c2..40e6a32e2 100644 --- a/lightyear/Cargo.toml +++ b/lightyear/Cargo.toml @@ -13,7 +13,6 @@ license = "MIT OR Apache-2.0" exclude = ["/tests"] [features] -debug = [] metrics = [ "dep:metrics", "metrics-util", @@ -75,6 +74,7 @@ tracing-subscriber = { version = "0.3.17", features = [ # server crossbeam-channel = { version = "0.5.8", features = [] } +#petgraph = "0.6.4" # for replication # metrics metrics = { version = "0.21", optional = true } diff --git a/lightyear/src/client/components.rs b/lightyear/src/client/components.rs index 25120dc91..ea6e4f5be 100644 --- a/lightyear/src/client/components.rs +++ b/lightyear/src/client/components.rs @@ -2,7 +2,10 @@ Defines components that are used for the client-side prediction and 
interpolation */ +use crate::prelude::{Named, TypeNamed}; +use bevy::ecs::component::TableStorage; use bevy::prelude::{Component, Entity}; +use std::fmt::{Debug, Formatter}; /// Marks an entity that contains the server-updates that are received from the Server /// (this entity is a copy of Predicted that is RTT ticks behind) @@ -12,11 +15,11 @@ pub struct Confirmed { pub interpolated: Option, } -pub trait SyncComponent: Component + Clone + PartialEq { +pub trait SyncComponent: Component + Clone + PartialEq + Named { fn mode() -> ComponentSyncMode; } -#[derive(Debug, PartialEq)] +#[derive(Debug, Default, PartialEq)] /// Defines how a predicted or interpolated component will be replicated from confirmed to predicted/interpolated /// /// We use a single enum instead of 2 separate enums because we want to be able to use the same enum for both predicted and interpolated components @@ -37,4 +40,8 @@ pub enum ComponentSyncMode { /// The component will be copied only-once from the confirmed to the interpolated/predicted entity, and then won't stay in sync /// Useful for components that you want to modify yourself on the predicted/interpolated entity Once, + + #[default] + /// The component is not copied from the Confirmed entity to the interpolated/predicted entity + None, } diff --git a/lightyear/src/client/connection.rs b/lightyear/src/client/connection.rs index ded3c6314..1c5ece6f8 100644 --- a/lightyear/src/client/connection.rs +++ b/lightyear/src/client/connection.rs @@ -65,13 +65,14 @@ impl Connection

{ let tick = self.base.recv_packet(reader)?; debug!("Received server packet with tick: {:?}", tick); if tick >= self.sync_manager.latest_received_server_tick { + trace!("new last recv server tick: {:?}", tick); self.sync_manager.latest_received_server_tick = tick; // TODO: add 'received_new_server_tick' ? // we probably actually physically received the packet some time between our last `receive` and now. // Let's add delta / 2 as a compromise self.sync_manager.duration_since_latest_received_server_tick = Duration::default(); // self.sync_manager.duration_since_latest_received_server_tick = time_manager.delta() / 2; - self.sync_manager.update_current_server_time( + self.sync_manager.update_server_time_estimate( tick_manager.config.tick_duration, self.base.ping_manager.rtt(), ); diff --git a/lightyear/src/client/input.rs b/lightyear/src/client/input.rs index 4b26b0103..45c11b313 100644 --- a/lightyear/src/client/input.rs +++ b/lightyear/src/client/input.rs @@ -188,10 +188,17 @@ fn prepare_input_message(mut client: ResMut>) { error!("Error while sending input message: {:?}", err); }) } + // NOTE: actually we keep the input values! because they might be needed when we rollback for client prediction + // TODO: figure out when we can delete old inputs. Basically when the oldest prediction group tick has passed? + // maybe at interpolation_tick(), since it's before any latest server update we receive? 
+ // delete old input values - client - .get_mut_input_buffer() - .pop(current_tick - (message_len + 1)); + let interpolation_tick = client + .connection + .sync_manager + .interpolation_tick(&client.tick_manager); + client.get_mut_input_buffer().pop(interpolation_tick); + // .pop(current_tick - (message_len + 1)); } // on the client: diff --git a/lightyear/src/client/interpolation/interpolate.rs b/lightyear/src/client/interpolation/interpolate.rs index a02809839..ad19d155c 100644 --- a/lightyear/src/client/interpolation/interpolate.rs +++ b/lightyear/src/client/interpolation/interpolate.rs @@ -63,11 +63,21 @@ pub(crate) fn update_interpolate_status( } } + // TODO: do we need to call this if status.end is set? probably not because the updates are sequenced? + + // TODO: CAREFUL, we need to always leave a value in the history, so that we can compute future values? + // maybe not, because for interpolation we don't care about the value at a given specific tick + // clear all values with a tick <= current_interpolate_tick, and get the last cleared value // (we need to call this even if status.start is set, because a new more recent server update could have been received) let new_start = history.pop_until_tick(current_interpolate_tick); if let Some((new_tick, _)) = new_start { - if start.as_ref().map_or(true, |(tick, _)| *tick < new_tick) { + if start.as_ref().map_or(true, |(tick, _)| *tick <= new_tick) { + trace!( + ?current_interpolate_tick, + old_start = ?start.as_ref().map(|(tick, _)| tick), + new_start = ?new_tick, + "found more recent tick between start and interpolation tick"); start = new_start; } } @@ -119,16 +129,24 @@ pub(crate) fn update_interpolate_status( // } // end = temp_end; - // If it's been too long since we received an update, reset the server tick to None + // If it's been too long since we received an update, reset the start tick to None // (so that we wait again until interpolation_tick is between two server updates) - let temp_start = 
std::mem::take(&mut start); - if let Some((start_tick, _)) = temp_start { - if current_interpolate_tick - start_tick < send_interval_delta_tick { - start = temp_start; + // otherwise the interpolation will seem weird because the start tick is very old + // Only do this when end_tick is None, otherwise it could affect the currently running + // interpolation + if end.is_none() { + let temp_start = std::mem::take(&mut start); + if let Some((start_tick, _)) = temp_start { + if current_interpolate_tick - start_tick < send_interval_delta_tick { + start = temp_start; + } + // else (if it's been too long), reset the server tick to None } } - trace!(?current_interpolate_tick, + trace!( + component = ?component.name(), + ?current_interpolate_tick, last_received_server_tick = ?client.latest_received_server_tick(), start_tick = ?start.as_ref().map(|(tick, _)| tick), end_tick = ?end.as_ref().map(|(tick, _) | tick), @@ -145,19 +163,29 @@ pub(crate) fn update_interpolate_status( } } -pub(crate) fn interpolate( +pub(crate) fn interpolate>( mut query: Query<(&mut C, &InterpolateStatus)>, ) { for (mut component, status) in query.iter_mut() { - if let (Some((start_tick, start_value)), Some((end_tick, end_value))) = - (&status.start, &status.end) - { - // info!(?start_tick, ?end_tick, "doing interpolation!"); - if start_tick != end_tick { - let t = (status.current - *start_tick) as f32 / (*end_tick - *start_tick) as f32; - *component = C::lerp(start_value.clone(), end_value.clone(), t); - } else { + // NOTE: it is possible that we reach start_tick when end_tick is not set + if let Some((start_tick, start_value)) = &status.start { + if status.current == *start_tick { *component = start_value.clone(); + continue; + } + if let Some((end_tick, end_value)) = &status.end { + // info!(?start_tick, ?end_tick, "doing interpolation!"); + if status.current == *end_tick { + *component = end_value.clone(); + continue; + } + if start_tick != end_tick { + let t = + (status.current - *start_tick) as 
f32 / (*end_tick - *start_tick) as f32; + *component = C::lerp(start_value.clone(), end_value.clone(), t); + } else { + *component = start_value.clone(); + } } } } diff --git a/lightyear/src/client/interpolation/interpolation_history.rs b/lightyear/src/client/interpolation/interpolation_history.rs index 4ff67ff69..d955a7e5f 100644 --- a/lightyear/src/client/interpolation/interpolation_history.rs +++ b/lightyear/src/client/interpolation/interpolation_history.rs @@ -65,7 +65,6 @@ impl ConfirmedHistory { /// Get the value of the component at the specified tick. /// Clears the history buffer of all ticks older or equal than the specified tick. /// NOTE: doesn't pop the last value! - /// Returns None /// CAREFUL: /// the component history will only contain the ticks where the component got updated, and otherwise /// contains gaps. Therefore, we need to always leave a value in the history buffer so that we can @@ -95,7 +94,7 @@ pub(crate) fn add_component_history( ComponentSyncMode::Full => { debug!("spawn interpolation history"); interpolated_entity_mut.insert(( - confirmed_component.deref().clone(), + // confirmed_component.deref().clone(), history, InterpolateStatus:: { start: None, @@ -106,7 +105,7 @@ pub(crate) fn add_component_history( } _ => { debug!("copy interpolation component"); - interpolated_entity_mut.insert(confirmed_component.deref().clone()); + // interpolated_entity_mut.insert(confirmed_component.deref().clone()); } } } @@ -129,7 +128,7 @@ pub(crate) fn apply_confirmed_update( for (confirmed_entity, confirmed, confirmed_component) in confirmed_entities.iter() { if let Some(p) = confirmed.interpolated { if confirmed_component.is_changed() { - if let Ok((mut interpolated_component, history_option)) = + if let Ok((interpolated_component, history_option)) = interpolated_entities.get_mut(p) { match T::mode() { @@ -151,17 +150,19 @@ pub(crate) fn apply_confirmed_update( ); continue; }; - trace!(tick = ?channel.latest_tick, "adding confirmed update to 
history"); + info!(component = ?confirmed_component.name(), tick = ?channel.latest_tick, "adding confirmed update to history"); // assign the history at the value that the entity currently is + // TODO: think about mapping entities! history .buffer .add_item(channel.latest_tick, confirmed_component.deref().clone()); } // for sync-components, we just match the confirmed component ComponentSyncMode::Simple => { - *interpolated_component = confirmed_component.deref().clone(); + // TODO: think about mapping entities! + // *interpolated_component = confirmed_component.deref().clone(); } - ComponentSyncMode::Once => {} + _ => {} } } } diff --git a/lightyear/src/client/interpolation/mod.rs b/lightyear/src/client/interpolation/mod.rs index 2fba606c8..f43c9cfbc 100644 --- a/lightyear/src/client/interpolation/mod.rs +++ b/lightyear/src/client/interpolation/mod.rs @@ -6,9 +6,9 @@ use tracing::info; pub use interpolate::InterpolateStatus; pub use interpolation_history::ConfirmedHistory; -pub use plugin::{add_interpolation_systems, add_lerp_systems}; +pub use plugin::{add_interpolation_systems, add_prepare_interpolation_systems}; -use crate::client::components::{Confirmed, SyncComponent}; +use crate::client::components::{ComponentSyncMode, Confirmed, SyncComponent}; use crate::client::interpolation::despawn::InterpolationMapping; use crate::shared::replication::components::ShouldBeInterpolated; @@ -35,29 +35,62 @@ pub mod plugin; // TODO: maybe merge this with PredictedComponent? // basically it is a HistoryComponent. 
And we can have modes for Prediction or Interpolation -// TODO: only require the Add/Mul bounds if we're using the linear lerp function -pub trait InterpolatedComponent: - SyncComponent + Mul + Add + Sized +pub trait InterpFn { + fn lerp(start: C, other: C, t: f32) -> C; +} + +pub struct LinearInterpolation; +impl InterpFn for LinearInterpolation +where + C: Mul + Add, { - /// Which interpolation function to use - /// By default, it will be a linear interpolation - fn lerp_mode() -> LerpMode; - - fn lerp(start: Self, other: Self, t: f32) -> Self { - match Self::lerp_mode() { - LerpMode::Linear => start * (1.0 - t) + other * t, - LerpMode::Custom(lerp) => lerp(start, other, t), - } + fn lerp(start: C, other: C, t: f32) -> C { + start * (1.0 - t) + other * t + } +} + +/// Use this if you don't want to use an interpolation function for this component. +/// (For example if you are running your own interpolation logic) +pub struct NoInterpolation; +impl InterpFn for NoInterpolation { + fn lerp(start: C, _other: C, _t: f32) -> C { + start } } -#[derive(Debug)] -pub enum LerpMode { - Linear, - // TODO: change this to a trait object? - Custom(fn(C, C, f32) -> C), +pub trait InterpolatedComponent: SyncComponent { + type Fn: InterpFn; + + fn lerp(start: C, other: C, t: f32) -> C { + Self::Fn::lerp(start, other, t) + } } +// pub trait InterpolatedComponent: SyncComponent + Sized { +// type Fn: InterpFn; +// /// Which interpolation function to use +// /// By default, it will be a linear interpolation +// fn lerp_mode() -> LerpMode; +// +// fn lerp_linear(start: Self, other: Self, t: f32) -> Self +// where +// Self: Mul + Add, +// { +// start * (1.0 - t) + other * t +// } +// +// fn lerp_custom(start: Self, other: Self, t: f32, lerp: fn(Self, Self, f32) -> Self) -> Self { +// lerp(start, other, t) +// } +// } + +// #[derive(Debug)] +// pub enum LerpMode { +// Linear, +// // TODO: change this to a trait object? 
+// Custom(fn(C, C, f32) -> C), +// } + /// Marks an entity that is being interpolated by the client #[derive(Component, Debug)] pub struct Interpolated { diff --git a/lightyear/src/client/interpolation/plugin.rs b/lightyear/src/client/interpolation/plugin.rs index dc3da63da..63e931d74 100644 --- a/lightyear/src/client/interpolation/plugin.rs +++ b/lightyear/src/client/interpolation/plugin.rs @@ -2,7 +2,8 @@ use std::marker::PhantomData; use std::time::Duration; use bevy::prelude::{ - apply_deferred, App, IntoSystemConfigs, IntoSystemSetConfigs, Plugin, PreUpdate, SystemSet, + apply_deferred, App, IntoSystemConfigs, IntoSystemSetConfigs, Plugin, PostUpdate, PreUpdate, + SystemSet, }; use crate::client::components::SyncComponent; @@ -64,7 +65,10 @@ impl InterpolationDelay { /// This should be #[derive(Clone)] pub struct InterpolationConfig { - pub(crate) delay: InterpolationDelay, + pub delay: InterpolationDelay, + /// If true, disable the interpolation logic (but still keep the internal component history buffers) + /// The user will have to manually implement + pub custom_interpolation_logic: bool, // How long are we keeping the history of the confirmed entities so we can interpolate between them? // pub(crate) interpolation_buffer_size: Duration, } @@ -74,7 +78,7 @@ impl Default for InterpolationConfig { fn default() -> Self { Self { delay: InterpolationDelay::default(), - // TODO: change + custom_interpolation_logic: false, // interpolation_buffer_size: Duration::from_millis(100), } } @@ -126,7 +130,10 @@ pub enum InterpolationSet { /// Set to handle interpolated/confirmed entities/components getting despawned Despawn, DespawnFlush, - /// Update component history, interpolation status, and interpolate between last 2 server states + /// Update component history, interpolation status + PrepareInterpolation, + /// Interpolate between last 2 server states. 
Has to be overriden if + /// `InterpolationConfig.custom_interpolation_logic` is set to true Interpolate, } @@ -138,10 +145,10 @@ pub enum InterpolationSet { // up to the client tick before we just updated the time. Maybe that's not a problem.. but we do need to keep track of the ticks correctly // the tick we rollback to would not be the current client tick ? -pub fn add_interpolation_systems(app: &mut App) { +pub fn add_prepare_interpolation_systems(app: &mut App) { // TODO: maybe create an overarching prediction set that contains all others? app.add_systems( - PreUpdate, + PostUpdate, ( (add_component_history::).in_set(InterpolationSet::SpawnHistory), (removed_components::).in_set(InterpolationSet::Despawn), @@ -150,31 +157,32 @@ pub fn add_interpolation_systems(app: &mut App) { update_interpolate_status::, ) .chain() - .in_set(InterpolationSet::Interpolate), + .in_set(InterpolationSet::PrepareInterpolation), ), ); } // We add the interpolate system in different function because we don't want the non -// ComponentSyncMode::Full components to need the InterpolatedComponent bounds (in particular Add/Mul) -pub fn add_lerp_systems(app: &mut App) { +// ComponentSyncMode::Full components to need the InterpolatedComponent bounds +pub fn add_interpolation_systems, P: Protocol>(app: &mut App) { app.add_systems( - PreUpdate, - (interpolate:: - .after(update_interpolate_status::) - .in_set(InterpolationSet::Interpolate),), + PostUpdate, + interpolate::.in_set(InterpolationSet::Interpolate), ); } impl Plugin for InterpolationPlugin

{ fn build(&self, app: &mut App) { - P::Components::add_interpolation_systems(app); + P::Components::add_prepare_interpolation_systems(app); + if !self.config.custom_interpolation_logic { + P::Components::add_interpolation_systems(app); + } // RESOURCES app.init_resource::(); // SETS app.configure_sets( - PreUpdate, + PostUpdate, ( MainSet::Receive, InterpolationSet::SpawnInterpolation, @@ -185,13 +193,14 @@ impl Plugin for InterpolationPlugin

{ InterpolationSet::DespawnFlush, // TODO: maybe run in a schedule in-between FixedUpdate and Update? // or maybe run during PostUpdate? + InterpolationSet::PrepareInterpolation, InterpolationSet::Interpolate, ) .chain(), ); // SYSTEMS app.add_systems( - PreUpdate, + PostUpdate, ( // TODO: we want to run these flushes only if something actually happened in the previous set! // because running the flush-system is expensive (needs exclusive world access) @@ -202,9 +211,9 @@ impl Plugin for InterpolationPlugin

{ ), ); app.add_systems( - PreUpdate, + PostUpdate, ( - spawn_interpolated_entity.in_set(InterpolationSet::SpawnInterpolation), + // spawn_interpolated_entity.in_set(InterpolationSet::SpawnInterpolation), despawn_interpolated.in_set(InterpolationSet::Despawn), ), ); diff --git a/lightyear/src/client/prediction/plugin.rs b/lightyear/src/client/prediction/plugin.rs index 754d86431..6d1e21e44 100644 --- a/lightyear/src/client/prediction/plugin.rs +++ b/lightyear/src/client/prediction/plugin.rs @@ -183,10 +183,11 @@ impl Plugin for PredictionPlugin

{ (apply_deferred.in_set(PredictionSet::EntityDespawnFlush),), ); - app.add_systems( - PreUpdate, - spawn_predicted_entity.in_set(PredictionSet::SpawnPrediction), - ); + // no need, since we spawn predicted entities/components in replication + // app.add_systems( + // PreUpdate, + // spawn_predicted_entity.in_set(PredictionSet::SpawnPrediction), + // ); // 2. (in prediction_systems) add ComponentHistory and a apply_deferred after // 3. (in prediction_systems) Check if we should do rollback, clear histories and snap prediction's history to server-state // 4. Potentially do rollback diff --git a/lightyear/src/client/prediction/predicted_history.rs b/lightyear/src/client/prediction/predicted_history.rs index 5d2937d8d..aef55b116 100644 --- a/lightyear/src/client/prediction/predicted_history.rs +++ b/lightyear/src/client/prediction/predicted_history.rs @@ -109,6 +109,8 @@ impl PredictionHistory { // Copy component insert/remove from confirmed to predicted // Currently we will just copy every PredictedComponent +// TODO: how to handle components that are synced between confirmed/predicted but are not replicated? +// I guess they still need to be added here. // TODO: add more options: // - copy component and add component history (for rollback) // - copy component to history and don't add component @@ -140,7 +142,12 @@ pub fn add_component_history( } } - // component got added on confirmed side: + // TODO: need to do entity mapping here as well! + // TODO: need to run this only for components that were not replicated; and run this in topological order. + + // component got added on confirmed side + // - via replication -> the component will also be added to predicted, so it's handled above as well.. 
+ // - without replication -> need to handle here // - full: sync component and add history // - simple/once: sync component if let Some(confirmed_component) = confirmed_component { @@ -156,12 +163,18 @@ pub fn add_component_history( client.tick(), ComponentState::Updated(confirmed_component.deref().clone()), ); - predicted_entity_mut - .insert((confirmed_component.deref().clone(), history)); + predicted_entity_mut.insert(history); + // TODO: we do not insert the component here because we need to map entities! + // also we already added the component during replication + // but we might to handle components that are not replicated... + // for components that are not replicated, no need to apply any mapping! + // so maybe just check if the component existed already? + // predicted_entity_mut + // .insert((confirmed_component.deref().clone(), history)); } _ => { // we only sync the components once, but we don't do rollback so no need for a component history - predicted_entity_mut.insert(confirmed_component.deref().clone()); + // predicted_entity_mut.insert(confirmed_component.deref().clone()); } } } @@ -241,6 +254,9 @@ pub fn update_prediction_history( } } +// TODO: again, +// for replicated components, we could apply the update at replication time directly (for topological sort) +// for non-replicated components, it should be applied here, but we need to query with topological sort. 
/// When we receive a server update, we might want to apply it to the predicted entity #[allow(clippy::type_complexity)] pub(crate) fn apply_confirmed_update( @@ -271,7 +287,7 @@ pub(crate) fn apply_confirmed_update( ComponentSyncMode::Simple => { *predicted_component = confirmed_component.deref().clone(); } - ComponentSyncMode::Once => {} + _ => {} } } } diff --git a/lightyear/src/client/prediction/rollback.rs b/lightyear/src/client/prediction/rollback.rs index 0c81176f8..e0d97bfaa 100644 --- a/lightyear/src/client/prediction/rollback.rs +++ b/lightyear/src/client/prediction/rollback.rs @@ -159,8 +159,8 @@ pub(crate) fn client_rollback_check( }; if should_rollback { info!( - "Rollback check: mismatch for component between predicted and confirmed {:?}", - confirmed_entity + "Rollback check: mismatch for component between predicted and confirmed {:?} on tick {:?}", + confirmed_entity, tick, ); // we need to clear the history so we can write a new one @@ -242,7 +242,7 @@ pub(crate) fn run_rollback(world: &mut World) { // TODO: might not need to check the state, because we only run this system if we are in rollback if let RollbackState::ShouldRollback { current_tick } = rollback.state { let num_rollback_ticks = client.tick() - current_tick; - debug!( + info!( "Rollback between {:?} and {:?}", current_tick, client.tick() diff --git a/lightyear/src/client/resource.rs b/lightyear/src/client/resource.rs index 97bc60831..ccd95e1ea 100644 --- a/lightyear/src/client/resource.rs +++ b/lightyear/src/client/resource.rs @@ -34,14 +34,14 @@ pub struct Client { // netcode netcode: crate::netcode::Client, // connection - connection: Connection

, + pub(crate) connection: Connection

, // protocol protocol: P, // events events: ConnectionEvents

, // syncing pub(crate) time_manager: TimeManager, - tick_manager: TickManager, + pub(crate) tick_manager: TickManager, } #[allow(clippy::large_enum_variant)] @@ -208,7 +208,7 @@ impl Client

{ /// Update the sync manager. /// We run this at PostUpdate because: /// - client prediction time is computed from ticks, which haven't been updated yet at PreUpdate - /// - server prediction time is computed from time, which has been update via delta + /// - server prediction time is computed from time, which has been updated via delta /// Also server sends the tick after FixedUpdate, so it makes sense that we would compare to the client tick after FixedUpdate /// So instead we update the sync manager at PostUpdate, after both ticks/time have been updated pub(crate) fn sync_update(&mut self) { diff --git a/lightyear/src/client/sync.rs b/lightyear/src/client/sync.rs index b50c26e8c..8d1c5ebf0 100644 --- a/lightyear/src/client/sync.rs +++ b/lightyear/src/client/sync.rs @@ -46,7 +46,7 @@ pub struct SyncConfig { pub speedup_factor: f32, // Integration - current_server_time_smoothing: f32, + server_time_estimate_smoothing: f32, } impl Default for SyncConfig { @@ -57,8 +57,8 @@ impl Default for SyncConfig { handshake_pings: 7, stats_buffer_duration: Duration::from_secs(2), error_margin: 1.0, - speedup_factor: 1.03, - current_server_time_smoothing: 0.1, + speedup_factor: 1.1, + server_time_estimate_smoothing: 0.7, } } } @@ -91,7 +91,7 @@ pub struct SyncManager { pub(crate) synced: bool, // time - current_server_time: WrappedTime, + server_time_estimate: WrappedTime, pub(crate) interpolation_time: WrappedTime, interpolation_speed_ratio: f32, @@ -113,7 +113,7 @@ impl SyncManager { config: config.clone(), synced: false, // time - current_server_time: WrappedTime::default(), + server_time_estimate: WrappedTime::default(), interpolation_time: WrappedTime::default(), interpolation_speed_ratio: 1.0, // server tick @@ -136,7 +136,7 @@ impl SyncManager { server_send_interval: Duration, ) { self.duration_since_latest_received_server_tick += time_manager.delta(); - self.current_server_time += time_manager.delta(); + self.server_time_estimate += time_manager.delta(); 
self.interpolation_time += time_manager.delta().mul_f32(self.interpolation_speed_ratio); // check if we are ready to finalize the handshake @@ -148,7 +148,11 @@ impl SyncManager { interpolation_delay, server_send_interval, tick_manager, - ) + ); + info!( + "interpolation_tick: {:?}", + self.interpolation_tick(tick_manager) + ); } if self.synced { @@ -185,40 +189,38 @@ impl SyncManager { } /// current server time from server's point of view (using server tick) - pub(crate) fn current_server_time(&self) -> WrappedTime { - // TODO: instead of just using the latest_received_server_tick, there should be some sort - // of integration/smoothing - self.current_server_time + pub(crate) fn server_time_estimate(&self) -> WrappedTime { + self.server_time_estimate } /// Everytime we receive a new server update: /// Update the estimated current server time, computed from the time elapsed since the /// latest received server tick, and our estimate of the RTT - pub(crate) fn update_current_server_time(&mut self, tick_duration: Duration, rtt: Duration) { - let new_current_server_time_estimate = WrappedTime::from_duration( + pub(crate) fn update_server_time_estimate(&mut self, tick_duration: Duration, rtt: Duration) { + let new_server_time_estimate = WrappedTime::from_duration( self.latest_received_server_tick.0 as u32 * tick_duration - + self.duration_since_latest_received_server_tick - + rtt / 2, + + self.duration_since_latest_received_server_tick, + // TODO: the server_time_estimate uses rtt / 2, but we remove it for now so we can + // use this estimate for both prediction and interpolation + // + rtt / 2, ); - // instead of just using the latest_received_server_tick, there should be some sort - // of integration/smoothing + // instead of just using the latest_received_server_tick, we apply some smoothing // (in case the latest server tick is wildly off-base) - if self.current_server_time == WrappedTime::default() { - self.current_server_time = new_current_server_time_estimate; + 
if self.server_time_estimate == WrappedTime::default() { + self.server_time_estimate = new_server_time_estimate; } else { - self.current_server_time = self.current_server_time - * self.config.current_server_time_smoothing - + new_current_server_time_estimate - * (1.0 - self.config.current_server_time_smoothing); + self.server_time_estimate = self.server_time_estimate + * self.config.server_time_estimate_smoothing + + new_server_time_estimate * (1.0 - self.config.server_time_estimate_smoothing); } } /// time at which the server would receive a packet we send now fn predicted_server_receive_time(&self, rtt: Duration) -> WrappedTime { - self.current_server_time() + rtt / 2 + self.server_time_estimate() + rtt } - /// how far ahead of the server should I be? + /// how far ahead of the server should I be? (for prediction) fn client_ahead_minimum(&self, tick_duration: Duration, jitter: Duration) -> Duration { self.config.jitter_multiple_margin as u32 * jitter + self.config.tick_margin as u32 * tick_duration @@ -236,20 +238,19 @@ impl SyncManager { server_send_interval: Duration, tick_manager: &TickManager, ) -> WrappedTime { - // We want the interpolation time to be just a little bit behind the latest server time - // We add `duration_since_latest_received_server_tick` because we receive them intermittently - // TODO: maybe integrate because of jitter? - let objective_time = WrappedTime::from_duration( - self.latest_received_server_tick.0 as u32 * tick_manager.config.tick_duration - + self.duration_since_latest_received_server_tick, - ); + // // TODO: maybe integrate because of jitter? + // let objective_time = WrappedTime::from_duration( + // self.latest_received_server_tick.0 as u32 * tick_manager.config.tick_duration + // + self.duration_since_latest_received_server_tick, + // ); + // let objective_time = self.server_time_estimate(); // how much we want interpolation time to be behind the latest received server tick? 
// TODO: use a specified config margin + add std of time_between_server_updates? let objective_delta = chrono::Duration::from_std(interpolation_delay.to_duration(server_send_interval)) .unwrap(); // info!("objective_delta: {:?}", objective_delta); - objective_time - objective_delta + self.server_time_estimate() - objective_delta } pub(crate) fn interpolation_tick(&self, tick_manager: &TickManager) -> Tick { @@ -274,12 +275,19 @@ impl SyncManager { let objective_time = self.interpolation_objective(interpolation_delay, server_update_rate, tick_manager); let delta = objective_time - self.interpolation_time; + trace!( + ?objective_time, + interpolation_time = ?self.interpolation_time, + interpolation_tick = ?self.interpolation_tick(tick_manager), + "interpolation data"); let error_margin = chrono::Duration::milliseconds(10); if delta > error_margin { // interpolation time is too far behind, speed-up! self.interpolation_speed_ratio = 1.0 * self.config.speedup_factor; + trace!("interpolation is too far behind, speed up!"); } else if delta < -error_margin { + trace!("interpolation is too far ahead, slow down!"); self.interpolation_speed_ratio = 1.0 / self.config.speedup_factor; } else { self.interpolation_speed_ratio = 1.0; @@ -368,8 +376,8 @@ impl SyncManager { let tick_duration = tick_manager.config.tick_duration; let rtt = ping_manager.rtt(); let jitter = ping_manager.jitter(); - // recompute the current server time (using the rtt we just computed) - self.update_current_server_time(tick_duration, rtt); + // recompute the server time estimate (using the rtt we just computed) + self.update_server_time_estimate(tick_duration, rtt); // Compute how many ticks the client must be compared to server let client_ideal_time = self.predicted_server_receive_time(rtt) diff --git a/lightyear/src/connection/message.rs b/lightyear/src/connection/message.rs index a4466dc0a..b865779d2 100644 --- a/lightyear/src/connection/message.rs +++ b/lightyear/src/connection/message.rs @@ -3,13 
+3,15 @@ Provides a [`ProtocolMessage`] enum that is a wrapper around all the possible me */ use crate::_reexport::{InputMessage, ShouldBeInterpolated, ShouldBePredicted, TickManager}; use crate::connection::events::ConnectionEvents; -use crate::prelude::{EntityMap, MapEntities, Tick}; +use crate::prelude::{EntityMapper, MapEntities, RemoteEntityMap, Tick}; use crate::protocol::channel::ChannelKind; use crate::protocol::Protocol; use crate::shared::ping::message::SyncMessage; use crate::shared::replication::{ReplicationMessage, ReplicationMessageData}; use crate::shared::time_manager::TimeManager; use crate::utils::named::Named; +use bevy::prelude::Entity; +use bevy::utils::EntityHashSet; use serde::{Deserialize, Serialize}; use tracing::{info, info_span, trace, trace_span}; @@ -22,8 +24,7 @@ use tracing::{info, info_span, trace, trace_span}; // ShouldBeInterpolated(ShouldBeInterpolated), // } -#[cfg_attr(feature = "debug", derive(Debug))] -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] pub enum ProtocolMessage { Message(P::Message), Replication(ReplicationMessage), @@ -32,22 +33,6 @@ pub enum ProtocolMessage { Sync(SyncMessage), } -impl MapEntities for ProtocolMessage

{ - fn map_entities(&mut self, entity_map: &EntityMap) { - match self { - ProtocolMessage::Message(x) => { - x.map_entities(entity_map); - } - ProtocolMessage::Replication(x) => { - x.map_entities(entity_map); - } - ProtocolMessage::Sync(x) => { - x.map_entities(entity_map); - } - } - } -} - impl ProtocolMessage

{ pub(crate) fn emit_send_logs(&self, channel_name: &str) { match self { diff --git a/lightyear/src/connection/mod.rs b/lightyear/src/connection/mod.rs index 6f81b9453..3b4924a14 100644 --- a/lightyear/src/connection/mod.rs +++ b/lightyear/src/connection/mod.rs @@ -134,13 +134,21 @@ impl Connection

{ if !messages.is_empty() { trace!(?channel_name, "Received messages"); - for (tick, mut message) in messages.into_iter() { + for (tick, message) in messages.into_iter() { + // TODO: we shouldn't map the entities here! + // - we should: order the entities in a group by topological sort (use MapEntities to check dependencies between entities). + // - apply map_entities when we're in the stage of applying to the world. + // - because then we read the first entity in the group; spawn it, and the next component that refers to that entity can be mapped successfully! // map entities from remote to local - message.map_entities(&self.replication_manager.entity_map); + // message.map_entities(&self.replication_manager.entity_map); // other message-handling logic match message { - ProtocolMessage::Message(message) => { + ProtocolMessage::Message(mut message) => { + // map any entities inside the message + message.map_entities(Box::new( + &self.replication_manager.remote_entity_map, + )); // buffer the message self.events.push_message(channel_kind, message); } diff --git a/lightyear/src/lib.rs b/lightyear/src/lib.rs index f40de4c42..16448bf72 100644 --- a/lightyear/src/lib.rs +++ b/lightyear/src/lib.rs @@ -24,6 +24,14 @@ pub mod _reexport { pub use crate::channel::builder::{ EntityActionsChannel, EntityUpdatesChannel, InputChannel, PingChannel, }; + pub use crate::client::interpolation::{ + add_interpolation_systems, add_prepare_interpolation_systems, InterpolatedComponent, + }; + pub use crate::client::interpolation::{LinearInterpolation, NoInterpolation}; + pub use crate::client::prediction::add_prediction_systems; + pub use crate::connection::events::{ + IterComponentInsertEvent, IterComponentRemoveEvent, IterComponentUpdateEvent, + }; pub use crate::inputs::input_buffer::InputMessage; pub use crate::protocol::component::{ ComponentBehaviour, ComponentKindBehaviour, ComponentProtocol, ComponentProtocolKind, @@ -35,8 +43,15 @@ pub mod _reexport { pub use 
crate::serialize::wordbuffer::reader::ReadWordBuffer; pub use crate::serialize::wordbuffer::writer::WriteWordBuffer; pub use crate::serialize::writer::WriteBuffer; + pub use crate::shared::events::{ + ComponentInsertEvent, ComponentRemoveEvent, ComponentUpdateEvent, + }; pub use crate::shared::replication::components::{ShouldBeInterpolated, ShouldBePredicted}; + pub use crate::shared::replication::systems::add_per_component_replication_send_systems; pub use crate::shared::replication::ReplicationSend; + pub use crate::shared::systems::events::{ + push_component_insert_events, push_component_remove_events, push_component_update_events, + }; pub use crate::shared::tick_manager::TickManager; pub use crate::shared::time_manager::{TimeManager, WrappedTime}; pub use crate::utils::ready_buffer::ReadyBuffer; @@ -62,8 +77,10 @@ pub mod prelude { pub use crate::shared::log::LogConfig; pub use crate::shared::ping::manager::PingConfig; pub use crate::shared::plugin::SharedPlugin; - pub use crate::shared::replication::components::{NetworkTarget, Replicate, ReplicationMode}; - pub use crate::shared::replication::entity_map::{EntityMap, MapEntities}; + pub use crate::shared::replication::components::{ + NetworkTarget, Replicate, ReplicationGroup, ReplicationMode, + }; + pub use crate::shared::replication::entity_map::{EntityMapper, MapEntities, RemoteEntityMap}; pub use crate::shared::sets::{FixedUpdateSet, MainSet, ReplicationSet}; pub use crate::shared::tick_manager::{Tick, TickConfig}; pub use crate::transport::conditioner::LinkConditionerConfig; @@ -81,7 +98,9 @@ pub mod prelude { pub use crate::client::input::{InputConfig, InputSystemSet}; pub use crate::client::interpolation::interpolation_history::ConfirmedHistory; pub use crate::client::interpolation::plugin::{InterpolationConfig, InterpolationDelay}; - pub use crate::client::interpolation::{InterpolateStatus, Interpolated, LerpMode}; + pub use crate::client::interpolation::{ + InterpFn, InterpolateStatus, Interpolated, 
InterpolatedComponent, + }; pub use crate::client::plugin::{ClientPlugin, PluginConfig}; pub use crate::client::prediction::plugin::PredictionConfig; pub use crate::client::prediction::predicted_history::{ComponentState, PredictionHistory}; diff --git a/lightyear/src/packet/message.rs b/lightyear/src/packet/message.rs index b9d2ae730..bf888eb1e 100644 --- a/lightyear/src/packet/message.rs +++ b/lightyear/src/packet/message.rs @@ -345,7 +345,7 @@ impl MessageContainer { } // TODO: for now messages must be able to be used as events, since we output them in our message events -pub trait Message: EventContext + Named + MapEntities {} +pub trait Message: EventContext + Named + for<'a> MapEntities<'a> {} #[cfg(test)] mod tests { diff --git a/lightyear/src/protocol/component.rs b/lightyear/src/protocol/component.rs index 59955d454..f48f480ff 100644 --- a/lightyear/src/protocol/component.rs +++ b/lightyear/src/protocol/component.rs @@ -1,6 +1,7 @@ use std::fmt::Debug; use std::hash::Hash; +use crate::client::components::{ComponentSyncMode, SyncComponent}; use bevy::prelude::{App, Component, EntityWorldMut, World}; use serde::de::DeserializeOwned; use serde::Serialize; @@ -23,7 +24,7 @@ pub trait ComponentProtocol: BitSerializable + Serialize + DeserializeOwned - + MapEntities + + for<'a> MapEntities<'a> + ComponentBehaviour + Debug + Send @@ -55,7 +56,15 @@ pub trait ComponentProtocol: ); fn add_prediction_systems(app: &mut App); + + /// Add all component systems for the PrepareInterpolation SystemSet + fn add_prepare_interpolation_systems(app: &mut App); + + /// Add all component systems for the Interpolation SystemSet fn add_interpolation_systems(app: &mut App); + + /// Get the sync mode for the component + fn mode(&self) -> ComponentSyncMode; } // TODO: enum_delegate doesn't work with generics + cannot be used multiple times since it derives a bunch of Into/From traits @@ -99,7 +108,7 @@ pub trait ComponentProtocolKind: BitSerializable + Serialize + DeserializeOwned - + 
MapEntities + + for<'a> MapEntities<'a> + PartialEq + Eq + Hash diff --git a/lightyear/src/protocol/message.rs b/lightyear/src/protocol/message.rs index 81c9179da..9764c7d83 100644 --- a/lightyear/src/protocol/message.rs +++ b/lightyear/src/protocol/message.rs @@ -1,4 +1,5 @@ use std::any::TypeId; +use std::fmt::Debug; use bevy::prelude::{App, World}; use serde::de::DeserializeOwned; @@ -21,9 +22,10 @@ pub trait MessageProtocol: + Serialize + DeserializeOwned + Clone - + MapEntities + + for<'a> MapEntities<'a> + MessageBehaviour + Named + + Debug + Send + Sync + From::Protocol as Protocol>::Input>> diff --git a/lightyear/src/server/connection.rs b/lightyear/src/server/connection.rs index 8eeb4d594..6bc954d22 100644 --- a/lightyear/src/server/connection.rs +++ b/lightyear/src/server/connection.rs @@ -24,7 +24,11 @@ use crate::shared::time_manager::TimeManager; /// (handling client inputs) pub struct Connection { pub(crate) base: crate::connection::Connection

, + /// Stores the inputs that we have received from the client. pub(crate) input_buffer: InputBuffer, + /// Stores the last input we have received from the client. + /// In case we are missing the client input for a tick, we will fallback to using this. + pub(crate) last_input: Option, } impl Connection

{ @@ -32,6 +36,7 @@ impl Connection

{ Self { base: crate::connection::Connection::new(channel_registry, ping_config), input_buffer: InputBuffer::default(), + last_input: None, } } diff --git a/lightyear/src/server/resource.rs b/lightyear/src/server/resource.rs index 84e38db33..418c6019d 100644 --- a/lightyear/src/server/resource.rs +++ b/lightyear/src/server/resource.rs @@ -9,9 +9,10 @@ use bevy::ecs::component::Tick as BevyTick; use bevy::prelude::{Entity, Resource, World}; use bevy::utils::HashSet; use crossbeam_channel::Sender; -use tracing::{debug, debug_span, info, trace, trace_span}; +use tracing::{debug, debug_span, info, trace, trace_span, warn}; use crate::channel::builder::Channel; +use crate::inputs::input_buffer::InputBuffer; use crate::netcode::{generate_key, ClientId, ConnectToken}; use crate::packet::message::Message; use crate::protocol::channel::ChannelKind; @@ -131,14 +132,51 @@ impl Server

{ // INPUTS + // TODO: exposed only for debugging + pub fn get_input_buffer(&self, client_id: ClientId) -> Option<&InputBuffer> { + self.user_connections + .get(&client_id) + .map(|connection| &connection.input_buffer) + } + /// Get the inputs for all clients for the given tick pub fn pop_inputs(&mut self) -> impl Iterator, ClientId)> + '_ { self.user_connections .iter_mut() .map(|(client_id, connection)| { - let input = connection + let received_input = connection .input_buffer .pop(self.tick_manager.current_tick()); + let fallback = received_input.is_none(); + + // NOTE: if there is no input for this tick, we should use the last input that we have + // as a best-effort fallback. + let input = match received_input { + None => connection.last_input.clone(), + Some(i) => { + connection.last_input = Some(i.clone()); + Some(i) + } + }; + // let input = received_input.map_or_else( + // || connection.last_input.clone(), + // |i| { + // connection.last_input = Some(i.clone()); + // Some(i) + // }, + // ); + if fallback { + // TODO: do not log this while clients are syncing.. + debug!( + ?client_id, + tick = ?self.tick_manager.current_tick(), + fallback_input = ?&input, + "Missed client input!" + ) + } + // TODO: We should also let the user know that it needs to send inputs a bit earlier so that + // we have more of a buffer. Send a SyncMessage to tell the user to speed up? 
+ // See Overwatch GDC video (input, *client_id) }) } diff --git a/lightyear/src/server/room.rs b/lightyear/src/server/room.rs index 6f92b0a53..b4958efd0 100644 --- a/lightyear/src/server/room.rs +++ b/lightyear/src/server/room.rs @@ -595,7 +595,7 @@ mod tests { .connection() .base() .replication_manager - .entity_map + .remote_entity_map .get_local(server_entity) .unwrap(); @@ -752,7 +752,7 @@ mod tests { .connection() .base() .replication_manager - .entity_map + .remote_entity_map .get_local(server_entity) .unwrap(); diff --git a/lightyear/src/shared/ping/manager.rs b/lightyear/src/shared/ping/manager.rs index bb4db2a99..aa3743bc8 100644 --- a/lightyear/src/shared/ping/manager.rs +++ b/lightyear/src/shared/ping/manager.rs @@ -26,7 +26,7 @@ pub struct PingConfig { impl Default for PingConfig { fn default() -> Self { PingConfig { - ping_interval: Duration::from_millis(100), + ping_interval: Duration::from_millis(40), stats_buffer_duration: Duration::from_secs(4), } } @@ -214,6 +214,7 @@ impl PingManager { // round-trip-delay let rtt = received_time - ping_sent_time; let server_process_time = pong.pong_sent_time - pong.ping_received_time; + trace!(?rtt, ?received_time, ?ping_sent_time, ?server_process_time, ?pong.pong_sent_time, ?pong.ping_received_time, "process pong"); let round_trip_delay = (rtt - server_process_time).to_std().unwrap(); // update stats buffer @@ -269,7 +270,10 @@ mod tests { #[test] fn test_send_pings() { - let config = PingConfig::default(); + let config = PingConfig { + ping_interval: Duration::from_millis(100), + stats_buffer_duration: Duration::from_secs(4), + }; let mut ping_manager = PingManager::new(&config); let mut time_manager = TimeManager::new(Duration::default()); diff --git a/lightyear/src/shared/ping/message.rs b/lightyear/src/shared/ping/message.rs index 09a63e975..296c1a8d5 100644 --- a/lightyear/src/shared/ping/message.rs +++ b/lightyear/src/shared/ping/message.rs @@ -1,5 +1,7 @@ //! 
Defines the actual ping/pong messages use crate::prelude::MapEntities; +use bevy::prelude::Entity; +use bevy::utils::EntityHashSet; use serde::{Deserialize, Serialize}; use crate::shared::ping::store::PingId; @@ -28,7 +30,3 @@ pub enum SyncMessage { Ping(Ping), Pong(Pong), } - -impl MapEntities for SyncMessage { - fn map_entities(&mut self, _entity_map: &crate::prelude::EntityMap) {} -} diff --git a/lightyear/src/shared/ping/store.rs b/lightyear/src/shared/ping/store.rs index 295b7cae5..8ab042425 100644 --- a/lightyear/src/shared/ping/store.rs +++ b/lightyear/src/shared/ping/store.rs @@ -6,7 +6,7 @@ use crate::utils::wrapping_id::wrapping_id; wrapping_id!(PingId); -const PING_BUFFER_SIZE: usize = 32; +const PING_BUFFER_SIZE: usize = 128; /// Data structure to store the latest pings sent to remote pub struct PingStore { diff --git a/lightyear/src/shared/replication/components.rs b/lightyear/src/shared/replication/components.rs index 37692812d..6e28f5190 100644 --- a/lightyear/src/shared/replication/components.rs +++ b/lightyear/src/shared/replication/components.rs @@ -1,10 +1,12 @@ //! Components used for replication use crate::channel::builder::{Channel, EntityActionsChannel, EntityUpdatesChannel}; +use crate::client::components::{ComponentSyncMode, SyncComponent}; use crate::netcode::ClientId; +use crate::prelude::{EntityMapper, MapEntities}; use crate::protocol::channel::ChannelKind; use crate::server::room::{ClientVisibility, RoomId}; use bevy::prelude::{Component, Entity}; -use bevy::utils::{HashMap, HashSet}; +use bevy::utils::{EntityHashSet, HashMap, HashSet}; use lightyear_macros::MessageInternal; use serde::{Deserialize, Serialize}; @@ -28,6 +30,7 @@ pub struct Replicate { // or force users to use `Replicate::default().with...`? /// List of clients that we the entity is currently replicated to. 
/// Will be updated before the other replication systems + #[doc(hidden)] pub replication_clients_cache: HashMap, pub replication_mode: ReplicationMode, // TODO: currently, if the host removes Replicate, then the entity is not removed in the remote @@ -139,14 +142,48 @@ impl NetworkTarget { // that's pretty dangerous because it's now hard for the user to derive new traits. // let's think of another approach later. #[derive(Component, MessageInternal, Serialize, Deserialize, Clone, Debug, PartialEq)] +#[message(custom_map)] pub struct ShouldBeInterpolated; +impl SyncComponent for ShouldBeInterpolated { + fn mode() -> ComponentSyncMode { + ComponentSyncMode::None + } +} + +impl<'a> MapEntities<'a> for ShouldBeInterpolated { + fn map_entities(&mut self, entity_mapper: Box) {} + + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } +} + // TODO: Right now we use the approach that we add an extra component to the Protocol of components to be replicated. // that's pretty dangerous because it's now hard for the user to derive new traits. // let's think of another approach later. #[derive(Component, MessageInternal, Serialize, Deserialize, Clone, Debug, PartialEq)] +#[message(custom_map)] pub struct ShouldBePredicted; +// NOTE: need to define this here because otherwise we get the error +// "impl doesn't use only types from inside the current crate" +// TODO: does this mean that we cannot use existing types such as Transform? +// might need a list of pre-existing types? 
+impl SyncComponent for ShouldBePredicted { + fn mode() -> ComponentSyncMode { + ComponentSyncMode::None + } +} + +impl<'a> MapEntities<'a> for ShouldBePredicted { + fn map_entities(&mut self, entity_mapper: Box) {} + + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/lightyear/src/shared/replication/entity_map.rs b/lightyear/src/shared/replication/entity_map.rs index ddc1b8e8d..1ba2befec 100644 --- a/lightyear/src/shared/replication/entity_map.rs +++ b/lightyear/src/shared/replication/entity_map.rs @@ -1,18 +1,52 @@ //! Map between local and remote entities use anyhow::Context; -use std::collections::hash_map::Entry; -use std::collections::HashMap; - use bevy::prelude::{Entity, EntityWorldMut, World}; +use bevy::utils::hashbrown::hash_map::Entry; +use bevy::utils::{EntityHashMap, EntityHashSet}; + +pub trait EntityMapper { + /// Map an entity + fn map(&self, entity: Entity) -> Option; +} + +impl EntityMapper for &T { + fn map(&self, entity: Entity) -> Option { + (*self).map(entity) + } +} #[derive(Default, Debug)] /// Map between local and remote entities. 
(used mostly on client because it's when we receive entity updates) -pub struct EntityMap { - remote_to_local: HashMap, - local_to_remote: HashMap, +pub struct RemoteEntityMap { + remote_to_local: EntityHashMap, + local_to_remote: EntityHashMap, +} + +#[derive(Default, Debug)] +pub struct PredictedEntityMap { + // map from the remote entity to the predicted entity + pub(crate) remote_to_predicted: EntityHashMap, +} + +impl EntityMapper for PredictedEntityMap { + fn map(&self, entity: Entity) -> Option { + self.remote_to_predicted.get(&entity).copied() + } } -impl EntityMap { +#[derive(Default, Debug)] +pub struct InterpolatedEntityMap { + // map from the remote entity to the interpolated entity + pub(crate) remote_to_interpolated: EntityHashMap, +} + +impl EntityMapper for InterpolatedEntityMap { + fn map(&self, entity: Entity) -> Option { + self.remote_to_interpolated.get(&entity).copied() + } +} + +impl RemoteEntityMap { #[inline] pub fn insert(&mut self, remote_entity: Entity, local_entity: Entity) { self.remote_to_local.insert(remote_entity, local_entity); @@ -65,12 +99,12 @@ impl EntityMap { } #[inline] - pub fn to_local(&self) -> &HashMap { + pub fn to_local(&self) -> &EntityHashMap { &self.remote_to_local } #[inline] - pub fn to_remote(&self) -> &HashMap { + pub fn to_remote(&self) -> &EntityHashMap { &self.local_to_remote } @@ -80,23 +114,36 @@ impl EntityMap { } } +impl EntityMapper for RemoteEntityMap { + fn map(&self, entity: Entity) -> Option { + self.get_local(entity).copied() + } +} + /// Trait that Messages or Components must implement to be able to map entities -pub trait MapEntities { +pub trait MapEntities<'a> { /// Map the entities inside the message or component from the remote World to the local World - fn map_entities(&mut self, entity_map: &EntityMap); + fn map_entities(&mut self, entity_mapper: Box); + + /// Get all the entities that are present in that message or component + fn entities(&self) -> EntityHashSet; } -impl MapEntities for 
Entity { - fn map_entities(&mut self, entity_map: &EntityMap) { - // TODO: if the entity is inside a component, then we don't want to just use the remote entity in the component - // instead we should say: - // - there is a remote entity that we haven't mapped yet - // - wait for it to appear - // - if it appears, we finish the mapping and spawn the entity - if let Some(local) = entity_map.get_local(*self) { - *self = *local; +impl<'a> MapEntities<'a> for Entity { + fn map_entities(&mut self, entity_mapper: Box) { + if let Some(local) = entity_mapper.map(*self) { + *self = local; + } else { + panic!( + "cannot map entity {:?} because it doesn't exist in the entity map!", + self + ); } } + + fn entities(&self) -> EntityHashSet { + EntityHashSet::from_iter(vec![*self]) + } } #[cfg(test)] @@ -159,7 +206,7 @@ mod tests { .connection() .base() .replication_manager - .entity_map + .remote_entity_map .get_local(server_entity) .unwrap(); assert_eq!( @@ -187,7 +234,7 @@ mod tests { .connection() .base() .replication_manager - .entity_map + .remote_entity_map .get_local(server_entity_2) .unwrap(); // the 'server entity' inside the Component4 component got mapped to the corresponding entity on the client diff --git a/lightyear/src/shared/replication/manager.rs b/lightyear/src/shared/replication/manager.rs index 1635c104e..5969e2a10 100644 --- a/lightyear/src/shared/replication/manager.rs +++ b/lightyear/src/shared/replication/manager.rs @@ -4,18 +4,26 @@ use bevy::a11y::accesskit::Action; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use crate::_reexport::{EntityActionsChannel, EntityUpdatesChannel}; +use crate::_reexport::{ + EntityActionsChannel, EntityUpdatesChannel, IntoKind, ShouldBeInterpolated, ShouldBePredicted, +}; +use crate::client::components::{ComponentSyncMode, Confirmed}; +use crate::client::prediction::Predicted; use crate::connection::events::ConnectionEvents; +use crate::protocol::component::ComponentProtocol; 
use bevy::ecs::component::Tick as BevyTick; use bevy::prelude::{Entity, EntityWorldMut, World}; +use bevy::utils::petgraph::algo::toposort; use bevy::utils::petgraph::data::ElementIterator; +use bevy::utils::petgraph::graphmap::DiGraphMap; +use bevy::utils::petgraph::prelude::DiGraph; use bevy::utils::EntityHashMap; use crossbeam_channel::Receiver; use tracing::{debug, error, info, trace, trace_span}; use tracing_subscriber::filter::FilterExt; use tracing_subscriber::fmt::writer::MakeWriterExt; -use super::entity_map::EntityMap; +use super::entity_map::{InterpolatedEntityMap, PredictedEntityMap, RemoteEntityMap}; use super::{ EntityActionMessage, EntityActions, EntityUpdatesMessage, Replicate, ReplicationMessage, ReplicationMessageData, @@ -23,7 +31,8 @@ use super::{ use crate::connection::message::ProtocolMessage; use crate::netcode::ClientId; use crate::packet::message::MessageId; -use crate::prelude::Tick; +use crate::prelude::client::Interpolated; +use crate::prelude::{MapEntities, Tick}; use crate::protocol::channel::ChannelKind; use crate::protocol::component::{ComponentBehaviour, ComponentKindBehaviour}; use crate::protocol::Protocol; @@ -42,13 +51,22 @@ pub(crate) struct ReplicationManager { /// are being buffered individually but we want to group them inside a message pub pending_actions: EntityHashMap< ReplicationGroupId, - BTreeMap>, + HashMap>, >, - pub pending_updates: EntityHashMap>>, + pub pending_updates: EntityHashMap>>, + // Get the graph of dependencies between entities within a same group. + // (for example if a component of entity1 refers to entity2, then entity2 must be spawned before entity1. + // In that case we add an edge entity2 -> entity1 in the graph + pub group_dependencies: EntityHashMap>, // RECEIVE /// Map between local and remote entities. 
(used mostly on client because it's when we receive entity updates) - pub entity_map: EntityMap, + pub remote_entity_map: RemoteEntityMap, + /// Map between remote and predicted entities + pub predicted_entity_map: PredictedEntityMap, + /// Map between remote and interpolated entities + pub interpolated_entity_map: InterpolatedEntityMap, + /// Map from remote entity to the replication group-id pub remote_entity_to_group: EntityHashMap, @@ -61,12 +79,15 @@ impl ReplicationManager

{ pub(crate) fn new(updates_ack_tracker: Receiver) -> Self { Self { // SEND - pending_actions: EntityHashMap::default(), - pending_updates: EntityHashMap::default(), updates_ack_tracker, updates_message_id_to_group_id: Default::default(), + pending_actions: EntityHashMap::default(), + pending_updates: EntityHashMap::default(), + group_dependencies: EntityHashMap::default(), // RECEIVE - entity_map: EntityMap::default(), + remote_entity_map: RemoteEntityMap::default(), + predicted_entity_map: PredictedEntityMap::default(), + interpolated_entity_map: InterpolatedEntityMap::default(), remote_entity_to_group: Default::default(), // BOTH group_channels: Default::default(), @@ -100,7 +121,7 @@ impl ReplicationManager

{ match message.data { ReplicationMessageData::Actions(m) => { // update the mapping from entity to group-id - m.actions.keys().for_each(|e| { + m.actions.iter().for_each(|(e, _)| { self.remote_entity_to_group.insert(*e, message.group_id); }); // if the message is too old, ignore it @@ -116,7 +137,7 @@ impl ReplicationManager

{ } ReplicationMessageData::Updates(m) => { // update the mapping from entity to group-id - m.updates.keys().for_each(|e| { + m.updates.iter().for_each(|(e, _)| { self.remote_entity_to_group.insert(*e, message.group_id); }); // if we have already applied a more recent update for this group, no need to keep this one @@ -179,7 +200,7 @@ impl ReplicationManager

{ // USED BY RECEIVE SIDE (SEND SIZE CAN GET THE GROUP_ID EASILY) /// Get the group channel associated with a given entity pub(crate) fn channel_by_local(&self, local_entity: Entity) -> Option<&GroupChannel

> { - self.entity_map + self.remote_entity_map .get_remote(local_entity) .and_then(|remote_entity| self.channel_by_remote(*remote_entity)) } @@ -214,6 +235,10 @@ impl ReplicationManager

{ .entry(entity) .or_default(); actions.spawn = true; + self.group_dependencies + .entry(group) + .or_default() + .add_node(entity); } pub(crate) fn prepare_entity_despawn(&mut self, entity: Entity, group: ReplicationGroupId) { @@ -223,6 +248,10 @@ impl ReplicationManager

{ .entry(entity) .or_default() .despawn = true; + self.group_dependencies + .entry(group) + .or_default() + .add_node(entity); } // we want to send all component inserts that happen together for the same entity in a single message @@ -234,6 +263,13 @@ impl ReplicationManager

{ group: ReplicationGroupId, component: P::Components, ) { + // if component contains entities, add an edge in the dependency graph + let graph = self.group_dependencies.entry(group).or_default(); + graph.add_node(entity); + component.entities().iter().for_each(|e| { + // `e` must be spawned before `entity` + graph.add_edge(*e, entity, ()); + }); self.pending_actions .entry(group) .or_default() @@ -256,6 +292,10 @@ impl ReplicationManager

{ .or_default() .remove .push(component); + self.group_dependencies + .entry(group) + .or_default() + .add_node(entity); } pub(crate) fn prepare_entity_update( @@ -264,6 +304,13 @@ impl ReplicationManager

{ group: ReplicationGroupId, component: P::Components, ) { + // if component contains entities, add an edge in the dependency graph + let graph = self.group_dependencies.entry(group).or_default(); + graph.add_node(entity); + component.entities().iter().for_each(|e| { + // `e` must be spawned before `entity` + graph.add_edge(*e, entity, ()); + }); self.pending_updates .entry(group) .or_default() @@ -283,53 +330,88 @@ impl ReplicationManager

{ )> { let mut messages = Vec::new(); - // if there are any entity actions, send EntityActions - for (group_id, mut actions) in self.pending_actions.drain() { - // add any updates for that group into the actions message - if let Some(updates) = self.pending_updates.remove(&group_id) { - for (entity, components) in updates { - // for any update that was not already in insert, add it to the update list - // TODO: also check if it's not already in updates? - let existing_inserts = actions - .entry(entity) - .or_default() - .insert - .iter() - .map(|c| c.into()) - .collect::>(); - components - .into_iter() - .filter(|c| !existing_inserts.contains(&c.into())) - .for_each(|c| actions.entry(entity).or_default().updates.push(c)); + // get the list of entities in topological order + for (group_id, dependency_graph) in self.group_dependencies.drain() { + match toposort(&dependency_graph, None) { + Ok(entities) => { + // create an actions message + if let Some(mut actions) = self.pending_actions.remove(&group_id) { + let channel = self.group_channels.entry(group_id).or_default(); + let message_id = channel.actions_next_send_message_id; + channel.actions_next_send_message_id += 1; + channel.last_action_tick = tick; + let mut actions_message = vec![]; + + // add any updates for that group into the actions message + let mut my_updates = self.pending_updates.remove(&group_id); + + // add actions to the message for entities in topological order + for e in entities.iter() { + let mut a = actions.remove(e).unwrap_or_else(EntityActions::default); + // for any update that was not already in insert/updates, add it to the update list + if let Some(ref mut updates) = my_updates { + // TODO: this suggests that we should already store inserts/updates as HashSet! 
+ let existing_inserts = a + .insert + .iter() + .map(|c| c.into()) + .collect::>(); + let existing_updates = a + .updates + .iter() + .map(|c| c.into()) + .collect::>(); + if let Some(u) = updates.remove(e) { + u.into_iter() + .filter(|c| { + !existing_inserts.contains(&(c.into())) + && !existing_updates.contains(&(c.into())) + }) + .for_each(|c| a.updates.push(c)); + } + } + actions_message.push((*e, a)); + } + + messages.push(( + ChannelKind::of::(), + group_id, + ReplicationMessageData::Actions(EntityActionMessage { + sequence_id: message_id, + actions: actions_message, + }), + )); + } + + // create an updates message + if let Some(mut updates) = self.pending_updates.remove(&group_id) { + let channel = self.group_channels.entry(group_id).or_default(); + let mut updates_message = vec![]; + + // add updates to the message in topological order + entities.iter().for_each(|e| { + if let Some(u) = updates.remove(e) { + updates_message.push((*e, u)); + }; + }); + + messages.push(( + ChannelKind::of::(), + group_id, + ReplicationMessageData::Updates(EntityUpdatesMessage { + last_action_tick: channel.last_action_tick, + updates: updates_message, + }), + )); + }; + } + Err(e) => { + error!("There is a cyclic dependency in the group (with entity {:?})! 
Replication aborted.", e.node_id()); } } - let channel = self.group_channels.entry(group_id).or_default(); - let message_id = channel.actions_next_send_message_id; - channel.actions_next_send_message_id += 1; - channel.last_action_tick = tick; - messages.push(( - ChannelKind::of::(), - group_id, - ReplicationMessageData::Actions(EntityActionMessage { - sequence_id: message_id, - actions, - }), - )); } - - // send the remaining updates - for (group_id, updates) in self.pending_updates.drain() { - let channel = self.group_channels.entry(group_id).or_default(); - messages.push(( - ChannelKind::of::(), - group_id, - ReplicationMessageData::Updates(EntityUpdatesMessage { - last_action_tick: channel.last_action_tick, - updates, - }), - )); - } - + self.pending_actions.clear(); + self.pending_updates.clear(); if !messages.is_empty() { trace!(?messages, "Sending replication messages"); } @@ -349,58 +431,172 @@ impl ReplicationManager

{ let _span = trace_span!("Apply received replication message to world").entered(); match replication { ReplicationMessageData::Actions(m) => { + info!(?m, "Received replication actions"); + // NOTE: order matters here, the entities are stored in the message in topological order + // i.e. entities that depend on other entities are read later on for (entity, actions) in m.actions.into_iter() { - debug!(remote_entity = ?entity, "Received entity actions"); + info!(remote_entity = ?entity, "Received entity actions"); assert!(!(actions.spawn && actions.despawn)); // spawn/despawn - let mut local_entity: EntityWorldMut; if actions.spawn { // TODO: optimization: spawn the bundle of insert components - local_entity = world.spawn_empty(); - self.entity_map.insert(entity, local_entity.id()); + let local_entity = world.spawn_empty(); + self.remote_entity_map.insert(entity, local_entity.id()); debug!(remote_entity = ?entity, "Received entity spawn"); events.push_spawn(local_entity.id()); } else if actions.despawn { debug!(remote_entity = ?entity, "Received entity despawn"); - if let Some(local_entity) = self.entity_map.remove_by_remote(entity) { + if let Some(local_entity) = self.remote_entity_map.remove_by_remote(entity) + { events.push_despawn(local_entity); world.despawn(local_entity); } else { error!("Received despawn for an entity that does not exist") } continue; - } else if let Ok(l) = self.entity_map.get_by_remote(world, entity) { - local_entity = l; - } else { + } else if self.remote_entity_map.get_by_remote(world, entity).is_err() { error!("cannot find entity"); continue; } - // inserts + // safety: we know by this point that the entity exists + let local_entity = *self.remote_entity_map.get_local(entity).unwrap(); + + // prediction / interpolation let kinds = actions .insert .iter() .map(|c| c.into()) - .collect::>(); - debug!(remote_entity = ?entity, ?kinds, "Received InsertComponent"); - for component in actions.insert { - // TODO: figure out what to do with tick 
here - events.push_insert_component( - local_entity.id(), - (&component).into(), - Tick(0), + .collect::>(); + + // NOTE: we handle it here because we want to spawn the predicted/interpolated entities + // for the group in topological sort as well, and to maintain a new entity mapping + // i.e. if ComponentA contains an entity E, + // predicted component A' must contain an entity E' that is predicted + let predicted_kind = + P::ComponentKinds::from(&P::Components::from(ShouldBePredicted)); + if kinds.contains(&predicted_kind) { + // spawn a new predicted entity + let mut predicted_entity_mut = world.spawn(Predicted { + confirmed_entity: local_entity, + }); + + let predicted_entity = predicted_entity_mut.id(); + self.predicted_entity_map + .remote_to_predicted + .insert(entity, predicted_entity); + + // TODO: I don't like having coupling between the manager and prediction/interpolation + // now every component needs to implement ComponentSyncMode... + // sync components that have ComponentSyncMode != None + for mut component in actions + .insert + .iter() + .filter(|c| !matches!(c.mode(), ComponentSyncMode::None)) + .cloned() + { + info!("syncing component {:?} for predicted entity", component); + // map any entities inside the component + component.map_entities(Box::new(&self.predicted_entity_map)); + component.insert(&mut predicted_entity_mut); + } + + // add Confirmed to the confirmed entity + // safety: we know the entity exists + let mut local_entity_mut = world.entity_mut(local_entity); + if let Some(mut confirmed) = local_entity_mut.get_mut::() { + confirmed.predicted = Some(predicted_entity); + } else { + local_entity_mut.insert(Confirmed { + predicted: Some(predicted_entity), + interpolated: None, + }); + } + info!( + "Spawn predicted entity {:?} for confirmed: {:?}", + predicted_entity, local_entity, ); - component.insert(&mut local_entity); + #[cfg(feature = "metrics")] + { + metrics::increment_counter!("spawn_predicted_entity"); + } + } + // NOTE: we 
handle it here because we want to spawn the predicted/interpolated entities + // for the group in topological sort as well, and to maintain a new entity mapping + // i.e. if ComponentA contains an entity E, + // predicted component A' must contain an entity E' that is predicted + // maybe instead there should be an entity dependency graph that clients are aware of and maintain, + // and replication between confirmed/predicted should use that graph (even for non replicated components) + // For example what happens if on confirm we spawn a particle HasParent(ConfirmedEntity)? + // most probably it wouldn't get synced, but if it is we need to perform a mapping as well. + // + let interpolated_kind = + P::ComponentKinds::from(&P::Components::from(ShouldBeInterpolated)); + if kinds.contains(&interpolated_kind) { + // spawn a new interpolated entity + let mut interpolated_entity_mut = world.spawn(Interpolated { + confirmed_entity: local_entity, + }); + + let interpolated_entity = interpolated_entity_mut.id(); + self.interpolated_entity_map + .remote_to_interpolated + .insert(entity, interpolated_entity); + + // TODO: I don't like having coupling between the manager and prediction/interpolation + // now every component needs to implement ComponentSyncMode... 
+ // sync components that have ComponentSyncMode != None + for mut component in actions + .insert + .iter() + .filter(|c| !matches!(c.mode(), ComponentSyncMode::None)) + .cloned() + { + // map any entities inside the component + component.map_entities(Box::new(&self.interpolated_entity_map)); + component.insert(&mut interpolated_entity_mut); + } + + // add Confirmed to the confirmed entity + // safety: we know the entity exists + let mut local_entity_mut = world.entity_mut(local_entity); + if let Some(mut confirmed) = local_entity_mut.get_mut::() { + confirmed.interpolated = Some(interpolated_entity); + } else { + local_entity_mut.insert(Confirmed { + interpolated: Some(interpolated_entity), + predicted: None, + }); + } + info!( + "Spawn interpolated entity {:?} for confirmed: {:?}/remote: {:?}", + interpolated_entity, local_entity, entity, + ); + #[cfg(feature = "metrics")] + { + metrics::increment_counter!("spawn_interpolated_entity"); + } + } + + // inserts + info!(remote_entity = ?entity, ?kinds, "Received InsertComponent"); + let mut local_entity_mut = world.entity_mut(local_entity); + for mut component in actions.insert { + // map any entities inside the component + component.map_entities(Box::new(&self.remote_entity_map)); + // TODO: figure out what to do with tick here + events.push_insert_component(local_entity, (&component).into(), Tick(0)); + component.insert(&mut local_entity_mut); } // removals debug!(remote_entity = ?entity, ?actions.remove, "Received RemoveComponent"); for kind in actions.remove { - events.push_remove_component(local_entity.id(), kind.clone(), Tick(0)); - kind.remove(&mut local_entity); + events.push_remove_component(local_entity, kind.clone(), Tick(0)); + kind.remove(&mut local_entity_mut); } // (no need to run apply_deferred after applying actions, that is only for Commands) @@ -412,14 +608,17 @@ impl ReplicationManager

{ .map(|c| c.into()) .collect::>(); debug!(remote_entity = ?entity, ?kinds, "Received UpdateComponent"); - for component in actions.updates { - events.push_update_component( - local_entity.id(), - (&component).into(), - Tick(0), - ); - component.update(&mut local_entity); + for mut component in actions.updates { + // map any entities inside the component + component.map_entities(Box::new(&self.remote_entity_map)); + events.push_update_component(local_entity, (&component).into(), Tick(0)); + component.update(&mut local_entity_mut); } + + // TODO: + // prediction/interpolation + // sync updates for ComponentSyncMode = simple or full + // NOTE: again we apply the replication here because we want to apply it in topological order } } ReplicationMessageData::Updates(m) => { @@ -431,7 +630,9 @@ impl ReplicationManager

{ .collect::>(); debug!(?entity, ?kinds, "Received UpdateComponent"); // if the entity does not exist, create it - if let Ok(mut local_entity) = self.entity_map.get_by_remote(world, entity) { + if let Ok(mut local_entity) = + self.remote_entity_map.get_by_remote(world, entity) + { for component in components { events.push_update_component( local_entity.id(), @@ -554,6 +755,7 @@ mod tests { use crate::tests::protocol::*; use bevy::prelude::*; + // TODO: add tests for replication with entity relations! #[test] fn test_buffer_replication_messages() { let (sender, receiver) = crossbeam_channel::unbounded(); @@ -607,54 +809,55 @@ mod tests { MyComponentsProtocol::Component3(Component3(5.0)), ); + // the order of actions is not important if there are no relations between the entities + let message = manager.finalize(Tick(2)); + let actions = message.first().unwrap(); + assert_eq!(actions.0, ChannelKind::of::()); + assert_eq!(actions.1, group_1); + let ReplicationMessageData::Actions(ref a) = actions.2 else { + panic!() + }; + assert_eq!(a.sequence_id, MessageId(2)); assert_eq!( - manager.finalize(Tick(2)), - vec![ + EntityHashMap::from_iter(a.actions.clone()), + EntityHashMap::from_iter(vec![ ( - ChannelKind::of::(), - group_1, - ReplicationMessageData::Actions(EntityActionMessage { - sequence_id: MessageId(2), - actions: BTreeMap::from([ - ( - entity_1, - EntityActions { - spawn: true, - despawn: false, - insert: vec![MyComponentsProtocol::Component1(Component1(1.0))], - remove: vec![MyComponentsProtocolKind::Component2], - updates: vec![MyComponentsProtocol::Component3(Component3( - 3.0 - ))], - } - ), - ( - entity_2, - EntityActions { - spawn: false, - despawn: false, - insert: vec![], - remove: vec![], - updates: vec![MyComponentsProtocol::Component2(Component2( - 4.0 - ))], - } - ) - ]), - }) + entity_1, + EntityActions { + spawn: true, + despawn: false, + insert: vec![MyComponentsProtocol::Component1(Component1(1.0))], + remove: 
vec![MyComponentsProtocolKind::Component2], + updates: vec![MyComponentsProtocol::Component3(Component3(3.0))], + } ), ( - ChannelKind::of::(), - group_2, - ReplicationMessageData::Updates(EntityUpdatesMessage { - last_action_tick: Tick(3), - updates: BTreeMap::from([( - entity_3, - vec![MyComponentsProtocol::Component3(Component3(5.0))] - )]), - }) + entity_2, + EntityActions { + spawn: false, + despawn: false, + insert: vec![], + remove: vec![], + updates: vec![MyComponentsProtocol::Component2(Component2(4.0))], + } ) - ] + ]) + ); + + let updates = message.get(1).unwrap(); + assert_eq!( + updates, + &( + ChannelKind::of::(), + group_2, + ReplicationMessageData::Updates(EntityUpdatesMessage { + last_action_tick: Tick(3), + updates: vec![( + entity_3, + vec![MyComponentsProtocol::Component3(Component3(5.0))] + )], + }) + ) ); assert_eq!( manager diff --git a/lightyear/src/shared/replication/mod.rs b/lightyear/src/shared/replication/mod.rs index b2192ff93..1d0889cd5 100644 --- a/lightyear/src/shared/replication/mod.rs +++ b/lightyear/src/shared/replication/mod.rs @@ -5,14 +5,14 @@ use bevy::ecs::component::Tick as BevyTick; use bevy::ecs::system::SystemChangeTick; use bevy::prelude::{Component, Entity, Resource}; use bevy::reflect::Map; -use bevy::utils::{EntityHashMap, HashMap}; +use bevy::utils::{EntityHashMap, EntityHashSet, HashMap}; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use crate::channel::builder::{Channel, EntityActionsChannel, EntityUpdatesChannel}; use crate::netcode::ClientId; use crate::packet::message::MessageId; -use crate::prelude::{EntityMap, MapEntities, NetworkTarget, Tick}; +use crate::prelude::{EntityMapper, MapEntities, NetworkTarget, RemoteEntityMap, Tick}; use crate::protocol::channel::ChannelKind; use crate::protocol::Protocol; use crate::shared::replication::components::{Replicate, ReplicationGroup, ReplicationGroupId}; @@ -74,19 +74,14 @@ impl Default for EntityActions { #[derive(Serialize, Deserialize, Clone, 
PartialEq, Debug)] pub struct EntityActionMessage { sequence_id: MessageId, - // TODO: maybe we want a sorted hash map here? - // because if we want to send a group of entities together, presumably it's because there's a hierarchy - // between the elements of the group - // E1: head, E2: arm, parent=head. So we need to read E1 first. - pub(crate) actions: BTreeMap>, + pub(crate) actions: Vec<(Entity, EntityActions)>, } #[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] pub struct EntityUpdatesMessage { /// The last tick for which we sent an EntityActionsMessage for this group last_action_tick: Tick, - /// TODO: consider EntityHashMap or Vec if order doesn't matter - pub(crate) updates: BTreeMap>, + pub(crate) updates: Vec<(Entity, Vec)>, } #[derive(Serialize, Deserialize, Clone, PartialEq, Debug)] @@ -103,34 +98,6 @@ pub struct ReplicationMessage { pub(crate) data: ReplicationMessageData, } -impl MapEntities for ReplicationMessage { - // NOTE: we do NOT map the entities for these messages (apart from those contained in the components) - // because the replication logic (`apply_world`) expects the entities to be the remote entities - fn map_entities(&mut self, entity_map: &EntityMap) { - match &mut self.data { - ReplicationMessageData::Actions(m) => { - m.actions.values_mut().for_each(|entity_actions| { - entity_actions - .insert - .iter_mut() - .for_each(|c| c.map_entities(entity_map)); - entity_actions - .updates - .iter_mut() - .for_each(|c| c.map_entities(entity_map)); - }) - } - ReplicationMessageData::Updates(m) => { - m.updates.values_mut().for_each(|entity_updates| { - entity_updates - .iter_mut() - .for_each(|c| c.map_entities(entity_map)); - }) - } - } - } -} - pub trait ReplicationSend: Resource { // type Manager: ReplicationManager; @@ -257,7 +224,7 @@ mod tests { .connection() .base() .replication_manager - .entity_map + .remote_entity_map .get_local(server_entity) .unwrap(); assert_eq!( diff --git a/lightyear/src/tests/protocol.rs 
b/lightyear/src/tests/protocol.rs index 0ee99e240..6bd5096be 100644 --- a/lightyear/src/tests/protocol.rs +++ b/lightyear/src/tests/protocol.rs @@ -1,4 +1,5 @@ use bevy::prelude::{Component, Entity}; +use bevy::utils::EntityHashSet; use derive_more::{Add, Mul}; use serde::{Deserialize, Serialize}; @@ -12,7 +13,7 @@ pub struct Message1(pub String); #[derive(MessageInternal, Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Message2(pub u32); -#[message_protocol_internal(protocol = "MyProtocol", derive(Debug))] +#[message_protocol_internal(protocol = "MyProtocol")] pub enum MyMessageProtocol { Message1(Message1), Message2(Message2), @@ -32,13 +33,16 @@ pub struct Component3(pub f32); #[message(custom_map)] pub struct Component4(pub Entity); -impl MapEntities for Component4 { - fn map_entities(&mut self, entity_map: &EntityMap) { - self.0.map_entities(entity_map); +impl<'a> MapEntities<'a> for Component4 { + fn map_entities(&mut self, entity_mapper: Box) { + self.0.map_entities(entity_mapper); + } + + fn entities(&self) -> EntityHashSet { + EntityHashSet::from_iter(vec![self.0]) } } -// #[component_protocol_internal(protocol = "MyProtocol", derive(Debug))] #[component_protocol_internal(protocol = "MyProtocol")] pub enum MyComponentsProtocol { #[sync(full)] diff --git a/lightyear/src/utils/bevy.rs b/lightyear/src/utils/bevy.rs index cab6fc420..a7beb5fe1 100644 --- a/lightyear/src/utils/bevy.rs +++ b/lightyear/src/utils/bevy.rs @@ -1,6 +1,7 @@ //! 
Implement lightyear traits for some common bevy types -use crate::prelude::{EntityMap, MapEntities, Message, Named}; -use bevy::prelude::Transform; +use crate::prelude::{EntityMapper, MapEntities, Message, Named, RemoteEntityMap}; +use bevy::prelude::{Entity, Transform}; +use bevy::utils::EntityHashSet; impl Named for Transform { fn name(&self) -> String { @@ -8,8 +9,12 @@ impl Named for Transform { } } -impl MapEntities for Transform { - fn map_entities(&mut self, entity_map: &EntityMap) {} +impl<'a> MapEntities<'a> for Transform { + fn map_entities(&mut self, entity_mapper: Box) {} + + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } } impl Message for Transform {} @@ -23,8 +28,12 @@ cfg_if::cfg_if! { } } - impl MapEntities for Color { - fn map_entities(&mut self, entity_map: &EntityMap) {} + impl<'a> MapEntities<'a> for Color { + fn map_entities(&mut self, entity_mapper: Box) {} + + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } } impl Message for Color {} @@ -35,8 +44,12 @@ cfg_if::cfg_if! { } } - impl MapEntities for Visibility { - fn map_entities(&mut self, entity_map: &EntityMap) {} + impl<'a> MapEntities<'a> for Visibility { + fn map_entities(&mut self, entity_mapper: Box) {} + + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } } impl Message for Visibility {} diff --git a/lightyear/src/utils/named.rs b/lightyear/src/utils/named.rs index 9e6da6dad..bb8698a25 100644 --- a/lightyear/src/utils/named.rs +++ b/lightyear/src/utils/named.rs @@ -4,6 +4,8 @@ // - derive Reflect for Messages // - require to derive Reflect for Components ? +use std::fmt::{Debug, Formatter}; + pub trait TypeNamed { fn name() -> String; } diff --git a/macros/src/component.rs b/macros/src/component.rs index 475637de6..c8e258371 100644 --- a/macros/src/component.rs +++ b/macros/src/component.rs @@ -55,6 +55,10 @@ impl SyncField { tokens = quote! { ComponentSyncMode::Once }; + } else { + tokens = quote! 
{ + ComponentSyncMode::None + }; } tokens } @@ -70,7 +74,7 @@ impl SyncField { if self.once { count += 1; } - if count != 1 { + if count > 1 { panic!( "The field {:?} cannot have multiple sync attributes set at the same time", self @@ -121,7 +125,7 @@ pub fn component_protocol_impl( // Use darling to parse the attributes for each field let sync_fields: Vec = fields .iter() - .filter(|field| field.attrs.iter().any(|attr| attr.path().is_ident("sync"))) + // .filter(|field| field.attrs.iter().any(|attr| attr.path().is_ident("sync"))) .map(|field| FromField::from_field(field).unwrap()) .collect(); for field in &sync_fields { @@ -145,6 +149,7 @@ pub fn component_protocol_impl( let add_events_method = add_events_method(&fields); let push_component_events_method = push_component_events_method(&fields, protocol); let add_sync_systems_method = add_sync_systems_method(&sync_fields, protocol); + let mode_method = mode_method(&input, &fields); let encode_method = encode_method(); let decode_method = decode_method(); let delegate_method = delegate_method(&input, &enum_kind_name); @@ -163,15 +168,9 @@ pub fn component_protocol_impl( use #shared_crate_name::_reexport::*; use #shared_crate_name::prelude::*; use #shared_crate_name::prelude::client::*; - use bevy::prelude::{App, IntoSystemConfigs, EntityWorldMut, World}; - use #shared_crate_name::shared::replication::systems::add_per_component_replication_send_systems; - use #shared_crate_name::connection::events::{IterComponentInsertEvent, IterComponentRemoveEvent, IterComponentUpdateEvent}; - use #shared_crate_name::shared::systems::events::{ - push_component_insert_events, push_component_remove_events, push_component_update_events, - }; + use bevy::prelude::{App, Entity, IntoSystemConfigs, EntityWorldMut, World}; + use bevy::utils::{EntityHashMap, EntityHashSet}; use #shared_crate_name::shared::events::{ComponentInsertEvent, ComponentRemoveEvent, ComponentUpdateEvent}; - use 
#shared_crate_name::client::prediction::{add_prediction_systems}; - use #shared_crate_name::client::interpolation::{add_interpolation_systems, add_lerp_systems, InterpolatedComponent}; #[derive(Serialize, Deserialize, Clone, PartialEq)] #extra_derives @@ -185,6 +184,7 @@ pub fn component_protocol_impl( #add_events_method #push_component_events_method #add_sync_systems_method + #mode_method } impl std::fmt::Debug for #enum_name { @@ -234,6 +234,8 @@ fn get_fields(input: &ItemEnum) -> Vec { let mut component = unnamed.unnamed.first().unwrap().clone(); // get the attrs from the variant component.attrs = variant.attrs.clone(); + // set the field ident as the variant ident + component.ident = Some(variant.ident.clone()); // make field immutable fields.push(component); } @@ -324,6 +326,12 @@ fn add_events_method(fields: &Vec) -> TokenStream { fn sync_component_impl(fields: &Vec, protocol_name: &Ident) -> TokenStream { let mut body = quote! {}; for field in fields { + // skip components that are defined externally + if field.ident.as_ref().unwrap().eq("ShouldBePredicted") + || field.ident.as_ref().unwrap().eq("ShouldBeInterpolated") + { + continue; + } let component_type = &field.ty; let mode = field.get_mode_tokens(); body = quote! { @@ -335,22 +343,20 @@ fn sync_component_impl(fields: &Vec, protocol_name: &Ident) -> TokenS } }; if field.full { + // custom let lerp_mode = if let Some(lerp_fn) = &field.lerp { quote! { - fn lerp_mode() -> LerpMode<#component_type> { - LerpMode::Custom(lerm_fn) - } + type Fn = #lerp_fn; } } else { + // by default, use linear interpolation quote! { - fn lerp_mode() -> LerpMode<#component_type> { - LerpMode::Linear - } + type Fn = LinearInterpolation; } }; body = quote! 
{ #body - impl InterpolatedComponent for #component_type { + impl InterpolatedComponent<#component_type> for #component_type { #lerp_mode } } @@ -361,6 +367,7 @@ fn sync_component_impl(fields: &Vec, protocol_name: &Ident) -> TokenS fn add_sync_systems_method(fields: &Vec, protocol_name: &Ident) -> TokenStream { let mut prediction_body = quote! {}; + let mut prepare_interpolation_body = quote! {}; let mut interpolation_body = quote! {}; for field in fields { let component_type = &field.ty; @@ -368,14 +375,14 @@ fn add_sync_systems_method(fields: &Vec, protocol_name: &Ident) -> To #prediction_body add_prediction_systems::<#component_type, #protocol_name>(app); }; - interpolation_body = quote! { - #interpolation_body - add_interpolation_systems::<#component_type, #protocol_name>(app); + prepare_interpolation_body = quote! { + #prepare_interpolation_body + add_prepare_interpolation_systems::<#component_type, #protocol_name>(app); }; if field.full { interpolation_body = quote! { #interpolation_body - add_lerp_systems::<#component_type, #protocol_name>(app); + add_interpolation_systems::<#component_type, #protocol_name>(app); }; } } @@ -384,6 +391,10 @@ fn add_sync_systems_method(fields: &Vec, protocol_name: &Ident) -> To { #prediction_body } + fn add_prepare_interpolation_systems(app: &mut App) + { + #prepare_interpolation_body + } fn add_interpolation_systems(app: &mut App) { #interpolation_body @@ -484,28 +495,63 @@ fn remove_method(input: &ItemEnum, fields: &[Field], enum_kind_name: &Ident) -> } } +fn mode_method(input: &ItemEnum, fields: &Vec) -> TokenStream { + let mut body = quote! {}; + for field in fields { + let ident = &field.ident; + + let component_type = &field.ty; + body = quote! { + #body + Self::#ident(_) => #component_type::mode(), + }; + } + + quote! 
{ + fn mode(&self) -> ComponentSyncMode { + match self { + #body + } + } + } +} + fn delegate_method(input: &ItemEnum, enum_kind_name: &Ident) -> TokenStream { let enum_name = &input.ident; let variants = input.variants.iter().map(|v| v.ident.clone()); let mut map_entities_body = quote! {}; + let mut entities_body = quote! {}; for variant in input.variants.iter() { let ident = &variant.ident; map_entities_body = quote! { #map_entities_body - #enum_name::#ident(ref mut x) => x.map_entities(entity_map), + Self::#ident(ref mut x) => x.map_entities(entity_mapper), + }; + entities_body = quote! { + #entities_body + Self::#ident(ref x) => x.entities(), }; } + // TODO: make it work with generics quote! { - impl MapEntities for #enum_name { - fn map_entities(&mut self, entity_map: &EntityMap) { + impl<'a> MapEntities<'a> for #enum_name { + fn map_entities(&mut self, entity_mapper: Box) { match self { #map_entities_body } } + fn entities(&self) -> EntityHashSet { + match self { + #entities_body + } + } } - impl MapEntities for #enum_kind_name { - fn map_entities(&mut self, entity_map: &EntityMap) {} + impl<'a> MapEntities<'a> for #enum_kind_name { + fn map_entities(&mut self, entity_mapper: Box) {} + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } } } } diff --git a/macros/src/message.rs b/macros/src/message.rs index 1673620d6..98d052dc3 100644 --- a/macros/src/message.rs +++ b/macros/src/message.rs @@ -5,7 +5,10 @@ use darling::{Error, FromDeriveInput, FromMeta}; use proc_macro2::{Ident, Span, TokenStream}; use quote::{format_ident, quote}; use std::ops::Deref; -use syn::{parse_macro_input, parse_quote, DeriveInput, Field, Fields, ItemEnum, LitStr}; +use syn::{ + parse_macro_input, parse_quote, parse_quote_spanned, DeriveInput, Field, Fields, GenericParam, + Generics, ItemEnum, LifetimeParam, LitStr, +}; #[derive(Debug, FromDeriveInput)] #[darling(attributes(message))] @@ -36,6 +39,8 @@ pub fn message_impl( let gen = quote! 
{ pub mod #module_name { use super::#struct_name; + use bevy::prelude::*; + use bevy::utils::{EntityHashMap, EntityHashSet}; use #shared_crate_name::prelude::*; impl #impl_generics Message for #struct_name #type_generics #where_clause {} @@ -54,13 +59,32 @@ pub fn message_impl( proc_macro::TokenStream::from(gen) } +// Add the MapEntities trait for the message. +// Need to combine the generics from the message with the generics from the trait fn map_entities_trait(input: &DeriveInput, ident_map: bool) -> TokenStream { - let (impl_generics, type_generics, where_clause) = input.generics.split_for_impl(); + // combined generics + let mut gen_clone = input.generics.clone(); + let lt: LifetimeParam = parse_quote_spanned! {Span::mixed_site() => 'a}; + gen_clone.params.push(GenericParam::from(lt)); + let (impl_generics, _, _) = gen_clone.split_for_impl(); + + // type generics + let (_, type_generics, where_clause) = input.generics.split_for_impl(); + + // trait generics (MapEntities) + let mut map_entities_generics = Generics::default(); + let lt: LifetimeParam = parse_quote_spanned! {Span::mixed_site() => 'a}; + map_entities_generics.params.push(GenericParam::from(lt)); + let (_, type_generics_map, _) = map_entities_generics.split_for_impl(); + let struct_name = &input.ident; if ident_map { quote! 
{ - impl #impl_generics MapEntities for #struct_name #type_generics #where_clause { - fn map_entities(&mut self, entity_map: &EntityMap) {} + impl #impl_generics MapEntities #type_generics_map for #struct_name #type_generics #where_clause { + fn map_entities(&mut self, entity_mapper: Box) {} + fn entities(&self) -> EntityHashSet { + EntityHashSet::default() + } } } } else { @@ -131,7 +155,8 @@ pub fn message_protocol_impl( mod #module_name { use super::*; use serde::{Serialize, Deserialize}; - use bevy::prelude::{App, World}; + use bevy::prelude::{App, Entity, World}; + use bevy::utils::{EntityHashMap, EntityHashSet}; use #shared_crate_name::_reexport::*; use #shared_crate_name::prelude::*; use #shared_crate_name::connection::events::{IterMessageEvent}; @@ -152,6 +177,12 @@ pub fn message_protocol_impl( #push_message_events_method } + impl std::fmt::Debug for #enum_name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + self.name().fmt(f) + } + } + // #from_into_methods #delegate_method // impl BitSerializable for #enum_name { @@ -208,6 +239,7 @@ fn delegate_method(input: &ItemEnum) -> TokenStream { let variants = input.variants.iter().map(|v| v.ident.clone()); let mut name_body = quote! {}; let mut map_entities_body = quote! {}; + let mut entities_body = quote! {}; for variant in input.variants.iter() { let ident = &variant.ident; name_body = quote! { @@ -216,7 +248,11 @@ fn delegate_method(input: &ItemEnum) -> TokenStream { }; map_entities_body = quote! { #map_entities_body - #enum_name::#ident(ref mut x) => x.map_entities(entity_map), + #enum_name::#ident(ref mut x) => x.map_entities(entity_mapper), + }; + entities_body = quote! 
{ + #entities_body + #enum_name::#ident(ref x) => x.entities(), }; } @@ -228,12 +264,17 @@ fn delegate_method(input: &ItemEnum) -> TokenStream { } } } - impl MapEntities for #enum_name { - fn map_entities(&mut self, entity_map: &EntityMap) { + impl<'a> MapEntities<'a> for #enum_name { + fn map_entities(&mut self, entity_mapper: Box) { match self { #map_entities_body } } + fn entities(&self) -> EntityHashSet { + match self { + #entities_body + } + } } } } diff --git a/macros/tests/derive_component.rs b/macros/tests/derive_component.rs index 7e81cfdd9..1b1ef8444 100644 --- a/macros/tests/derive_component.rs +++ b/macros/tests/derive_component.rs @@ -3,28 +3,50 @@ pub mod some_component { use derive_more::{Add, Mul}; use serde::{Deserialize, Serialize}; + use lightyear::prelude::client::InterpFn; use lightyear::prelude::*; use lightyear_macros::{component_protocol, message_protocol}; #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone, Add, Mul)] pub struct Component1(pub f32); - #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone, Add, Mul)] + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Component2(pub f32); + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] + pub struct Component3(String); + + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] + pub struct Component4(String); + + #[derive(Component, Message, Serialize, Deserialize, Debug, PartialEq, Clone)] + pub struct Component5(pub f32); + #[component_protocol(protocol = "MyProtocol")] pub enum MyComponentProtocol { #[sync(full)] Component1(Component1), #[sync(simple)] Component2(Component2), + #[sync(once)] + Component3(Component3), + Component4(Component4), + #[sync(full, lerp = "MyCustom")] + Component5(Component5), + } + + // custom interpolation logic + pub struct MyCustom; + impl InterpFn for MyCustom { + fn lerp(start: C, _other: C, _t: f32) -> C { + start 
+ } } #[derive(Message, Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Message1(pub u32); #[message_protocol(protocol = "MyProtocol")] - #[derive(Debug)] pub enum MyMessageProtocol { Message1(Message1), } @@ -38,6 +60,7 @@ pub mod some_component { #[cfg(test)] mod tests { + use lightyear::client::interpolation::InterpolatedComponent; use lightyear::protocol::BitSerializable; use lightyear::serialize::reader::ReadBuffer; use lightyear::serialize::wordbuffer::reader::ReadWordBuffer; @@ -57,6 +80,19 @@ mod tests { let copy = MyComponentProtocol::decode(&mut reader)?; assert_eq!(component1, copy); + // check interpolation + let component5 = Component5(0.1); + assert_eq!( + component5.clone(), + Component5::lerp(component5, Component5(0.0), 0.5) + ); + + let component1 = Component1(0.0); + assert_eq!( + Component1(0.5), + Component1::lerp(component1, Component1(1.0), 0.5) + ); + Ok(()) } } diff --git a/macros/tests/derive_message.rs b/macros/tests/derive_message.rs index f8b50d373..6ab705a33 100644 --- a/macros/tests/derive_message.rs +++ b/macros/tests/derive_message.rs @@ -11,10 +11,7 @@ pub mod some_message { #[derive(Message, Serialize, Deserialize, Debug, PartialEq, Clone)] pub struct Message2(pub u32); - // #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] - // #[derive(Debug, PartialEq)] - #[message_protocol(protocol = "MyProtocol", derive(Debug))] - // #[derive(EnumAsInner)] + #[message_protocol(protocol = "MyProtocol")] pub enum MyMessageProtocol { Message1(Message1), Message2(Message2),