Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(networking): Support restarting network session #384

Merged
merged 4 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 76 additions & 8 deletions framework_crates/bones_framework/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl From<ggrs::InputStatus> for NetworkInputStatus {

/// Module prelude.
pub mod prelude {
pub use super::{certs, debug::prelude::*, input, lan, online, proto};
pub use super::{certs, debug::prelude::*, input, lan, online, proto, NetworkInfo};
}

/// Muliplier for framerate that will be used when playing an online match.
Expand Down Expand Up @@ -138,7 +138,27 @@ pub struct NetworkMatchSocket(Arc<dyn NetworkSocket>);
/// A type-erased [`ggrs::NonBlockingSocket`]
/// implementation.
#[derive(Deref, DerefMut)]
pub struct BoxedNonBlockingSocket(Box<dyn ggrs::NonBlockingSocket<usize> + 'static>);
pub struct BoxedNonBlockingSocket(Box<dyn GgrsSocket>);

impl Clone for BoxedNonBlockingSocket {
fn clone(&self) -> Self {
self.ggrs_socket()
}
}

/// Wraps [`ggrs::Message`] with included `match_id`, used to determine if message received
/// from current match.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GameMessage {
/// Socket match id
pub match_id: u8,
/// Wrapped message
pub message: ggrs::Message,
}

/// Automatically implemented for [`NetworkSocket`] + [`ggrs::NonBlockingSocket<usize>`].
pub trait GgrsSocket: NetworkSocket + ggrs::NonBlockingSocket<usize> {}
impl<T> GgrsSocket for T where T: NetworkSocket + ggrs::NonBlockingSocket<usize> {}

impl ggrs::NonBlockingSocket<usize> for BoxedNonBlockingSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
Expand Down Expand Up @@ -170,6 +190,10 @@ pub trait NetworkSocket: Sync + Send {
fn player_is_local(&self) -> [bool; MAX_PLAYERS];
/// Get the player count for this network match.
fn player_count(&self) -> usize;

/// Increment match id so messages from previous match that are still in flight
/// will be filtered out. Used when starting new session with existing socket.
fn increment_match_id(&mut self);
}

/// The destination for a reliable network message.
Expand All @@ -180,6 +204,17 @@ pub enum SocketTarget {
All,
}

/// Resource updated each frame exposing current frame and last confirmed of online session.
#[derive(HasSchema, Copy, Clone, Default)]
pub struct NetworkInfo {
/// Current frame of simulation step
pub current_frame: i32,

/// Last confirmed frame by all clients.
/// Anything that occurred on this frame is agreed upon by all clients.
pub last_confirmed_frame: i32,
}

/// [`SessionRunner`] implementation that uses [`ggrs`] for network play.
///
/// This is where the whole `ggrs` integration is implemented.
Expand Down Expand Up @@ -207,11 +242,18 @@ pub struct GgrsSessionRunner<'a, InputTypes: NetworkInputConfig<'a>> {

/// Session runner's input collector.
pub input_collector: InputTypes::InputCollector,

/// Store copy of socket to be able to restart session runner with existing socket.
socket: BoxedNonBlockingSocket,

/// Local input delay ggrs session was initialized with
local_input_delay: usize,
}

/// The info required to create a [`GgrsSessionRunner`].
#[derive(Clone)]
pub struct GgrsSessionRunnerInfo {
/// The GGRS socket implementation to use.
/// The socket that will be converted into GGRS socket implementation.
pub socket: BoxedNonBlockingSocket,
/// The list of local players.
pub player_is_local: [bool; MAX_PLAYERS],
Expand All @@ -233,14 +275,16 @@ pub struct GgrsSessionRunnerInfo {
impl GgrsSessionRunnerInfo {
/// See [`GgrsSessionRunnerInfo`] fields for info on arguments.
pub fn new(
socket: &dyn NetworkSocket,
socket: BoxedNonBlockingSocket,
max_prediction_window: Option<usize>,
local_input_delay: Option<usize>,
) -> Self {
let player_is_local = socket.0.player_is_local();
let player_count = socket.0.player_count();
Self {
socket: socket.ggrs_socket(),
player_is_local: socket.player_is_local(),
player_count: socket.player_count(),
socket,
player_is_local,
player_count,
max_prediction_window,
local_input_delay,
}
Expand Down Expand Up @@ -299,7 +343,7 @@ where
let local_player_idx =
local_player_idx.expect("Networking player_is_local array has no local players.");

let session = builder.start_p2p_session(info.socket).unwrap();
let session = builder.start_p2p_session(info.socket.clone()).unwrap();

Self {
last_player_input: InputTypes::Dense::default(),
Expand All @@ -310,6 +354,8 @@ where
last_run: None,
network_fps: network_fps as f64,
input_collector: InputTypes::InputCollector::default(),
socket: info.socket.clone(),
local_input_delay,
}
}
}
Expand Down Expand Up @@ -458,6 +504,11 @@ where
// Input has been consumed, signal that we are in new input frame
self.input_collector.advance_frame();

world.insert_resource(NetworkInfo {
current_frame: self.session.current_frame(),
last_confirmed_frame: self.session.confirmed_frame(),
});

{
world
.resource_mut::<Time>()
Expand Down Expand Up @@ -525,4 +576,21 @@ where
.unwrap();
}
}

fn restart_session(&mut self) {
// Rebuild session info from runner + create new runner

// Increment match id so messages from previous match that are still in flight
// will be filtered out.
self.socket.0.increment_match_id();

let runner_info = GgrsSessionRunnerInfo {
socket: self.socket.clone(),
player_is_local: self.player_is_local,
player_count: self.session.num_players(),
max_prediction_window: Some(self.session.max_prediction()),
local_input_delay: Some(self.local_input_delay),
};
*self = GgrsSessionRunner::new(self.network_fps as f32, runner_info);
}
}
58 changes: 37 additions & 21 deletions framework_crates/bones_framework/src/networking/lan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -535,10 +535,12 @@ pub enum LanMatchmakerResponse {
pub struct LanSocket {
///
pub connections: [Option<quinn::Connection>; MAX_PLAYERS],
pub ggrs_receiver: async_channel::Receiver<(usize, ggrs::Message)>,
pub ggrs_receiver: async_channel::Receiver<(usize, GameMessage)>,
pub reliable_receiver: async_channel::Receiver<(usize, Vec<u8>)>,
pub player_idx: usize,
pub player_count: usize,
/// ID for current match, messages received that do not match ID are dropped.
pub match_id: u8,
}

impl LanSocket {
Expand Down Expand Up @@ -578,7 +580,7 @@ impl LanSocket {
}
either::Either::Right(datagram_result) => match datagram_result {
Ok(data) => {
let message: ggrs::Message = postcard::from_bytes(&data)
let message: GameMessage = postcard::from_bytes(&data)
.expect("Could not deserialize net message");

// Debugging code to introduce artificial latency
Expand Down Expand Up @@ -662,29 +664,11 @@ impl LanSocket {
connections,
ggrs_receiver,
reliable_receiver,
match_id: 0,
}
}
}

impl ggrs::NonBlockingSocket<usize> for LanSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let conn = self.connections[*addr].as_ref().unwrap();

// TODO: determine a reasonable size for this buffer.
let msg_bytes = postcard::to_allocvec(msg).unwrap();
conn.send_datagram(Bytes::copy_from_slice(&msg_bytes[..]))
.ok();
}

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
messages.push(message);
}
messages
}
}

