Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix and isolate message receipts #116

Merged
merged 6 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 154 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use unicode_width::UnicodeWidthStr;
use uuid::Uuid;

use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::collections::{hash_map::Entry, HashMap, HashSet};
use std::convert::{TryFrom, TryInto};
use std::path::Path;
use std::str::FromStr;
Expand All @@ -47,6 +47,140 @@ pub struct App {
display_help: bool,
pub is_searching: bool,
pub channel_text_width: usize,
receipt_handler: ReceiptHandler,
}

#[derive(Debug, Default, PartialEq, Eq)]
pub struct ReceiptHandler {
receipt_set: HashMap<Uuid, ReceiptQueues>,
time_since_update: u64,
}

impl ReceiptHandler {
pub fn new() -> Self {
Self {
receipt_set: HashMap::new(),
time_since_update: 0u64,
}
}

pub fn add_receipt_event(&mut self, event: ReceiptEvent) {
// Add a new set in the case no receipt had been handled for this contact
// over the current session
self.receipt_set
.entry(event.uuid)
.or_insert_with(ReceiptQueues::new)
.add(event.timestamp, event.receipt_type);
}

// Dictates whether receipts should be sent on the current tick
// Not used for now as
fn do_tick(&mut self) -> bool {
true
}

pub fn step(&mut self, signal_manager: &dyn SignalManager) -> bool {
if !self.do_tick() {
return false;
}
if self.receipt_set.is_empty() {
return false;
}

// Get any key
let uuid = *self.receipt_set.keys().next().unwrap();

let j = self.receipt_set.entry(uuid);
match j {
Entry::Occupied(mut e) => {
let u = e.get_mut();
if let Some((timestamps, receipt)) = u.get_data() {
signal_manager.send_receipt(uuid, timestamps, receipt);
if u.is_empty() {
self.receipt_set.remove_entry(&uuid);
boxdot marked this conversation as resolved.
Show resolved Hide resolved
}
return true;
}
}
Entry::Vacant(_) => {}
};
self.receipt_set.remove_entry(&uuid);
boxdot marked this conversation as resolved.
Show resolved Hide resolved
false
}
}

/// This get built anywhere in the client and get passed to the App
#[derive(Debug, Default, PartialEq, Eq)]
pub struct ReceiptEvent {
uuid: Uuid,
/// Timestamp of the messages
timestamp: u64,
/// Type : Received, Read
receipt_type: Receipt,
}

impl ReceiptEvent {
pub fn new(uuid: Uuid, timestamp: u64, receipt_type: Receipt) -> Self {
Self {
uuid,
timestamp,
receipt_type,
}
}
}

#[derive(Debug, Default, PartialEq, Eq)]
pub struct ReceiptQueues {
received_msg: HashSet<u64>,
read_msg: HashSet<u64>,
}

impl ReceiptQueues {
pub fn new() -> Self {
Self {
received_msg: HashSet::new(),
read_msg: HashSet::new(),
}
}

pub fn add_received(&mut self, timestamp: u64) {
if !self.received_msg.insert(timestamp) {
log::error!("Somehow got duplicate Received receipt @ {}", timestamp);
}
}

pub fn add_read(&mut self, timestamp: u64) {
// Ensures we do not send uselessly double the amount of receipts
// in the case a message is immediatly received and read.
self.received_msg.remove(&timestamp);
if !self.read_msg.insert(timestamp) {
log::error!("Somehow got duplicate Read receipt @ {}", timestamp);
}
}

pub fn add(&mut self, timestamp: u64, receipt: Receipt) {
match receipt {
Receipt::Received => self.add_received(timestamp),
Receipt::Read => self.add_read(timestamp),
_ => {}
}
}

pub fn get_data(&mut self) -> Option<(Vec<u64>, Receipt)> {
boxdot marked this conversation as resolved.
Show resolved Hide resolved
if !self.received_msg.is_empty() {
let timestamps = self.received_msg.drain().collect::<Vec<u64>>();
return Some((timestamps, Receipt::Received));
}
if !self.read_msg.is_empty() {
let timestamps = self.read_msg.drain().collect::<Vec<u64>>();
return Some((timestamps, Receipt::Read));
}
None
}

pub fn is_empty(&self) -> bool {
self.received_msg.is_empty() && self.read_msg.is_empty()
}
}

#[derive(Debug, Default, PartialEq, Eq)]
Expand Down Expand Up @@ -368,7 +502,7 @@ impl TypingAction {

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Receipt {
Nothing,
Nothing, // Do not do anything to these receipts in order to avoid spamming receipt messages when an old database is loaded
Sent,
Received,
Read,
Expand Down Expand Up @@ -434,7 +568,7 @@ impl Message {
quote: None,
attachments: Default::default(),
reactions: Default::default(),
receipt: Default::default(),
receipt: Receipt::Sent,
}
}

Expand All @@ -446,7 +580,7 @@ impl Message {
quote: None,
attachments: Default::default(),
reactions: Default::default(),
receipt: Default::default(),
receipt: Receipt::Sent,
})
}
}
Expand All @@ -460,6 +594,7 @@ pub enum Event {
Message(Content),
Resize { cols: u16, rows: u16 },
Quit(Option<anyhow::Error>),
Tick,
}

impl App {
Expand All @@ -482,6 +617,7 @@ impl App {
display_help: false,
is_searching: false,
channel_text_width: 0,
receipt_handler: ReceiptHandler::new(),
})
}

