diff --git a/benches/replication.rs b/benches/replication.rs
index 0aa0305c2..06e1164dd 100644
--- a/benches/replication.rs
+++ b/benches/replication.rs
@@ -34,6 +34,7 @@ criterion_group!(
 criterion_main!(replication_benches);
 
 const NUM_ENTITIES: &[usize] = &[0, 10, 100, 1000, 10000];
+// const NUM_ENTITIES: &[usize] = &[1000];
 
 /// Replicating N entity spawn from server to channel, with a local io
 fn send_float_insert_one_client(criterion: &mut Criterion) {
diff --git a/benches/src/local_stepper.rs b/benches/src/local_stepper.rs
index 6a43a5198..9534bc15d 100644
--- a/benches/src/local_stepper.rs
+++ b/benches/src/local_stepper.rs
@@ -118,18 +118,8 @@ impl LocalBevyStepper {
         let mut client_app = App::new();
         client_app.add_plugins((
             MinimalPlugins,
-            // LogPlugin::default(),
-            // .set(TaskPoolPlugin {
-            //     task_pool_options: TaskPoolOptions {
-            //         compute: TaskPoolThreadAssignmentPolicy {
-            //             min_threads: available_parallelism(),
-            //             max_threads: std::usize::MAX,
-            //             percent: 1.0,
-            //         },
-            //         ..default()
-            //     },
-            // })
-            // .build(),
+            #[cfg(feature = "trace")] // needs a benches-side `trace` feature forwarding to `lightyear/trace`
+            LogPlugin::default(),
         ));
         let auth = Authentication::Manual {
             server_addr,
@@ -167,18 +157,8 @@ impl LocalBevyStepper {
         let mut server_app = App::new();
         server_app.add_plugins((
             MinimalPlugins,
-            // LogPlugin::default(),
-            // .set(TaskPoolPlugin {
-            //     task_pool_options: TaskPoolOptions {
-            //         compute: TaskPoolThreadAssignmentPolicy {
-            //             min_threads: available_parallelism(),
-            //             max_threads: std::usize::MAX,
-            //             percent: 1.0,
-            //         },
-            //         ..default()
-            //     },
-            // })
-            // .build(),
+            #[cfg(feature = "trace")] // needs a benches-side `trace` feature forwarding to `lightyear/trace`
+            LogPlugin::default(),
         ));
         let config = ServerConfig {
             shared: shared_config.clone(),
diff --git a/lightyear/Cargo.toml b/lightyear/Cargo.toml
index 64ca8e44d..8e4776242 100644
--- a/lightyear/Cargo.toml
+++ b/lightyear/Cargo.toml
@@ -13,6 +13,7 @@ license = "MIT OR Apache-2.0"
 exclude = ["/tests"]
 
 [features]
+trace = []
 metrics = [
   "dep:metrics",
   "metrics-util",
diff --git a/lightyear/src/packet/message_manager.rs b/lightyear/src/packet/message_manager.rs
index 68469e15b..97d06d6c9 100644
--- a/lightyear/src/packet/message_manager.rs
+++ b/lightyear/src/packet/message_manager.rs
@@ -6,6 +6,8 @@ use bevy::reflect::Reflect;
 use bytes::Bytes;
 use crossbeam_channel::{Receiver, Sender};
 use tracing::{error, info, trace};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use bitcode::buffer::BufferTrait;
 use bitcode::word_buffer::WordBuffer;
@@ -70,6 +72,7 @@ impl MessageManager {
     }
 
     /// Update bookkeeping
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn update(
         &mut self,
         time_manager: &TimeManager,
@@ -132,6 +135,7 @@ impl MessageManager {
     // was buffered, the user can just include the tick in the message itself.
     /// Buffer a message to be sent on this connection
    /// Returns the message id associated with the message, if there is one
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn buffer_send_with_priority(
         &mut self,
         message: RawData,
@@ -149,6 +153,7 @@ impl MessageManager {
     // TODO: maybe pass TickManager instead of Tick? Find a more elegant way to pass extra data that might not be used?
     // (ticks are not purely necessary without client prediction)
     // maybe be generic over a Context ?
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn send_packets(&mut self, current_tick: Tick) -> anyhow::Result<Vec<Payload>> {
         // Step 1. Get the list of packets to send from all channels
         // for each channel, prepare packets using the buffered messages that are ready to be sent
@@ -244,6 +249,7 @@ impl MessageManager {
     /// Process packet received over the network as raw bytes
     /// Update the acks, and put the messages from the packets in internal buffers
     /// Returns the tick of the packet
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn recv_packet(&mut self, packet: Packet) -> anyhow::Result<Tick> {
         // Step 1. Parse the packet
         let tick = packet.header().tick;
@@ -306,6 +312,7 @@ impl MessageManager {
     /// Read all the messages in the internal buffers that are ready to be processed
     // TODO: this is where naia converts the messages to events and pushes them to an event queue
     // let be conservative and just return the messages right now. We could switch to an iterator
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn read_messages(&mut self) -> HashMap<ChannelKind, Vec<(Tick, Bytes)>> {
         let mut map = HashMap::new();
         for (channel_kind, channel) in self.channels.iter_mut() {
diff --git a/lightyear/src/packet/packet_manager.rs b/lightyear/src/packet/packet_manager.rs
index 63fb3ea49..8752a11d0 100644
--- a/lightyear/src/packet/packet_manager.rs
+++ b/lightyear/src/packet/packet_manager.rs
@@ -1,4 +1,7 @@
+//! Module that takes a buffer of messages to send and builds them into packets
 use std::collections::{BTreeMap, VecDeque};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use bitcode::encoding::Gamma;
 
@@ -59,6 +62,7 @@ impl PacketBuilder {
     }
 
     /// Encode a packet into raw bytes
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn encode_packet(&mut self, packet: &Packet) -> anyhow::Result<Payload> {
         // TODO: check that we haven't allocated!
         // self.clear_write_buffer();
@@ -289,6 +293,7 @@ impl PacketBuilder {
     //     .collect::<_>()
     // }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub fn build_packets(
         &mut self,
         // TODO: change into IntoIterator? the order matters though!
diff --git a/lightyear/src/packet/priority_manager.rs b/lightyear/src/packet/priority_manager.rs
index 6b9ad92d2..69fa09ba0 100644
--- a/lightyear/src/packet/priority_manager.rs
+++ b/lightyear/src/packet/priority_manager.rs
@@ -6,6 +6,8 @@ use crossbeam_channel::{Receiver, Sender};
 use governor::{DefaultDirectRateLimiter, Quota};
 use nonzero_ext::*;
 use tracing::{debug, error, trace};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use crate::packet::message::{FragmentData, MessageContainer, MessageId, SingleData};
 use crate::prelude::{ChannelKind, ChannelRegistry, Tick};
@@ -87,6 +89,7 @@ impl PriorityManager {
     /// Filter the messages by priority and bandwidth quota
     /// Returns the list of messages that we can send, along with the amount of bytes we used
     /// in the rate limiter.
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn priority_filter(
         &mut self,
         data: Vec<(NetId, (VecDeque<SingleData>, VecDeque<FragmentData>))>,
diff --git a/lightyear/src/server/connection.rs b/lightyear/src/server/connection.rs
index 0dcff9cbf..6be56e293 100644
--- a/lightyear/src/server/connection.rs
+++ b/lightyear/src/server/connection.rs
@@ -8,7 +8,9 @@ use bevy::utils::{HashMap, HashSet};
 use bytes::Bytes;
 use hashbrown::hash_map::Entry;
 use serde::Serialize;
-use tracing::{debug, error, info, trace, trace_span, warn};
+use tracing::{debug, error, info, info_span, trace, trace_span, warn};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use crate::channel::builder::{EntityUpdatesChannel, PingChannel};
 use bitcode::encoding::Fixed;
@@ -302,17 +304,19 @@ impl ConnectionManager {
     /// Buffer all the replication messages to send.
     /// Keep track of the bevy Change Tick: when a message is acked, we know that we only have to send
     /// the updates since that Change Tick
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn buffer_replication_messages(
         &mut self,
         tick: Tick,
         bevy_tick: BevyTick,
     ) -> Result<()> {
-        let _span = trace_span!("buffer_replication_messages").entered();
+        let _span = info_span!("buffer_replication_messages").entered();
         self.connections
             .values_mut()
             .try_for_each(move |c| c.buffer_replication_messages(tick, bevy_tick))
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn receive(
         &mut self,
         world: &mut World,
diff --git a/lightyear/src/server/networking.rs b/lightyear/src/server/networking.rs
index 25c0af16d..7e3ffbf76 100644
--- a/lightyear/src/server/networking.rs
+++ b/lightyear/src/server/networking.rs
@@ -247,13 +247,13 @@ pub(crate) fn send(
     });
 
     // SEND_PACKETS: send buffered packets to io
-    let span = trace_span!("send_packets").entered();
+    let span = info_span!("send_packets").entered();
     connection_manager
         .connections
         .iter_mut()
         .try_for_each(|(client_id, connection)| {
             let client_span =
-                trace_span!("send_packets_to_client", client_id = ?client_id).entered();
+                info_span!("send_packets_to_client", client_id = ?client_id).entered();
             let netserver_idx = *netservers
                 .client_server_map
                 .get(client_id)
diff --git a/lightyear/src/shared/replication/receive.rs b/lightyear/src/shared/replication/receive.rs
index f9dd3662a..23719e3d9 100644
--- a/lightyear/src/shared/replication/receive.rs
+++ b/lightyear/src/shared/replication/receive.rs
@@ -8,6 +8,8 @@ use bevy::prelude::{DespawnRecursiveExt, Entity, World};
 use bevy::reflect::Reflect;
 use bevy::utils::HashSet;
 use tracing::{debug, error, info, trace, trace_span, warn};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use crate::packet::message::MessageId;
 use crate::prelude::client::Confirmed;
@@ -54,6 +56,7 @@ impl ReplicationReceiver {
     }
 
     /// Recv a new replication message and buffer it
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn recv_message(&mut self, message: ReplicationMessage, remote_tick: Tick) {
         trace!(?message, ?remote_tick, "Received replication message");
         let channel = self.group_channels.entry(message.group_id).or_default();
@@ -120,6 +123,7 @@ impl ReplicationReceiver {
     ///
     ///
     /// Updates the `latest_tick` for this group
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn read_messages(
         &mut self,
         current_tick: Tick,
@@ -202,6 +206,7 @@ impl ReplicationReceiver {
     /// Apply any replication messages to the world, and emit an event
     /// I think we don't need to emit a tick with the event anymore, because
     /// we can access the tick via the replication manager
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn apply_world(
         &mut self,
@@ -214,7 +219,6 @@ impl ReplicationReceiver {
         group_id: ReplicationGroupId,
         events: &mut ConnectionEvents,
     ) {
-        let _span = trace_span!("Apply received replication message to world").entered();
         match replication {
             ReplicationMessageData::Actions(m) => {
                 debug!(?tick, ?m, "Received replication actions");
@@ -466,6 +470,7 @@ impl GroupChannel {
     /// This assumes that the sender sends all message ids sequentially.
     ///
     /// If had received updates that were waiting on a given action, we also return them
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     fn read_action(&mut self, current_tick: Tick) -> Option<(Tick, EntityActionMessage)> {
         // TODO: maybe only get the message if our local client tick is >= to it? (so that we don't apply an update from the future)
         let message = self
@@ -489,6 +494,7 @@ impl GroupChannel {
         Some(message)
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     fn read_buffered_updates(&mut self) -> Vec<(Tick, EntityUpdatesMessage)> {
         // if we haven't applied any actions (latest_tick is None) we cannot apply any updates
         let Some(latest_tick) = self.latest_tick else {
@@ -525,6 +531,7 @@ impl GroupChannel {
         res
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     fn read_messages(&mut self, current_tick: Tick) -> Option<Vec<(Tick, ReplicationMessageData)>> {
         let mut res = Vec::new();
diff --git a/lightyear/src/shared/replication/send.rs b/lightyear/src/shared/replication/send.rs
index 7016acf13..488c65c99 100644
--- a/lightyear/src/shared/replication/send.rs
+++ b/lightyear/src/shared/replication/send.rs
@@ -15,6 +15,8 @@ use bevy::utils::{hashbrown, HashMap, HashSet};
 use crossbeam_channel::Receiver;
 use hashbrown::hash_map::Entry;
 use tracing::{debug, error, info, trace, warn};
+#[cfg(feature = "trace")]
+use tracing::{instrument, Level};
 
 use crate::packet::message::MessageId;
 use crate::prelude::{ComponentRegistry, ShouldBePredicted, Tick};
@@ -117,6 +119,7 @@ impl ReplicationSender {
 
     /// Keep track of the message_id/bevy_tick/tick where a replication-update message has been sent
     /// for a given group
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn buffer_replication_update_message(
         &mut self,
         group_id: ReplicationGroupId,
@@ -199,6 +202,7 @@ impl ReplicationSender {
     /// Then we accumulate the priority for all replication groups.
     ///
     /// This should be call after the Send SystemSet.
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
    pub(crate) fn recv_send_notification(&mut self) {
         if !self.bandwidth_cap_enabled {
             return;
@@ -243,6 +247,7 @@ impl ReplicationSender {
 
     // TODO: call this in a system after receive?
    /// We call this after the Receive SystemSet; to update the bevy_tick at which we received entity updates for each group
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn recv_update_acks(
         &mut self,
         component_registry: &ComponentRegistry,
@@ -323,6 +328,7 @@ impl ReplicationSender {
 
     /// Host has spawned an entity, and we want to replicate this to remote
     /// Returns true if we should send a message
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_entity_spawn(&mut self, entity: Entity, group_id: ReplicationGroupId) {
         self.pending_actions
             .entry(group_id)
@@ -334,6 +340,7 @@ impl ReplicationSender {
 
     /// Host wants to start replicating an entity, but instead of spawning a new entity, it wants to reuse an existing entity
     /// on the remote. This can be useful for transferring ownership of an entity from one player to another.
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_entity_spawn_reuse(
         &mut self,
         local_entity: Entity,
@@ -348,6 +355,7 @@ impl ReplicationSender {
             .spawn = SpawnAction::Reuse(remote_entity.to_bits());
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_entity_despawn(&mut self, entity: Entity, group_id: ReplicationGroupId) {
         self.pending_actions
             .entry(group_id)
@@ -360,6 +368,7 @@ impl ReplicationSender {
     // we want to send all component inserts that happen together for the same entity in a single message
     // (because otherwise the inserts might be received at different packets/ticks by the remote, and
     // the remote might expect the components insert to be received at the same time)
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_component_insert(
         &mut self,
         entity: Entity,
@@ -376,6 +385,7 @@ impl ReplicationSender {
             .push(component);
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_component_remove(
         &mut self,
         entity: Entity,
@@ -392,6 +402,7 @@ impl ReplicationSender {
             .insert(kind);
     }
 
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn prepare_component_update(
         &mut self,
         entity: Entity,
@@ -407,6 +418,7 @@ impl ReplicationSender {
     }
 
     /// Create a component update.
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn prepare_delta_component_update(
         &mut self,
@@ -456,6 +468,7 @@ impl ReplicationSender {
     }
 
     /// Finalize the replication messages
+    #[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
     pub(crate) fn finalize(
         &mut self,
         tick: Tick,
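
Reviewer note (not part of the patch): `#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]` expands to a `tracing` span on every call when the `trace` feature is enabled, and to nothing otherwise, so the hot send/receive paths carry no instrumentation cost in default builds. Below is a minimal sketch of how a downstream crate might consume these spans. It assumes that crate defines its own forwarding feature (`trace = ["lightyear/trace"]` in its Cargo.toml) and depends on `tracing` and `tracing-subscriber`; `send_packets` here is a hypothetical stand-in, not lightyear's function.

use tracing::Level;
#[cfg(feature = "trace")]
use tracing::instrument;

// Same gating pattern as the patch: with `--features trace` this expands to
// `#[instrument(level = Level::INFO, skip_all)]` and every call opens an INFO
// span; without the feature, the attribute disappears at compile time.
#[cfg_attr(feature = "trace", instrument(level = Level::INFO, skip_all))]
fn send_packets(payloads: &[Vec<u8>]) {
    // ... encode and send each payload ...
}

fn main() {
    // Print INFO-level spans and events to stdout so the instrumented
    // functions become visible when the feature is enabled.
    tracing_subscriber::fmt()
        .with_max_level(Level::INFO)
        .init();
    send_packets(&[vec![0u8; 8]]);
}

Instrumenting at INFO rather than TRACE (and promoting the manual `trace_span!`s in the send path to `info_span!`) keeps these spans visible under the common `info`-level filter used by default subscribers and profiling tools.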