
Add authority handling natively #594

Merged (8 commits) on Aug 14, 2024
622 changes: 83 additions & 539 deletions NOTES.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions book/src/SUMMARY.md
@@ -31,6 +31,7 @@
- [Server](./concepts/bevy_integration/server.md)
- [Events](./concepts/bevy_integration/events.md)
- [Advanced Replication](./concepts/advanced_replication/title.md)
- [Authority](./concepts/advanced_replication/authority.md)
- [Bandwidth Management](./concepts/advanced_replication/bandwidth_management.md)
- [Replication Logic](./concepts/advanced_replication/replication_logic.md)
- [Inputs](./concepts/advanced_replication/inputs.md)
65 changes: 65 additions & 0 deletions book/src/concepts/advanced_replication/authority.md
@@ -0,0 +1,65 @@
# Authority

Networked entities can be simulated on a client or on a server.
'Authority' refers to the decision of which **peer simulates an entity**.
The authoritative peer (client or server) is the only one allowed to send replication updates for an entity, and it will not accept updates from a non-authoritative peer.

Only **one peer** can be the authority over an entity at a given time.


### Benefits of distributed client-authority

Client authority means that the client is directly responsible for simulating an entity and sending
replication updates for that entity.

Pros:
- lower perceived latency, since the client simulates the entity locally
- less CPU load on the server, since the client is simulating some entities

Cons:
- high exposure to cheating


### How it works

We have 2 components:
- `HasAuthority`: this is a marker component that you can use as a filter in queries
to check if the current peer has authority over the entity.
- on clients:
- a client will not accept any replication updates from the server if it has `HasAuthority` for an entity
- a client will send replication updates for an entity only if it has `HasAuthority` for that entity
  - on the server:
    - this component is just an indicator for convenience; the server can still send replication
      updates for an entity even if it doesn't have `HasAuthority` over it, because it broadcasts
      the updates coming from a client
- `AuthorityPeer`: this component is only present on the server, and it indicates to the server which
peer currently holds authority over an entity. (`None`, `Server` or a `Client`).
The server will only accept replication updates for an entity if the sender matches the `AuthorityPeer`.
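The server-side acceptance rule can be sketched as a small standalone model. Note that this is illustrative only, not the crate's actual implementation: `AuthorityPeer` mirrors the component described above, but the `u64` client id and the helper function are hypothetical.

```rust
// Illustrative model of the server-side acceptance rule described above.
// `AuthorityPeer` mirrors the component of the same name; the `u64` client id
// and `server_accepts_update` are placeholders, not lightyear's actual API.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthorityPeer {
    None,
    Server,
    Client(u64), // hypothetical client id
}

/// The server accepts a replication update for an entity only if the
/// sender currently holds authority over that entity.
fn server_accepts_update(authority: AuthorityPeer, sender: u64) -> bool {
    authority == AuthorityPeer::Client(sender)
}

fn main() {
    // Client 1 holds authority: its updates are accepted, client 2's are not.
    assert!(server_accepts_update(AuthorityPeer::Client(1), 1));
    assert!(!server_accepts_update(AuthorityPeer::Client(1), 2));
    // If the server (or nobody) has authority, client updates are discarded.
    assert!(!server_accepts_update(AuthorityPeer::Server, 1));
    assert!(!server_accepts_update(AuthorityPeer::None, 1));
}
```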

### Authority Transfer

On the server, you can use the `EntityCommand` `transfer_authority` to transfer the authority for an entity to a different peer.
The command is simply `commands.entity(entity).transfer_authority(new_owner)` to transfer the authority of `entity` to the `AuthorityPeer` `new_owner`.

Under the hood, authority transfers do two things:
- on the server, the transfer is applied immediately (i.e. the `HasAuthority` and `AuthorityPeer` components are updated instantly)
- then the server sends messages to clients to notify them of the authority change. Upon receiving the message, the client adds or removes the `HasAuthority` component as needed.
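The two steps above can be sketched as a standalone model. All types here are illustrative stand-ins (plain `u64` ids instead of `Entity`/`ClientId`, a hand-rolled `AuthorityChange` struct); only the component and command names echo the crate:

```rust
// Standalone sketch of an authority transfer, mirroring the two steps above:
// 1) apply the change immediately on the server, 2) queue notification messages.
// All types are illustrative, not lightyear's actual API.

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthorityPeer {
    None,
    Server,
    Client(u64),
}

/// Message the server sends so a client inserts/removes `HasAuthority`.
#[derive(Debug, PartialEq)]
struct AuthorityChange {
    entity: u64, // placeholder for a real Entity id
    gain_authority: bool,
}

struct ServerEntity {
    entity: u64,
    authority: AuthorityPeer,
    has_authority: bool, // models the `HasAuthority` marker on the server
}

/// Step 1: update `AuthorityPeer`/`HasAuthority` on the server instantly.
/// Step 2: return the notifications to send to the affected clients.
fn transfer_authority(
    e: &mut ServerEntity,
    new_owner: AuthorityPeer,
) -> Vec<(u64, AuthorityChange)> {
    let old = e.authority;
    e.authority = new_owner;
    e.has_authority = new_owner == AuthorityPeer::Server;
    let mut messages = Vec::new();
    if let AuthorityPeer::Client(c) = old {
        messages.push((c, AuthorityChange { entity: e.entity, gain_authority: false }));
    }
    if let AuthorityPeer::Client(c) = new_owner {
        messages.push((c, AuthorityChange { entity: e.entity, gain_authority: true }));
    }
    messages
}

fn main() {
    let mut e = ServerEntity { entity: 7, authority: AuthorityPeer::Server, has_authority: true };
    let msgs = transfer_authority(&mut e, AuthorityPeer::Client(2));
    // The server state is updated instantly...
    assert_eq!(e.authority, AuthorityPeer::Client(2));
    assert!(!e.has_authority);
    // ...and client 2 is told to insert `HasAuthority`.
    assert_eq!(msgs, vec![(2, AuthorityChange { entity: 7, gain_authority: true })]);
}
```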

### Caveats

- There can be a transient window where authority state is inconsistent between peers:
  - the server transfers authority from itself to a client: there is a period where
    no peer has authority, which is fine.
  - the server transfers authority from a client to itself: there is a period where
    both the client and the server have authority. The client's updates won't be accepted by the server (which now has authority), and the server's updates won't be accepted by the client (which still has authority), so no updates are applied.

- server is transferring authority from client C1 to client C2:
- if C1 receives the message first, then for a short period of time no client has authority, which is ok
- if C2 receives the message first, then for a short period of time both clients have authority. However the `AuthorityPeer` is immediately updated on the server, so the server will only
accept updates from C2, and will discard the updates from C1.
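The C1 to C2 race can be illustrated with a minimal standalone model (the types are hypothetical stand-ins, not the crate's API): because `AuthorityPeer` flips on the server immediately, in-flight updates from C1 are discarded no matter which client processes its notification first.

```rust
// Minimal model of the C1 -> C2 transfer race described above.
// Types are illustrative placeholders, not lightyear's actual API.

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq)]
enum AuthorityPeer {
    None,
    Server,
    Client(u64),
}

fn accepts(authority: AuthorityPeer, sender: u64) -> bool {
    authority == AuthorityPeer::Client(sender)
}

fn main() {
    // Before the transfer, C1 (id 1) is authoritative.
    let mut authority = AuthorityPeer::Client(1);
    assert!(accepts(authority, 1));

    // The server flips `AuthorityPeer` to C2 (id 2) immediately, even though
    // neither client has processed its `AuthorityChange` message yet.
    authority = AuthorityPeer::Client(2);

    // In-flight updates from C1 are now discarded; only C2's are accepted,
    // regardless of which client learns about the transfer first.
    assert!(!accepts(authority, 1));
    assert!(accepts(authority, 2));
}
```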

TODO:
- maybe let the client always accept updates from the server, even if the client has `HasAuthority`? What is the goal of disallowing the client from accepting server updates when it has `HasAuthority`?
- maybe include a timestamp/tick in the `ChangeAuthority` messages so that any in-flight replication updates can be handled correctly?
- maybe add a `request_authority` API where the client requests authority and receives a response from the server telling it whether the request was accepted?
5 changes: 5 additions & 0 deletions lightyear/src/channel/builder.rs
@@ -239,3 +239,8 @@ pub struct PongChannel;
#[derive(ChannelInternal)]
/// Default channel to send inputs from client to server. This is a Sequenced Unreliable channel.
pub struct InputChannel;

#[derive(ChannelInternal)]
/// Channel to send messages related to authority transfers.
/// This is an Ordered Reliable channel.
pub struct AuthorityChannel;
6 changes: 3 additions & 3 deletions lightyear/src/client/connection.rs
@@ -1,7 +1,7 @@
//! Specify how a Client sends/receives messages with a Server
use bevy::ecs::component::Tick as BevyTick;
use bevy::ecs::entity::MapEntities;
use bevy::prelude::{Commands, Resource};
use bevy::prelude::{Resource, World};
use bevy::utils::{Duration, HashMap};
use bytes::Bytes;
use tracing::{debug, trace, trace_span};
@@ -387,7 +387,7 @@ impl ConnectionManager {

pub(crate) fn receive(
&mut self,
commands: &mut Commands,
world: &mut World,
// TODO: pass the `ComponentRegistry`/`MessageRegistry` as arguments instead of storing a copy
// in the `ConnectionManager`
time_manager: &TimeManager,
@@ -471,7 +471,7 @@
if self.sync_manager.is_synced() {
// Check if we have any replication messages we can apply to the World (and emit events)
self.replication_receiver.apply_world(
commands,
world,
None,
&self.component_registry,
tick_manager.tick(),
30 changes: 23 additions & 7 deletions lightyear/src/client/networking.rs
@@ -78,7 +78,8 @@ impl Plugin for ClientNetworkingPlugin {
)
.add_systems(
PreUpdate,
(listen_io_state, receive).in_set(InternalMainSet::<ClientMarker>::Receive),
(listen_io_state, (receive_packets, receive).chain())
.in_set(InternalMainSet::<ClientMarker>::Receive),
)
// TODO: make HostServer a computed state?
.add_systems(
@@ -125,8 +126,7 @@ }
}
}

pub(crate) fn receive(
mut commands: Commands,
pub(crate) fn receive_packets(
mut connection: ResMut<ConnectionManager>,
state: Res<State<NetworkingState>>,
mut next_state: ResMut<NextState<NetworkingState>>,
@@ -178,11 +178,27 @@ pub(crate) fn receive(
.recv_packet(packet, tick_manager.as_ref(), component_registry.as_ref())
.unwrap();
}
// RECEIVE: receive packets from message managers
let _ = connection
.receive(&mut commands, time_manager.as_ref(), tick_manager.as_ref())
}

/// Read from internal buffers and apply the changes to the world
pub(crate) fn receive(world: &mut World) {
let unsafe_world = world.as_unsafe_world_cell();

// TODO: an alternative would be to use `Commands + EntityMut` which both don't conflict with resources
// SAFETY: we guarantee that the `world` is not used in `connection_manager.receive` to update
// these resources
let mut connection_manager =
unsafe { unsafe_world.get_resource_mut::<ConnectionManager>() }.unwrap();
let time_manager = unsafe { unsafe_world.get_resource::<TimeManager>() }.unwrap();
let tick_manager = unsafe { unsafe_world.get_resource::<TickManager>() }.unwrap();
// RECEIVE: read messages and parse them into events
let _ = connection_manager
.receive(
unsafe { unsafe_world.world_mut() },
time_manager,
tick_manager,
)
.inspect_err(|e| error!("Error receiving packets: {}", e));
trace!("client finished recv");
}

pub(crate) fn send(
39 changes: 37 additions & 2 deletions lightyear/src/client/replication.rs
@@ -9,10 +9,14 @@ use crate::shared::sets::{ClientMarker, InternalReplicationSet};

pub(crate) mod receive {
use super::*;
use crate::prelude::client::MessageEvent;
use crate::prelude::{
client::{is_connected, is_synced},
is_host_server,
};
use crate::shared::replication::authority::{AuthorityChange, HasAuthority};
use crate::shared::sets::InternalMainSet;

#[derive(Default)]
pub struct ClientReplicationReceivePlugin {
pub tick_interval: Duration,
@@ -38,6 +42,29 @@
.and_then(not(is_host_server)),
),
);

app.add_systems(
PreUpdate,
handle_authority_change.after(InternalMainSet::<ClientMarker>::EmitEvents),
);
}
}

/// Apply authority changes requested by the server
// TODO: use observer to handle these?
fn handle_authority_change(
mut commands: Commands,
mut messages: ResMut<Events<MessageEvent<AuthorityChange>>>,
) {
for message in messages.drain() {
let entity = message.message.entity;
if let Some(mut entity_mut) = commands.get_entity(entity) {
if message.message.gain_authority {
entity_mut.insert(HasAuthority);
} else {
entity_mut.remove::<HasAuthority>();
}
}
}
}
}
@@ -59,7 +86,10 @@ pub(crate) mod send {

use crate::shared::replication::components::{Replicating, ReplicationGroupId};

use crate::shared::replication::archetypes::{get_erased_component, ReplicatedArchetypes};
use crate::shared::replication::archetypes::{
get_erased_component, ClientReplicatedArchetypes,
};
use crate::shared::replication::authority::HasAuthority;
use crate::shared::replication::error::ReplicationError;
use bevy::ecs::system::SystemChangeTick;
use bevy::ptr::Ptr;
@@ -149,6 +179,11 @@
/// Marker indicating that the entity should be replicated to the server.
/// If this component is removed, the entity will be despawned on the server.
pub target: ReplicateToServer,
/// Marker component that indicates that the client has authority over the entity.
/// This means that this client:
/// - is allowed to send replication updates for this entity
/// - will not accept any replication messages for this entity
pub authority: HasAuthority,
/// The replication group defines how entities are grouped (sent as a single message) for replication.
///
/// After the entity is first replicated, the replication group of the entity should not be modified.
@@ -209,7 +244,7 @@
pub(crate) fn replicate(
tick_manager: Res<TickManager>,
component_registry: Res<ComponentRegistry>,
mut replicated_archetypes: Local<ReplicatedArchetypes<ReplicateToServer>>,
mut replicated_archetypes: Local<ClientReplicatedArchetypes>,
system_ticks: SystemChangeTick,
mut set: ParamSet<(&World, ResMut<ConnectionManager>)>,
) {
10 changes: 9 additions & 1 deletion lightyear/src/protocol/channel.rs
@@ -4,7 +4,9 @@ use bevy::utils::Duration;
use std::any::TypeId;
use std::collections::HashMap;

use crate::channel::builder::{Channel, ChannelBuilder, ChannelSettings, PongChannel};
use crate::channel::builder::{
AuthorityChannel, Channel, ChannelBuilder, ChannelSettings, PongChannel,
};
use crate::channel::builder::{
ChannelContainer, EntityActionsChannel, EntityUpdatesChannel, InputChannel, PingChannel,
};
@@ -110,6 +112,12 @@ impl ChannelRegistry {
// we always want to include the inputs in the packet
priority: f32::INFINITY,
});
registry.add_channel::<AuthorityChannel>(ChannelSettings {
mode: ChannelMode::OrderedReliable(ReliableSettings::default()),
send_frequency: Duration::default(),
// we want to send the authority transfers as soon as possible
priority: 10.0,
});
registry
}