impl NetworkSocket for LanSocket {
fn send_reliable(&self, target: SocketTarget, message: &[u8]) {
let task_pool = IoTaskPool::get();
Expand Down Expand Up @@ -748,6 +732,38 @@ impl NetworkSocket for LanSocket {
fn player_is_local(&self) -> [bool; MAX_PLAYERS] {
std::array::from_fn(|i| self.connections[i].is_none() && i < self.player_count)
}

fn increment_match_id(&mut self) {
self.match_id = self.match_id.wrapping_add(1);
}
}

impl ggrs::NonBlockingSocket<usize> for LanSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let msg = GameMessage {
// Consider a way we can send message by reference and avoid clone?
message: msg.clone(),
match_id: self.match_id,
};
let conn = self.connections[*addr].as_ref().unwrap();
let message = bones_matchmaker_proto::SendProxyMessage {
target_client: bones_matchmaker_proto::TargetClient::One(*addr as u8),
message: postcard::to_allocvec(&msg).unwrap(),
};
let msg_bytes = postcard::to_allocvec(&message).unwrap();
conn.send_datagram(Bytes::copy_from_slice(&msg_bytes[..]))
.ok();
}

fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
if message.1.match_id == self.match_id {
messages.push((message.0, message.1.message));
}
}
messages
}
}

