-
-
Notifications
You must be signed in to change notification settings - Fork 284
/
Copy pathpubsub.rs
118 lines (103 loc) · 2.98 KB
/
pubsub.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use crate::error::StreamError;
use collab_entity::proto;
use futures::stream::BoxStream;
use futures::StreamExt;
use prost::Message;
#[allow(deprecated)]
use redis::aio::{Connection, ConnectionManager};
use redis::{AsyncCommands, RedisWrite, ToRedisArgs};
use serde::{Deserialize, Serialize};
use tracing::instrument;
/// Name of the Redis pub/sub channel shared by this module: `CollabStreamSub::subscribe`
/// listens on it and `CollabStreamPub::publish` writes to it.
const ACTIVE_COLLAB_CHANNEL: &str = "active_collab_channel";
/// Subscriber side of the `ACTIVE_COLLAB_CHANNEL` pub/sub channel.
///
/// Holds a Redis connection that [`CollabStreamSub::subscribe`] consumes and turns into a
/// pub/sub connection.
pub struct CollabStreamSub {
    // The lint is silenced because `Connection` triggers a deprecation warning wherever it is
    // named in this file.
    #[allow(deprecated)]
    conn: Connection,
}

impl CollabStreamSub {
    /// Wraps `conn` so it can later be converted into a subscription.
    #[allow(deprecated)]
    pub fn new(conn: Connection) -> Self {
        Self { conn }
    }

    /// Subscribes to `ACTIVE_COLLAB_CHANNEL` and returns a boxed stream of decoded messages.
    ///
    /// Consumes `self`, because the underlying connection is converted into a pub/sub
    /// connection. Each stream item is the result of running [`PubSubMessage::from_vec`] on the
    /// payload bytes of one received message, so a payload that fails to decode shows up as an
    /// `Err` item rather than ending the stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscribe command itself fails.
    pub async fn subscribe(
        self,
    ) -> Result<BoxStream<'static, Result<PubSubMessage, StreamError>>, StreamError> {
        let mut channel = self.conn.into_pubsub();
        channel.subscribe(ACTIVE_COLLAB_CHANNEL).await?;

        // Decoding is synchronous, so a plain `map` over the message stream is sufficient.
        let decoded = channel
            .into_on_message()
            .map(|msg| PubSubMessage::from_vec(msg.get_payload_bytes()));

        Ok(decoded.boxed())
    }
}
/// Publisher side of the `ACTIVE_COLLAB_CHANNEL` pub/sub channel.
pub struct CollabStreamPub {
    conn: ConnectionManager,
}

impl CollabStreamPub {
    /// Wraps `conn`; every call to [`CollabStreamPub::publish`] goes through it.
    pub fn new(conn: ConnectionManager) -> Self {
        Self { conn }
    }

    /// Publishes `message` on `ACTIVE_COLLAB_CHANNEL`.
    ///
    /// The message is turned into Redis arguments through its `ToRedisArgs` implementation.
    /// The reply of the publish command is requested as `()` and therefore discarded.
    ///
    /// # Errors
    ///
    /// Returns an error if the publish command fails.
    #[instrument(level = "debug", skip_all, err)]
    pub async fn publish(&mut self, message: PubSubMessage) -> Result<(), StreamError> {
        self.conn
            .publish::<_, _, ()>(ACTIVE_COLLAB_CHANNEL, message)
            .await?;
        Ok(())
    }
}
/// Payload exchanged over the collab pub/sub channel.
///
/// Carries the same two fields as `proto::collab::ActiveCollabId`, which is the protobuf
/// representation it converts to and from.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PubSubMessage {
    pub workspace_id: String,
    pub oid: String,
}

impl PubSubMessage {
    /// Builds the protobuf representation by cloning both fields.
    // NOTE(review): this method is called by the `ToRedisArgs` impl and the unit test in this
    // file, so the `dead_code` allowance below looks unnecessary; confirm before removing it.
    #[allow(dead_code)]
    fn to_proto(&self) -> proto::collab::ActiveCollabId {
        let Self { workspace_id, oid } = self;
        proto::collab::ActiveCollabId {
            workspace_id: workspace_id.clone(),
            oid: oid.clone(),
        }
    }

    /// Builds a message from its protobuf representation by cloning both fields.
    fn from_proto(proto: &proto::collab::ActiveCollabId) -> Self {
        let workspace_id = proto.workspace_id.clone();
        let oid = proto.oid.clone();
        Self { workspace_id, oid }
    }

    /// Decodes a message from raw bytes.
    ///
    /// Protobuf decoding is attempted first. If it fails, the same bytes are decoded with
    /// bincode instead, so payloads in either encoding are accepted.
    ///
    /// # Errors
    ///
    /// Returns `StreamError::BinCodeSerde` with the bincode error when both decoders reject the
    /// input; the protobuf error is dropped.
    pub fn from_vec(vec: &[u8]) -> Result<Self, StreamError> {
        if let Ok(active_collab) = proto::collab::ActiveCollabId::decode(vec) {
            return Ok(Self::from_proto(&active_collab));
        }
        bincode::deserialize(vec).map_err(StreamError::BinCodeSerde)
    }
}
/// Lets a [`PubSubMessage`] be passed directly as a Redis command argument.
impl ToRedisArgs for PubSubMessage {
    /// Writes the message to `out` as its protobuf-encoded byte vector.
    fn write_redis_args<W>(&self, out: &mut W)
    where
        W: ?Sized + RedisWrite,
    {
        self.to_proto().encode_to_vec().write_redis_args(out);
    }
}
#[cfg(test)]
mod test {
    use super::PubSubMessage;
    use prost::Message;

    /// `from_vec` must round-trip a message regardless of whether the bytes were produced by
    /// bincode or by protobuf.
    #[test]
    fn test_pubsub_message_decoding() {
        let original = PubSubMessage {
            workspace_id: String::from("1"),
            oid: String::from("o1"),
        };

        let from_bincode = {
            let bytes = bincode::serialize(&original).unwrap();
            PubSubMessage::from_vec(&bytes).unwrap()
        };
        let from_protobuf = {
            let bytes = original.to_proto().encode_to_vec();
            PubSubMessage::from_vec(&bytes).unwrap()
        };

        assert_eq!(original, from_bincode);
        assert_eq!(original, from_protobuf);
    }
}