Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add trace feature with lots of trace instrumentation #422

Merged
merged 2 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ criterion_group!(
criterion_main!(replication_benches);

const NUM_ENTITIES: &[usize] = &[0, 10, 100, 1000, 10000];
// const NUM_ENTITIES: &[usize] = &[1000];

/// Replicating N entity spawn from server to channel, with a local io
fn send_float_insert_one_client(criterion: &mut Criterion) {
Expand Down
28 changes: 4 additions & 24 deletions benches/src/local_stepper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,8 @@ impl LocalBevyStepper {
let mut client_app = App::new();
client_app.add_plugins((
MinimalPlugins,
// LogPlugin::default(),
// .set(TaskPoolPlugin {
// task_pool_options: TaskPoolOptions {
// compute: TaskPoolThreadAssignmentPolicy {
// min_threads: available_parallelism(),
// max_threads: std::usize::MAX,
// percent: 1.0,
// },
// ..default()
// },
// })
// .build(),
#[cfg(feature = "trace")] // NOTE(review): `feature = "lightyear/trace"` is not a valid cfg — feature names cannot contain "/"; declare a local `trace` feature in the bench crate's Cargo.toml that forwards to `lightyear/trace`
LogPlugin::default(),
));
let auth = Authentication::Manual {
server_addr,
Expand Down Expand Up @@ -167,18 +157,8 @@ impl LocalBevyStepper {
let mut server_app = App::new();
server_app.add_plugins((
MinimalPlugins,
// LogPlugin::default(),
// .set(TaskPoolPlugin {
// task_pool_options: TaskPoolOptions {
// compute: TaskPoolThreadAssignmentPolicy {
// min_threads: available_parallelism(),
// max_threads: std::usize::MAX,
// percent: 1.0,
// },
// ..default()
// },
// })
// .build(),
#[cfg(feature = "trace")] // NOTE(review): `feature = "lightyear/trace"` is not a valid cfg — feature names cannot contain "/"; declare a local `trace` feature in the bench crate's Cargo.toml that forwards to `lightyear/trace`
LogPlugin::default(),
));
let config = ServerConfig {
shared: shared_config.clone(),
Expand Down
1 change: 1 addition & 0 deletions lightyear/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ license = "MIT OR Apache-2.0"
exclude = ["/tests"]

[features]
trace = []
metrics = [
"dep:metrics",
"metrics-util",
Expand Down
7 changes: 7 additions & 0 deletions lightyear/src/packet/message_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use bevy::reflect::Reflect;
use bytes::Bytes;
use crossbeam_channel::{Receiver, Sender};
use tracing::{error, info, trace};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use bitcode::buffer::BufferTrait;
use bitcode::word_buffer::WordBuffer;
Expand Down Expand Up @@ -70,6 +72,7 @@ impl MessageManager {
}

/// Update bookkeeping
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn update(
&mut self,
time_manager: &TimeManager,
Expand Down Expand Up @@ -132,6 +135,7 @@ impl MessageManager {
// was buffered, the user can just include the tick in the message itself.
/// Buffer a message to be sent on this connection
/// Returns the message id associated with the message, if there is one
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn buffer_send_with_priority(
&mut self,
message: RawData,
Expand All @@ -149,6 +153,7 @@ impl MessageManager {
// TODO: maybe pass TickManager instead of Tick? Find a more elegant way to pass extra data that might not be used?
// (ticks are not purely necessary without client prediction)
// maybe be generic over a Context ?
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn send_packets(&mut self, current_tick: Tick) -> anyhow::Result<Vec<Payload>> {
// Step 1. Get the list of packets to send from all channels
// for each channel, prepare packets using the buffered messages that are ready to be sent
Expand Down Expand Up @@ -244,6 +249,7 @@ impl MessageManager {
/// Process packet received over the network as raw bytes
/// Update the acks, and put the messages from the packets in internal buffers
/// Returns the tick of the packet
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn recv_packet(&mut self, packet: Packet) -> anyhow::Result<Tick> {
// Step 1. Parse the packet
let tick = packet.header().tick;
Expand Down Expand Up @@ -306,6 +312,7 @@ impl MessageManager {
/// Read all the messages in the internal buffers that are ready to be processed
// TODO: this is where naia converts the messages to events and pushes them to an event queue
// let be conservative and just return the messages right now. We could switch to an iterator
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn read_messages(&mut self) -> HashMap<ChannelKind, Vec<(Tick, Bytes)>> {
let mut map = HashMap::new();
for (channel_kind, channel) in self.channels.iter_mut() {
Expand Down
5 changes: 5 additions & 0 deletions lightyear/src/packet/packet_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
//! Module to take a buffer of messages to send and build packets
use std::collections::{BTreeMap, VecDeque};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use bitcode::encoding::Gamma;

Expand Down Expand Up @@ -59,6 +62,7 @@ impl PacketBuilder {
}

/// Encode a packet into raw bytes
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn encode_packet(&mut self, packet: &Packet) -> anyhow::Result<Payload> {
// TODO: check that we haven't allocated!
// self.clear_write_buffer();
Expand Down Expand Up @@ -289,6 +293,7 @@ impl PacketBuilder {
// .collect::<_>()
// }

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub fn build_packets(
&mut self,
// TODO: change into IntoIterator? the order matters though!
Expand Down
3 changes: 3 additions & 0 deletions lightyear/src/packet/priority_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crossbeam_channel::{Receiver, Sender};
use governor::{DefaultDirectRateLimiter, Quota};
use nonzero_ext::*;
use tracing::{debug, error, trace};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use crate::packet::message::{FragmentData, MessageContainer, MessageId, SingleData};
use crate::prelude::{ChannelKind, ChannelRegistry, Tick};
Expand Down Expand Up @@ -87,6 +89,7 @@ impl PriorityManager {
/// Filter the messages by priority and bandwidth quota
/// Returns the list of messages that we can send, along with the amount of bytes we used
/// in the rate limiter.
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn priority_filter(
&mut self,
data: Vec<(NetId, (VecDeque<SingleData>, VecDeque<FragmentData>))>,
Expand Down
8 changes: 6 additions & 2 deletions lightyear/src/server/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use bevy::utils::{HashMap, HashSet};
use bytes::Bytes;
use hashbrown::hash_map::Entry;
use serde::Serialize;
use tracing::{debug, error, info, trace, trace_span, warn};
use tracing::{debug, error, info, info_span, trace, trace_span, warn};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use crate::channel::builder::{EntityUpdatesChannel, PingChannel};
use bitcode::encoding::Fixed;
Expand Down Expand Up @@ -302,17 +304,19 @@ impl ConnectionManager {
/// Buffer all the replication messages to send.
/// Keep track of the bevy Change Tick: when a message is acked, we know that we only have to send
/// the updates since that Change Tick
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn buffer_replication_messages(
&mut self,
tick: Tick,
bevy_tick: BevyTick,
) -> Result<()> {
let _span = trace_span!("buffer_replication_messages").entered();
let _span = info_span!("buffer_replication_messages").entered();
self.connections
.values_mut()
.try_for_each(move |c| c.buffer_replication_messages(tick, bevy_tick))
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn receive(
&mut self,
world: &mut World,
Expand Down
4 changes: 2 additions & 2 deletions lightyear/src/server/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,13 +247,13 @@ pub(crate) fn send(
});

// SEND_PACKETS: send buffered packets to io
let span = trace_span!("send_packets").entered();
let span = info_span!("send_packets").entered();
connection_manager
.connections
.iter_mut()
.try_for_each(|(client_id, connection)| {
let client_span =
trace_span!("send_packets_to_client", client_id = ?client_id).entered();
info_span!("send_packets_to_client", client_id = ?client_id).entered();
let netserver_idx = *netservers
.client_server_map
.get(client_id)
Expand Down
9 changes: 8 additions & 1 deletion lightyear/src/shared/replication/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use bevy::prelude::{DespawnRecursiveExt, Entity, World};
use bevy::reflect::Reflect;
use bevy::utils::HashSet;
use tracing::{debug, error, info, trace, trace_span, warn};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use crate::packet::message::MessageId;
use crate::prelude::client::Confirmed;
Expand Down Expand Up @@ -54,6 +56,7 @@ impl ReplicationReceiver {
}

/// Recv a new replication message and buffer it
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn recv_message(&mut self, message: ReplicationMessage, remote_tick: Tick) {
trace!(?message, ?remote_tick, "Received replication message");
let channel = self.group_channels.entry(message.group_id).or_default();
Expand Down Expand Up @@ -120,6 +123,7 @@ impl ReplicationReceiver {
///
///
/// Updates the `latest_tick` for this group
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn read_messages(
&mut self,
current_tick: Tick,
Expand Down Expand Up @@ -202,6 +206,7 @@ impl ReplicationReceiver {
/// Apply any replication messages to the world, and emit an event
/// I think we don't need to emit a tick with the event anymore, because
/// we can access the tick via the replication manager
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn apply_world(
&mut self,
Expand All @@ -214,7 +219,6 @@ impl ReplicationReceiver {
group_id: ReplicationGroupId,
events: &mut ConnectionEvents,
) {
let _span = trace_span!("Apply received replication message to world").entered();
match replication {
ReplicationMessageData::Actions(m) => {
debug!(?tick, ?m, "Received replication actions");
Expand Down Expand Up @@ -466,6 +470,7 @@ impl GroupChannel {
/// This assumes that the sender sends all message ids sequentially.
///
/// If had received updates that were waiting on a given action, we also return them
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
fn read_action(&mut self, current_tick: Tick) -> Option<(Tick, EntityActionMessage)> {
// TODO: maybe only get the message if our local client tick is >= to it? (so that we don't apply an update from the future)
let message = self
Expand All @@ -489,6 +494,7 @@ impl GroupChannel {
Some(message)
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
fn read_buffered_updates(&mut self) -> Vec<(Tick, EntityUpdatesMessage)> {
// if we haven't applied any actions (latest_tick is None) we cannot apply any updates
let Some(latest_tick) = self.latest_tick else {
Expand Down Expand Up @@ -525,6 +531,7 @@ impl GroupChannel {
res
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
fn read_messages(&mut self, current_tick: Tick) -> Option<Vec<(Tick, ReplicationMessageData)>> {
let mut res = Vec::new();

Expand Down
13 changes: 13 additions & 0 deletions lightyear/src/shared/replication/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use bevy::utils::{hashbrown, HashMap, HashSet};
use crossbeam_channel::Receiver;
use hashbrown::hash_map::Entry;
use tracing::{debug, error, info, trace, warn};
#[cfg(feature = "trace")]
use tracing::{instrument, Level};

use crate::packet::message::MessageId;
use crate::prelude::{ComponentRegistry, ShouldBePredicted, Tick};
Expand Down Expand Up @@ -117,6 +119,7 @@ impl ReplicationSender {

/// Keep track of the message_id/bevy_tick/tick where a replication-update message has been sent
/// for a given group
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn buffer_replication_update_message(
&mut self,
group_id: ReplicationGroupId,
Expand Down Expand Up @@ -199,6 +202,7 @@ impl ReplicationSender {
/// Then we accumulate the priority for all replication groups.
///
/// This should be call after the Send SystemSet.
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn recv_send_notification(&mut self) {
if !self.bandwidth_cap_enabled {
return;
Expand Down Expand Up @@ -243,6 +247,7 @@ impl ReplicationSender {

// TODO: call this in a system after receive?
/// We call this after the Receive SystemSet; to update the bevy_tick at which we received entity updates for each group
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn recv_update_acks(
&mut self,
component_registry: &ComponentRegistry,
Expand Down Expand Up @@ -323,6 +328,7 @@ impl ReplicationSender {

/// Host has spawned an entity, and we want to replicate this to remote
/// Returns true if we should send a message
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_entity_spawn(&mut self, entity: Entity, group_id: ReplicationGroupId) {
self.pending_actions
.entry(group_id)
Expand All @@ -334,6 +340,7 @@ impl ReplicationSender {

/// Host wants to start replicating an entity, but instead of spawning a new entity, it wants to reuse an existing entity
/// on the remote. This can be useful for transferring ownership of an entity from one player to another.
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_entity_spawn_reuse(
&mut self,
local_entity: Entity,
Expand All @@ -348,6 +355,7 @@ impl ReplicationSender {
.spawn = SpawnAction::Reuse(remote_entity.to_bits());
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_entity_despawn(&mut self, entity: Entity, group_id: ReplicationGroupId) {
self.pending_actions
.entry(group_id)
Expand All @@ -360,6 +368,7 @@ impl ReplicationSender {
// we want to send all component inserts that happen together for the same entity in a single message
// (because otherwise the inserts might be received at different packets/ticks by the remote, and
// the remote might expect the components insert to be received at the same time)
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_component_insert(
&mut self,
entity: Entity,
Expand All @@ -376,6 +385,7 @@ impl ReplicationSender {
.push(component);
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_component_remove(
&mut self,
entity: Entity,
Expand All @@ -392,6 +402,7 @@ impl ReplicationSender {
.insert(kind);
}

#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn prepare_component_update(
&mut self,
entity: Entity,
Expand All @@ -407,6 +418,7 @@ impl ReplicationSender {
}

/// Create a component update.
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
#[allow(clippy::too_many_arguments)]
pub(crate) fn prepare_delta_component_update(
&mut self,
Expand Down Expand Up @@ -456,6 +468,7 @@ impl ReplicationSender {
}

/// Finalize the replication messages
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
pub(crate) fn finalize(
&mut self,
tick: Tick,
Expand Down