fn pinger(server: BiChannelServer<PingerRequest, PingerResponse>) {
Expand Down
25 changes: 20 additions & 5 deletions framework_crates/bones_framework/src/networking/online.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use tracing::{info, warn};

use crate::{networking::NetworkMatchSocket, prelude::*};

use super::{BoxedNonBlockingSocket, NetworkSocket, SocketTarget, MAX_PLAYERS, NETWORK_ENDPOINT};
use super::{
BoxedNonBlockingSocket, GameMessage, NetworkSocket, SocketTarget, MAX_PLAYERS, NETWORK_ENDPOINT,
};

#[derive(Default, PartialEq, Eq, Clone, Copy)]
pub enum SearchState {
Expand Down Expand Up @@ -198,10 +200,12 @@ fn resolve_addr_blocking(addr: &str) -> anyhow::Result<SocketAddr> {
#[derive(Debug, Clone)]
pub struct OnlineSocket {
pub conn: Connection,
pub ggrs_receiver: async_channel::Receiver<(usize, ggrs::Message)>,
pub ggrs_receiver: async_channel::Receiver<(usize, GameMessage)>,
pub reliable_receiver: async_channel::Receiver<(usize, Vec<u8>)>,
pub player_idx: usize,
pub player_count: usize,
/// ID for current match, messages received that do not match ID are dropped.
pub match_id: u8,
}

impl OnlineSocket {
Expand Down Expand Up @@ -292,11 +296,11 @@ impl OnlineSocket {
reliable_receiver,
player_idx,
player_count,
match_id: 0,
}
}
}

// TODO see zig's PR
impl NetworkSocket for OnlineSocket {
fn ggrs_socket(&self) -> BoxedNonBlockingSocket {
BoxedNonBlockingSocket(Box::new(self.clone()))
Expand Down Expand Up @@ -349,13 +353,22 @@ impl NetworkSocket for OnlineSocket {
fn player_count(&self) -> usize {
self.player_count
}

fn increment_match_id(&mut self) {
// This is wrapping addition
self.match_id = self.match_id.wrapping_add(1);
}
}

impl ggrs::NonBlockingSocket<usize> for OnlineSocket {
fn send_to(&mut self, msg: &ggrs::Message, addr: &usize) {
let msg = GameMessage {
message: msg.clone(),
match_id: self.match_id,
};
let message = bones_matchmaker_proto::SendProxyMessage {
target_client: bones_matchmaker_proto::TargetClient::One(*addr as u8),
message: postcard::to_allocvec(msg).unwrap(),
message: postcard::to_allocvec(&msg).unwrap(),
};
let msg_bytes = postcard::to_allocvec(&message).unwrap();
self.conn
Expand All @@ -366,7 +379,9 @@ impl ggrs::NonBlockingSocket<usize> for OnlineSocket {
fn receive_all_messages(&mut self) -> Vec<(usize, ggrs::Message)> {
let mut messages = Vec::new();
while let Ok(message) = self.ggrs_receiver.try_recv() {
messages.push(message);
if message.1.match_id == self.match_id {
messages.push((message.0, message.1.message));
}
}
messages
}
Expand Down
13 changes: 13 additions & 0 deletions framework_crates/bones_lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,16 @@ pub trait SessionRunner: Sync + Send + 'static {
/// world.resource_mut::<Time>().update_with_instant(now);
/// stages.run(world);
/// }
/// fn restart_session(&mut self) {}
/// # }
/// ```
fn step(&mut self, now: Instant, world: &mut World, stages: &mut SystemStages);

/// Restart Session Runner. This should reset accumulated time, inputs, etc.
///
/// The expectation is that current players using it may continue to, so something like a network
/// socket or player info should persist.
fn restart_session(&mut self);
}

/// The default [`SessionRunner`], which just runs the systems once every time it is run.
Expand All @@ -153,6 +160,12 @@ impl SessionRunner for DefaultSessionRunner {
world.resource_mut::<Time>().update_with_instant(now);
stages.run(world)
}

// This is a no-op as no state, but implemented this way in case that changes later.
#[allow(clippy::default_constructed_unit_structs)]
fn restart_session(&mut self) {
*self = DefaultSessionRunner::default();
}
}

/// The [`Game`] encompasses a complete bones game's logic, independent of the renderer and IO
Expand Down
Loading