Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abstract streamhub message notifications #132

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions application/xiu/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use commonlib::auth::AuthType;
use rtmp::remuxer::RtmpRemuxer;

use std::sync::Arc;
use crate::config::{AuthConfig, AuthSecretConfig};

use {
Expand All @@ -16,7 +16,7 @@ use {
relay::{pull_client::PullClient, push_client::PushClient},
rtmp::RtmpServer,
},
streamhub::{notify::Notifier, StreamsHub},
streamhub::{notify::Notifier, notify::http::HttpNotifier, StreamsHub},
tokio,
xrtsp::rtsp::RtspServer,
xwebrtc::webrtc::WebRTCServer,
Expand Down Expand Up @@ -61,16 +61,16 @@ impl Service {
}

pub async fn run(&mut self) -> Result<()> {
let notifier = if let Some(httpnotifier) = &self.cfg.httpnotify {
let notifier: Option<Arc<dyn Notifier>> = if let Some(httpnotifier) = &self.cfg.httpnotify {
if !httpnotifier.enabled {
None
} else {
Some(Notifier::new(
Some(Arc::new(HttpNotifier::new(
httpnotifier.on_publish.clone(),
httpnotifier.on_unpublish.clone(),
httpnotifier.on_play.clone(),
httpnotifier.on_stop.clone(),
))
)))
}
} else {
None
Expand Down
43 changes: 43 additions & 0 deletions library/streamhub/src/define.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,27 @@ pub enum PubDataType {
Both,
}

#[derive(Clone, Serialize)]
pub enum StreamHubEventMessage {
Subscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
},
UnSubscribe {
identifier: StreamIdentifier,
info: SubscriberInfo,
},
Publish {
identifier: StreamIdentifier,
info: PublisherInfo,
},
UnPublish {
identifier: StreamIdentifier,
info: PublisherInfo,
},
NotSupport {},
}

#[derive(Serialize)]
pub enum StreamHubEvent {
Subscribe {
Expand Down Expand Up @@ -266,6 +287,28 @@ pub enum StreamHubEvent {
},
}

impl StreamHubEvent {
pub fn to_message(&self) -> StreamHubEventMessage {
match self {
StreamHubEvent::Subscribe { identifier, info, result_sender: _result_sender } => {
StreamHubEventMessage::Subscribe { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::UnSubscribe { identifier, info } => {
StreamHubEventMessage::UnSubscribe { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::Publish { identifier, info, result_sender: _result_sender, stream_handler: _stream_handler } => {
StreamHubEventMessage::Publish { identifier: identifier.clone(), info: info.clone() }
}
StreamHubEvent::UnPublish { identifier, info } => {
StreamHubEventMessage::UnPublish { identifier: identifier.clone(), info: info.clone() }
}
_ => {
StreamHubEventMessage::NotSupport {}
}
}
}
}

#[derive(Debug)]
pub enum TransceiverEvent {
Subscribe {
Expand Down
26 changes: 10 additions & 16 deletions library/streamhub/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,11 +516,11 @@ pub struct StreamsHub {
//enable hls
hls_enabled: bool,
//http notifier on sub/pub event
notifier: Option<Notifier>,
notifier: Option<Arc<dyn Notifier>>,
}

impl StreamsHub {
pub fn new(notifier: Option<Notifier>) -> Self {
pub fn new(notifier: Option<Arc<dyn Notifier>>) -> Self {
let (event_producer, event_consumer) = mpsc::unbounded_channel();
let (client_producer, _) = broadcast::channel(100);

Expand Down Expand Up @@ -566,15 +566,9 @@ impl StreamsHub {
}

pub async fn event_loop(&mut self) {
while let Some(message) = self.hub_event_receiver.recv().await {
let event_serialize_str = if let Ok(data) = serde_json::to_string(&message) {
log::info!("event data: {}", data);
data
} else {
String::from("empty body")
};

match message {
while let Some(event) = self.hub_event_receiver.recv().await {
let message = event.to_message();
match event {
StreamHubEvent::Publish {
identifier,
info,
Expand Down Expand Up @@ -627,7 +621,7 @@ impl StreamsHub {
{
Ok(statistic_data_sender) => {
if let Some(notifier) = &self.notifier {
notifier.on_publish_notify(event_serialize_str).await;
notifier.on_publish_notify(&message).await;
}
self.un_pub_sub_events
.insert(info.id, StreamHubEvent::UnPublish { identifier, info });
Expand Down Expand Up @@ -658,7 +652,7 @@ impl StreamsHub {
}

if let Some(notifier) = &self.notifier {
notifier.on_unpublish_notify(event_serialize_str).await;
notifier.on_unpublish_notify(&message).await;
}
}
StreamHubEvent::Subscribe {
Expand Down Expand Up @@ -700,7 +694,7 @@ impl StreamsHub {
let rv = match self.subscribe(&identifier, info_clone, sender).await {
Ok(statistic_data_sender) => {
if let Some(notifier) = &self.notifier {
notifier.on_play_notify(event_serialize_str).await;
notifier.on_play_notify(&message).await;
}

self.un_pub_sub_events
Expand All @@ -720,7 +714,7 @@ impl StreamsHub {
StreamHubEvent::UnSubscribe { identifier, info } => {
if self.unsubscribe(&identifier, info).is_ok() {
if let Some(notifier) = &self.notifier {
notifier.on_stop_notify(event_serialize_str).await;
notifier.on_stop_notify(&message).await;
}
}
}
Expand Down Expand Up @@ -999,7 +993,7 @@ impl StreamsHub {
None => {
return Err(StreamHubError {
value: StreamHubErrorValue::NoAppName,
})
});
}
}

Expand Down
122 changes: 122 additions & 0 deletions library/streamhub/src/notify/http.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
use crate::notify::Notifier;
use reqwest::Client;
use async_trait::async_trait;
use crate::define::{StreamHubEventMessage};

macro_rules! serialize_event {
($message:expr) => {{
let event_serialize_str = match serde_json::to_string(&$message) {
Ok(data) => {
log::info!("event data: {}", data);
data
}
Err(_) => String::from("empty body"),
};
event_serialize_str
}};
}


pub struct HttpNotifier {
request_client: Client,
on_publish_url: Option<String>,
on_unpublish_url: Option<String>,
on_play_url: Option<String>,
on_stop_url: Option<String>,
}

impl HttpNotifier {
pub fn new(
on_publish_url: Option<String>,
on_unpublish_url: Option<String>,
on_play_url: Option<String>,
on_stop_url: Option<String>,
) -> Self {
Self {
request_client: reqwest::Client::new(),
on_publish_url,
on_unpublish_url,
on_play_url,
on_stop_url,
}
}
}

#[async_trait]
impl Notifier for HttpNotifier {
async fn on_publish_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_publish_url) = &self.on_publish_url {
match self
.request_client
.post(on_publish_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_publish error: {}", err);
}
Ok(response) => {
log::info!("on_publish success: {:?}", response);
}
}
}
}

async fn on_unpublish_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_unpublish_url) = &self.on_unpublish_url {
match self
.request_client
.post(on_unpublish_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_unpublish error: {}", err);
}
Ok(response) => {
log::info!("on_unpublish success: {:?}", response);
}
}
}
}

async fn on_play_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_play_url) = &self.on_play_url {
match self
.request_client
.post(on_play_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_play error: {}", err);
}
Ok(response) => {
log::info!("on_play success: {:?}", response);
}
}
}
}

async fn on_stop_notify(&self, event: &StreamHubEventMessage) {
if let Some(on_stop_url) = &self.on_stop_url {
match self
.request_client
.post(on_stop_url)
.body(serialize_event!(event))
.send()
.await
{
Err(err) => {
log::error!("on_stop error: {}", err);
}
Ok(response) => {
log::info!("on_stop success: {:?}", response);
}
}
}
}
}
Loading
Loading