Skip to content

Commit

Permalink
Fix some gateway deserialization errors (#577)
Browse files Browse the repository at this point in the history
## Description:

- Discord is messing with their api again, read states are a mystery
- Refactored the gateway to fully use the `Opcode` enum instead of constants (4ed68ce)
- Added Last Messages request and response (4ed68ce)
- Fixed a deserialization error related to `presences` in `GuildMembersChunk` being an array, not a single value (4baecf9)
- Fixed a deserialization error with deserializing `activities` in `PresenceUpdate` as an empty array when they are sent as `null` (1b20102)
- Add type `OneOrMoreSnowflakes`, allow `GatewayRequestGuildMembers` to request multiple guild and user ids (644d3be, 85e922b)
- Updated `LazyRequest` (op 14) to use the `Snowflake` type for ids instead of just `String` (61ac7d1)
- Fixed a deserialization error on discord.com related to experiments (they are not implemented yet, see #578) (7feb571)
- Fixed a deserialization error on discord.com related to `last_viewed` in `ReadState` being a version / counter, not a `DateTime` (fb94afa)

## Commits:

* fix: temporarily fix READY on Spacebar

Spacebar servers have mention_count in ReadStateEntry as nullable, as well as not having the flags field

This should probably be investigated further

Reported by greysilly7 on the polyphony discord

* fix: on DDC the user field in Relationship is not send in READY anymore

* fix: for some reason presences wasn't an array??

* fix: deserialize activities: null as empty array

* feat: add OneOrMoreSnowflakes type

Adds a public type which allows serializing one snowflake or an array of snowflakes.

Useful for e.g. request guild members, where we can request either for one user or for multiple

* feat: update RequestGuildMembers, allow multiple snowflakes

Updates the RequsetGuildMembers gateway event, allows to query multiple user_ids and guild_ids using OneOrMoreSnowflakes type, update its documentation

* fix?: add Default to OneOrMoreSnowflakes, GatewayRequestGuildMembers

* feat: add gateway last messages, refactor gateway to use Opcode enum

* fix: the string in lazy request is a snowflake

* fix: deserialization error related to experiments on Discord.com

See also #578 on why we aren't currently juts implementing them as types

* fix?: last_viewed is a counter, not a datetime

thanks to MaddyUnderStars; this explanation seems to make sense considering we only need to know if we are ahead / behind the version, not when exactly it was accessed
  • Loading branch information
kozabrada123 authored Nov 21, 2024
1 parent 5adc0bc commit 7369016
Show file tree
Hide file tree
Showing 15 changed files with 311 additions and 158 deletions.
1 change: 1 addition & 0 deletions src/gateway/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ pub struct Message {
pub reaction_remove_emoji: Publisher<types::MessageReactionRemoveEmoji>,
pub recent_mention_delete: Publisher<types::RecentMentionDelete>,
pub ack: Publisher<types::MessageACK>,
pub last_messages: Publisher<types::LastMessages>,
}

#[derive(Default, Debug)]
Expand Down
86 changes: 43 additions & 43 deletions src/gateway/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use super::{Sink, Stream};
use crate::types::{
self, AutoModerationRule, AutoModerationRuleUpdate, Channel, ChannelCreate, ChannelDelete,
ChannelUpdate, CloseCode, GatewayInvalidSession, GatewayReconnect, Guild, GuildRoleCreate,
GuildRoleUpdate, JsonField, RoleObject, SourceUrlField, ThreadUpdate, UpdateMessage,
GuildRoleUpdate, JsonField, Opcode, RoleObject, SourceUrlField, ThreadUpdate, UpdateMessage,
WebSocketEvent,
};

Expand Down Expand Up @@ -117,13 +117,14 @@ impl Gateway {
let gateway_payload: types::GatewayReceivePayload =
serde_json::from_str(&message.0).unwrap();

if gateway_payload.op_code != GATEWAY_HELLO {
if gateway_payload.op_code != (Opcode::Hello as u8) {
warn!("GW: Received a non-hello opcode ({}) on gateway init", gateway_payload.op_code);
return Err(GatewayError::NonHelloOnInitiate {
opcode: gateway_payload.op_code,
});
}

info!("GW: Received Hello");
debug!("GW: Received Hello");

let gateway_hello: types::HelloData =
serde_json::from_str(gateway_payload.event_data.unwrap().get()).unwrap();
Expand Down Expand Up @@ -348,16 +349,25 @@ impl Gateway {
return;
};

// See https://discord.com/developers/docs/topics/opcodes-and-status-codes#gateway-gateway-opcodes
match gateway_payload.op_code {
let op_code_res = Opcode::try_from(gateway_payload.op_code);

if op_code_res.is_err() {
warn!("Received unrecognized gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code);
trace!("Event data: {:?}", gateway_payload);
return;
}

let op_code = op_code_res.unwrap();

match op_code {
// An event was dispatched, we need to look at the gateway event name t
GATEWAY_DISPATCH => {
Opcode::Dispatch => {
let Some(event_name) = gateway_payload.event_name else {
warn!("Gateway dispatch op without event_name");
warn!("GW: Received dispatch without event_name");
return;
};

trace!("Gateway: Received {event_name}");
trace!("GW: Received {event_name}");

macro_rules! handle {
($($name:literal => $($path:ident).+ $( $message_type:ty: $update_type:ty)?),*) => {
Expand Down Expand Up @@ -483,6 +493,7 @@ impl Gateway {
"INTERACTION_CREATE" => interaction.create, // TODO
"INVITE_CREATE" => invite.create, // TODO
"INVITE_DELETE" => invite.delete, // TODO
"LAST_MESSAGES" => message.last_messages,
"MESSAGE_CREATE" => message.create,
"MESSAGE_UPDATE" => message.update, // TODO
"MESSAGE_DELETE" => message.delete,
Expand Down Expand Up @@ -511,14 +522,28 @@ impl Gateway {
}
// We received a heartbeat from the server
// "Discord may send the app a Heartbeat (opcode 1) event, in which case the app should send a Heartbeat event immediately."
GATEWAY_HEARTBEAT => {
Opcode::Heartbeat => {
trace!("GW: Received Heartbeat // Heartbeat Request");

// Tell the heartbeat handler it should send a heartbeat right away
let heartbeat_communication = HeartbeatThreadCommunication {
sequence_number: gateway_payload.sequence_number,
op_code: Some(Opcode::Heartbeat),
};

self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
Opcode::HeartbeatAck => {
trace!("GW: Received Heartbeat ACK");

// Tell the heartbeat handler we received an ack
let heartbeat_communication = HeartbeatThreadCommunication {
sequence_number: gateway_payload.sequence_number,
op_code: Some(GATEWAY_HEARTBEAT),
op_code: Some(Opcode::HeartbeatAck),
};

self.heartbeat_handler
Expand All @@ -527,7 +552,7 @@ impl Gateway {
.await
.unwrap();
}
GATEWAY_RECONNECT => {
Opcode::Reconnect => {
trace!("GW: Received Reconnect");

let reconnect = GatewayReconnect {};
Expand All @@ -540,7 +565,7 @@ impl Gateway {
.publish(reconnect)
.await;
}
GATEWAY_INVALID_SESSION => {
Opcode::InvalidSession => {
trace!("GW: Received Invalid Session");

let mut resumable: bool = false;
Expand All @@ -566,44 +591,19 @@ impl Gateway {
.await;
}
// Starts our heartbeat
// We should have already handled this in gateway init
GATEWAY_HELLO => {
// We should have already handled this
Opcode::Hello => {
warn!("Received hello when it was unexpected");
}
GATEWAY_HEARTBEAT_ACK => {
trace!("GW: Received Heartbeat ACK");

// Tell the heartbeat handler we received an ack

let heartbeat_communication = HeartbeatThreadCommunication {
sequence_number: gateway_payload.sequence_number,
op_code: Some(GATEWAY_HEARTBEAT_ACK),
};

self.heartbeat_handler
.send
.send(heartbeat_communication)
.await
.unwrap();
}
GATEWAY_IDENTIFY
| GATEWAY_UPDATE_PRESENCE
| GATEWAY_UPDATE_VOICE_STATE
| GATEWAY_RESUME
| GATEWAY_REQUEST_GUILD_MEMBERS
| GATEWAY_CALL_SYNC
| GATEWAY_LAZY_REQUEST => {
info!(
"Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation and is likely not the fault of chorus.",
_ => {
warn!(
"Received unexpected opcode ({}) for current state. This might be due to a faulty server implementation, but you can open an issue on the chorus github anyway",
gateway_payload.op_code
);
}
_ => {
warn!("Received unrecognized gateway op code ({})! Please open an issue on the chorus github so we can implement it", gateway_payload.op_code);
}
}

// If we we received a seq number we should let it know
// If we we received a sequence number we should let the heartbeat thread know
if let Some(seq_num) = gateway_payload.sequence_number {
let heartbeat_communication = HeartbeatThreadCommunication {
sequence_number: Some(seq_num),
Expand Down
50 changes: 35 additions & 15 deletions src/gateway/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use log::*;
use std::fmt::Debug;

use super::{events::Events, *};
use crate::types::{self, Composite, Shared};
use crate::types::{self, Composite, Opcode, Shared};

/// Represents a handle to a Gateway connection.
///
Expand Down Expand Up @@ -105,70 +105,90 @@ impl GatewayHandle {
object
}

/// Sends an identify event to the gateway
/// Sends an identify event ([types::GatewayIdentifyPayload]) to the gateway
pub async fn send_identify(&self, to_send: types::GatewayIdentifyPayload) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Identify..");

self.send_json_event(GATEWAY_IDENTIFY, to_send_value).await;
self.send_json_event(Opcode::Identify as u8, to_send_value).await;
}

/// Sends a resume event to the gateway
/// Sends a resume event ([types::GatewayResume]) to the gateway
pub async fn send_resume(&self, to_send: types::GatewayResume) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Resume..");

self.send_json_event(GATEWAY_RESUME, to_send_value).await;
self.send_json_event(Opcode::Resume as u8, to_send_value).await;
}

/// Sends an update presence event to the gateway
/// Sends an update presence event ([types::UpdatePresence]) to the gateway
pub async fn send_update_presence(&self, to_send: types::UpdatePresence) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Update Presence..");

self.send_json_event(GATEWAY_UPDATE_PRESENCE, to_send_value)
self.send_json_event(Opcode::PresenceUpdate as u8, to_send_value)
.await;
}

/// Sends a request guild members to the server
/// Sends a request guild members ([types::GatewayRequestGuildMembers]) to the server
pub async fn send_request_guild_members(&self, to_send: types::GatewayRequestGuildMembers) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Request Guild Members..");

self.send_json_event(GATEWAY_REQUEST_GUILD_MEMBERS, to_send_value)
self.send_json_event(Opcode::RequestGuildMembers as u8, to_send_value)
.await;
}

/// Sends an update voice state to the server
/// Sends an update voice state ([types::UpdateVoiceState]) to the server
pub async fn send_update_voice_state(&self, to_send: types::UpdateVoiceState) {
let to_send_value = serde_json::to_value(to_send).unwrap();

trace!("GW: Sending Update Voice State..");

self.send_json_event(GATEWAY_UPDATE_VOICE_STATE, to_send_value)
self.send_json_event(Opcode::VoiceStateUpdate as u8, to_send_value)
.await;
}

/// Sends a call sync to the server
/// Sends a call sync ([types::CallSync]) to the server
pub async fn send_call_sync(&self, to_send: types::CallSync) {
let to_send_value = serde_json::to_value(to_send).unwrap();

trace!("GW: Sending Call Sync..");

self.send_json_event(GATEWAY_CALL_SYNC, to_send_value).await;
self.send_json_event(Opcode::CallConnect as u8, to_send_value).await;
}

/// Sends a Lazy Request
/// Sends a request call connect event (aka [types::CallSync]) to the server
///
/// # Notes
/// Alias of [Self::send_call_sync]
pub async fn send_request_call_connect(&self, to_send: types::CallSync) {
self.send_call_sync(to_send).await
}

/// Sends a Lazy Request ([types::LazyRequest]) to the server
pub async fn send_lazy_request(&self, to_send: types::LazyRequest) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Lazy Request..");

self.send_json_event(GATEWAY_LAZY_REQUEST, to_send_value)
self.send_json_event(Opcode::GuildSubscriptions as u8, to_send_value)
.await;
}

/// Sends a Request Last Messages ([types::RequestLastMessages]) to the server
///
/// The server should respond with a [types::LastMessages] event
pub async fn send_request_last_messages(&self, to_send: types::RequestLastMessages) {
let to_send_value = serde_json::to_value(&to_send).unwrap();

trace!("GW: Sending Request Last Messages..");

self.send_json_event(Opcode::RequestLastMessages as u8, to_send_value)
.await;
}

Expand Down
12 changes: 6 additions & 6 deletions src/gateway/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use tokio::sync::mpsc::{Receiver, Sender};
use tokio::task;

use super::*;
use crate::types;
use crate::types::{self, Opcode};

/// The amount of time we wait for a heartbeat ack before resending our heartbeat in ms
pub const HEARTBEAT_ACK_TIMEOUT: u64 = 2000;

/// Handles sending heartbeats to the gateway in another thread
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is used
#[allow(dead_code)] // FIXME: Remove this, once HeartbeatHandler is "used"
#[derive(Debug)]
pub(super) struct HeartbeatHandler {
/// How ofter heartbeats need to be sent at a minimum
Expand Down Expand Up @@ -98,11 +98,11 @@ impl HeartbeatHandler {

if let Some(op_code) = communication.op_code {
match op_code {
GATEWAY_HEARTBEAT => {
Opcode::Heartbeat => {
// As per the api docs, if the server sends us a Heartbeat, that means we need to respond with a heartbeat immediately
should_send = true;
}
GATEWAY_HEARTBEAT_ACK => {
Opcode::HeartbeatAck => {
// The server received our heartbeat
last_heartbeat_acknowledged = true;
}
Expand All @@ -120,7 +120,7 @@ impl HeartbeatHandler {
trace!("GW: Sending Heartbeat..");

let heartbeat = types::GatewayHeartbeat {
op: GATEWAY_HEARTBEAT,
op: (Opcode::Heartbeat as u8),
d: last_seq_number,
};

Expand All @@ -147,7 +147,7 @@ impl HeartbeatHandler {
#[derive(Clone, Copy, Debug)]
pub(super) struct HeartbeatThreadCommunication {
/// The opcode for the communication we received, if relevant
pub(super) op_code: Option<u8>,
pub(super) op_code: Option<Opcode>,
/// The sequence number we got from discord, if any
pub(super) sequence_number: Option<u64>,
}
50 changes: 1 addition & 49 deletions src/gateway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,62 +20,14 @@ pub use message::*;
pub use options::*;

use crate::errors::GatewayError;
use crate::types::{Opcode, Snowflake};
use crate::types::Snowflake;

use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use tokio::sync::Mutex;

// Gateway opcodes
/// Opcode received when the server dispatches a [crate::types::WebSocketEvent]
const GATEWAY_DISPATCH: u8 = Opcode::Dispatch as u8;
/// Opcode sent when sending a heartbeat
const GATEWAY_HEARTBEAT: u8 = Opcode::Heartbeat as u8;
/// Opcode sent to initiate a session
///
/// See [types::GatewayIdentifyPayload]
const GATEWAY_IDENTIFY: u8 = Opcode::Identify as u8;
/// Opcode sent to update our presence
///
/// See [types::GatewayUpdatePresence]
const GATEWAY_UPDATE_PRESENCE: u8 = Opcode::PresenceUpdate as u8;
/// Opcode sent to update our state in vc
///
/// Like muting, deafening, leaving, joining..
///
/// See [types::UpdateVoiceState]
const GATEWAY_UPDATE_VOICE_STATE: u8 = Opcode::VoiceStateUpdate as u8;
/// Opcode sent to resume a session
///
/// See [types::GatewayResume]
const GATEWAY_RESUME: u8 = Opcode::Resume as u8;
/// Opcode received to tell the client to reconnect
const GATEWAY_RECONNECT: u8 = Opcode::Reconnect as u8;
/// Opcode sent to request guild member data
///
/// See [types::GatewayRequestGuildMembers]
const GATEWAY_REQUEST_GUILD_MEMBERS: u8 = Opcode::RequestGuildMembers as u8;
/// Opcode received to tell the client their token / session is invalid
const GATEWAY_INVALID_SESSION: u8 = Opcode::InvalidSession as u8;
/// Opcode received when initially connecting to the gateway, starts our heartbeat
///
/// See [types::HelloData]
const GATEWAY_HELLO: u8 = Opcode::Hello as u8;
/// Opcode received to acknowledge a heartbeat
const GATEWAY_HEARTBEAT_ACK: u8 = Opcode::HeartbeatAck as u8;
/// Opcode sent to get the voice state of users in a given DM/group channel
///
/// See [types::CallSync]
const GATEWAY_CALL_SYNC: u8 = Opcode::CallConnect as u8;
/// Opcode sent to get data for a server (Lazy Loading request)
///
/// Sent by the official client when switching to a server
///
/// See [types::LazyRequest]
const GATEWAY_LAZY_REQUEST: u8 = Opcode::GuildSync as u8;

pub type ObservableObject = dyn Send + Sync + Any;

/// Note: this is a reexport of [pubserve::Subscriber],
Expand Down
Loading

0 comments on commit 7369016

Please sign in to comment.