Expand Down Expand Up @@ -835,6 +971,9 @@ impl App {

self.notify(&from, &text);

// Send "Delivered" receipt
self.add_receipt_event(ReceiptEvent::new(uuid, timestamp, Receipt::Received));

let quote = quote.and_then(Message::from_quote).map(Box::new);
let message = Message {
quote,
Expand Down Expand Up @@ -998,9 +1137,13 @@ impl App {
Ok(())
}

pub fn send_receipts(&self, channel: &Channel, timestamps: Vec<u64>, receipt: Receipt) {
self.signal_manager
.send_receipt(channel, self.user_id, timestamps, receipt)
pub fn step_receipts(&mut self) -> anyhow::Result<()> {
if self.receipt_handler.step(self.signal_manager.as_ref()) {
// No need to save if no receipt was sent
self.save()
} else {
Ok(())
}
}

fn handle_typing(
Expand Down Expand Up @@ -1069,6 +1212,10 @@ impl App {
Ok(())
}

pub fn add_receipt_event(&mut self, event: ReceiptEvent) {
self.receipt_handler.add_receipt_event(event);
}

fn handle_receipt(&mut self, sender_uuid: Uuid, typ: i32, timestamps: Vec<u64>) {
let earliest = timestamps.iter().min().unwrap();
for c in self.data.channels.items.iter_mut() {
Expand Down
18 changes: 18 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use std::time::{Duration, Instant};
use crate::{signal::PresageManager, storage::JsonStorage};

const TARGET_FPS: u64 = 144;
const RECEIPT_TICK_PERIOD: u64 = 144;
const FRAME_BUDGET: Duration = Duration::from_millis(1000 / TARGET_FPS);
const RECEIPT_BUDGET: Duration = Duration::from_millis(RECEIPT_TICK_PERIOD * 1000 / TARGET_FPS);
const MESSAGE_SCROLL_BACK: bool = false;

#[derive(Debug, StructOpt)]
Expand Down Expand Up @@ -166,6 +168,19 @@ async fn run_single_threaded(relink: bool) -> anyhow::Result<()> {
let mut last_render_at = Instant::now();
let is_render_spawned = Arc::new(AtomicBool::new(false));

let tick_tx = tx.clone();
// Tick to trigger receipt sending
tokio::spawn(async move {
Sup3Legacy marked this conversation as resolved.
Show resolved Hide resolved
let mut interval = tokio::time::interval(RECEIPT_BUDGET);
loop {
interval.tick().await;
tick_tx
.send(Event::Tick)
.await
.expect("Cannot tick: events channel closed.");
}
});

loop {
// render
let left_frame_budget = FRAME_BUDGET.checked_sub(last_render_at.elapsed());
Expand All @@ -191,6 +206,9 @@ async fn run_single_threaded(relink: bool) -> anyhow::Result<()> {
}

match rx.recv().await {
Some(Event::Tick) => {
let _ = app.step_receipts();
}
Some(Event::Click(event)) => match event.kind {
MouseEventKind::Down(MouseButton::Left) => {
let col = event.column;
Expand Down
44 changes: 14 additions & 30 deletions src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,7 @@ pub trait SignalManager {
master_key_bytes: GroupMasterKeyBytes,
) -> anyhow::Result<ResolvedGroup>;

fn send_receipt(
&self,
channel: &Channel,
sender_uuid: Uuid,
timestamps: Vec<u64>,
receipt: Receipt,
);
fn send_receipt(&self, sender_uuid: Uuid, timestamps: Vec<u64>, receipt: Receipt);

fn send_text(
&self,
Expand Down Expand Up @@ -82,33 +76,23 @@ impl SignalManager for PresageManager {
self.manager.uuid()
}

fn send_receipt(
&self,
channel: &Channel,
_sender_uuid: Uuid,
timestamps: Vec<u64>,
receipt: Receipt,
) {
fn send_receipt(&self, _sender_uuid: Uuid, timestamps: Vec<u64>, receipt: Receipt) {
let now_timestamp = utc_now_timestamp_msec();
let data_message = ReceiptMessage {
r#type: Some(receipt.to_i32()),
timestamp: timestamps,
};

match channel.id {
ChannelId::User(uuid) => {
let manager = self.manager.clone();
tokio::task::spawn_local(async move {
let body = ContentBody::ReceiptMessage(data_message);
if let Err(e) = manager.send_message(uuid, body, now_timestamp).await {
log::error!("Failed to send message to {}: {}", uuid, e);
}
});
let manager = self.manager.clone();
tokio::task::spawn_local(async move {
let body = ContentBody::ReceiptMessage(data_message);
if let Err(e) = manager
.send_message(_sender_uuid, body, now_timestamp)
.await
{
log::error!("Failed to send message to {}: {}", _sender_uuid, e);
}
ChannelId::Group(_) => {
log::warn!("Not supported for now.");
}
}
});
}

fn send_text(
Expand Down Expand Up @@ -195,7 +179,7 @@ impl SignalManager for PresageManager {
quote: quote_message,
attachments: Default::default(),
reactions: Default::default(),
receipt: Default::default(),
receipt: Receipt::Sent,
}
}

Expand Down Expand Up @@ -439,7 +423,7 @@ pub mod test {
self.user_id
}

fn send_receipt(&self, _: &Channel, _: Uuid, _: Vec<u64>, _: Receipt) {}
fn send_receipt(&self, _: Uuid, _: Vec<u64>, _: Receipt) {}

async fn contact_name(&self, _id: Uuid, _profile_key: [u8; 32]) -> Option<String> {
None
Expand Down Expand Up @@ -476,7 +460,7 @@ pub mod test {
attachments: Default::default(),
reactions: Default::default(),
// TODO make sure the message sending procedure did not fail
receipt: crate::app::Receipt::Sent,
receipt: Receipt::Sent,
};
self.sent_messages.borrow_mut().push(message.clone());
println!("sent messages: {:?}", self.sent_messages.borrow());
Expand Down
Loading