Skip to content

Commit

Permalink
fix: separate centrifugo client handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
cailloumajor committed May 17, 2023
1 parent 30c8aeb commit e1d3f49
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 78 deletions.
49 changes: 30 additions & 19 deletions src/centrifugo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use clap::Args;
use reqwest::{header, Client as HttpClient};
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, info_span, instrument, Instrument};
use url::Url;

use crate::model::CentrifugoClientRequest;
use crate::model::{HealthChannel, TagsUpdateChannel, UpdateEvent};

#[derive(Args)]
#[group(skip)]
Expand Down Expand Up @@ -89,34 +89,45 @@ impl Client {
Ok(())
}

pub(crate) fn handle_requests(
&self,
) -> (mpsc::Sender<CentrifugoClientRequest>, JoinHandle<()>) {
let (tx, mut rx) = mpsc::channel(2);
pub(crate) fn handle_tags_update(&self) -> (TagsUpdateChannel, JoinHandle<()>) {
let (tx, mut rx) = mpsc::channel::<UpdateEvent>(10);
let cloned_self = self.clone();

let task = tokio::spawn(
async move {
info!(status = "started");

while let Some(request) = rx.recv().await {
match request {
CentrifugoClientRequest::TagsUpdate(event) => {
let (channel, data) = event.into_centrifugo();
let _ = cloned_self.publish(&channel, data).await;
}
CentrifugoClientRequest::Health(outcome_tx) => {
let outcome = cloned_self.publish("_", ()).await.is_ok();
if outcome_tx.send(outcome).is_err() {
error!(kind = "outcome channel sending");
}
}
while let Some(update_event) = rx.recv().await {
let (channel, data) = update_event.into_centrifugo();
let _ = cloned_self.publish(&channel, data).await;
}

info!(status = "terminating");
}
.instrument(info_span!("centrifugo_tags_update_handler")),
);

(tx, task)
}

pub(crate) fn handle_health(&self) -> (HealthChannel, JoinHandle<()>) {
let (tx, mut rx) = mpsc::channel::<oneshot::Sender<bool>>(1);
let cloned_self = self.clone();

let task = tokio::spawn(
async move {
info!(status = "started");

while let Some(response_tx) = rx.recv().await {
let outcome = cloned_self.publish("_", ()).await.is_ok();
if response_tx.send(outcome).is_err() {
error!(kind = "response channel sending");
}
}

info!(status = "terminating");
}
.instrument(info_span!("centrifugo_handle_requests")),
.instrument(info_span!("centrifugo_health_handler")),
);

(tx, task)
Expand Down
18 changes: 7 additions & 11 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ use futures_util::StreamExt;
use mongodb::bson::{doc, Document};
use mongodb::options::ClientOptions;
use mongodb::{Client, Collection};
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, info_span, instrument, Instrument};

use crate::model::{CentrifugoClientRequest, CurrentDataResponse, MongoDBData, UpdateEvent};
use crate::model::{
CurrentDataChannel, CurrentDataResponse, MongoDBData, TagsUpdateChannel, UpdateEvent,
};

const APP_NAME: &str = concat!(env!("CARGO_PKG_NAME"), " (", env!("CARGO_PKG_VERSION"), ")");

Expand Down Expand Up @@ -41,7 +43,7 @@ impl MongoDBCollection {

pub(crate) async fn handle_change_stream(
&self,
centrifugo_request: Sender<CentrifugoClientRequest>,
tags_update_channel: TagsUpdateChannel,
abort_reg: AbortRegistration,
stopper: oneshot::Sender<()>,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
Expand All @@ -68,8 +70,7 @@ impl MongoDBCollection {
return Err(anyhow!("broken change stream"));
}
};
let request = CentrifugoClientRequest::TagsUpdate(event);
if let Err(err) = centrifugo_request.try_send(request) {
if let Err(err) = tags_update_channel.try_send(event) {
error!(kind = "request channel sending", %err);
}
}
Expand All @@ -83,12 +84,7 @@ impl MongoDBCollection {
Ok(handle)
}

pub(crate) fn handle_current_data(
&self,
) -> (
mpsc::Sender<(String, oneshot::Sender<CurrentDataResponse>)>,
JoinHandle<()>,
) {
pub(crate) fn handle_current_data(&self) -> (CurrentDataChannel, JoinHandle<()>) {
let collection = self.0.clone_with_type::<MongoDBData>();
let (tx, mut rx) = mpsc::channel::<(String, oneshot::Sender<CurrentDataResponse>)>(1);

Expand Down
51 changes: 17 additions & 34 deletions src/http_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ use axum::http::StatusCode;
use axum::{routing, Json, Router};
use serde::Deserialize;
use serde_json::{json, Value};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::oneshot;
use tracing::{debug, error, instrument};

use crate::model::{CentrifugoClientRequest, CurrentDataResponse, EnsureObject};
use crate::model::{CurrentDataChannel, EnsureObject, HealthChannel};

type StatusWithText = (StatusCode, &'static str);

Expand All @@ -16,8 +16,8 @@ const INTERNAL_ERROR: StatusWithText = (StatusCode::INTERNAL_SERVER_ERROR, "inte
#[derive(Clone)]
struct AppState {
namespace_prefix: ArcStr,
centrifugo_request: mpsc::Sender<CentrifugoClientRequest>,
current_data: mpsc::Sender<(String, oneshot::Sender<CurrentDataResponse>)>,
health_channel: HealthChannel,
current_data: CurrentDataChannel,
}

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -61,8 +61,8 @@ impl From<CentrifugoProxyError> for Json<Value> {

pub(crate) fn app(
mongodb_namespace: &str,
centrifugo_request: mpsc::Sender<CentrifugoClientRequest>,
current_data: mpsc::Sender<(String, oneshot::Sender<CurrentDataResponse>)>,
health_channel: HealthChannel,
current_data: CurrentDataChannel,
) -> Router {
let namespace_prefix = ArcStr::from(mongodb_namespace.to_owned() + ":");
Router::new()
Expand All @@ -73,16 +73,15 @@ pub(crate) fn app(
)
.with_state(AppState {
namespace_prefix,
centrifugo_request,
health_channel,
current_data,
})
}

#[instrument(name = "health_api_handler", skip_all)]
async fn health_handler(State(state): State<AppState>) -> Result<StatusCode, StatusWithText> {
let (tx, rx) = oneshot::channel();
let request = CentrifugoClientRequest::Health(tx);
state.centrifugo_request.try_send(request).map_err(|err| {
state.health_channel.try_send(tx).map_err(|err| {
error!(kind = "request channel sending", %err);
INTERNAL_ERROR
})?;
Expand Down Expand Up @@ -145,19 +144,17 @@ async fn centrifugo_subscribe_handler(
mod tests {
use axum::body::Body;
use axum::http::Request;
use tokio::sync::mpsc;
use tower::ServiceExt;

use super::*;

mod health_handler {

use super::*;

fn testing_fixture(
centrifugo_request: mpsc::Sender<CentrifugoClientRequest>,
) -> (Router, Request<Body>) {
fn testing_fixture(health_channel: HealthChannel) -> (Router, Request<Body>) {
let (current_data, _) = mpsc::channel(1);
let app = app(Default::default(), centrifugo_request, current_data);
let app = app(Default::default(), health_channel, current_data);
let req = Request::builder()
.uri("/health")
.body(Body::empty())
Expand All @@ -176,47 +173,35 @@ mod tests {
#[tokio::test]
async fn outcome_channel_receiving_error() {
let (tx, mut rx) = mpsc::channel(1);
let (app, req) = testing_fixture(tx);
tokio::spawn(async move {
// Consume and drop the response channel
let _ = rx.recv().await.expect("channel has been closed");
});
let (app, req) = testing_fixture(tx);
let res = app.oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
}

#[tokio::test]
async fn health_error() {
let (tx, mut rx) = mpsc::channel(1);
let (app, req) = testing_fixture(tx);
tokio::spawn(async move {
let CentrifugoClientRequest::Health(response_tx) = rx
.recv()
.await
.expect("channel has been closed")
else {
panic!("unexpected request");
};
let response_tx = rx.recv().await.expect("channel has been closed");
response_tx.send(false).expect("error sending response");
});
let (app, req) = testing_fixture(tx);
let res = app.oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::INTERNAL_SERVER_ERROR);
}

#[tokio::test]
async fn success() {
let (tx, mut rx) = mpsc::channel(1);
let (app, req) = testing_fixture(tx);
tokio::spawn(async move {
let CentrifugoClientRequest::Health(response_tx) = rx
.recv()
.await
.expect("channel has been closed")
else {
panic!("unexpected request");
};
let response_tx = rx.recv().await.expect("channel has been closed");
response_tx.send(true).expect("error sending response");
});
let (app, req) = testing_fixture(tx);
let res = app.oneshot(req).await.unwrap();
assert_eq!(res.status(), StatusCode::NO_CONTENT);
}
Expand All @@ -229,9 +214,7 @@ mod tests {

use super::*;

fn testing_app(
current_data: mpsc::Sender<(String, oneshot::Sender<CurrentDataResponse>)>,
) -> Router {
fn testing_app(current_data: CurrentDataChannel) -> Router {
let (centrifugo_request, _) = mpsc::channel(1);
app("ns", centrifugo_request, current_data)
}
Expand Down
14 changes: 6 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,18 @@ async fn main() -> anyhow::Result<()> {
let signals_handle = signals.handle();

let centrifugo_client = centrifugo::Client::new(&args.centrifugo);
let (centrifugo_requests_tx, centrifugo_client_task) = centrifugo_client.handle_requests();
let (tags_update_tx, tags_update_task) = centrifugo_client.handle_tags_update();
let (health_tx, health_task) = centrifugo_client.handle_health();

let (abort_handle, abort_reg) = AbortHandle::new_pair();

let mongodb_collection = db::create_collection(&args.mongodb).await?;
let change_stream_task = mongodb_collection
.handle_change_stream(centrifugo_requests_tx.clone(), abort_reg, api_stopper_tx)
.handle_change_stream(tags_update_tx, abort_reg, api_stopper_tx)
.await?;
let (current_data_tx, current_data_task) = mongodb_collection.handle_current_data();

let app = http_api::app(
&mongodb_collection.namespace(),
centrifugo_requests_tx,
current_data_tx,
);
let app = http_api::app(&mongodb_collection.namespace(), health_tx, current_data_tx);
async move {
info!(addr = %args.common.listen_address, msg = "start listening");
if let Err(err) = Server::bind(&args.common.listen_address)
Expand All @@ -104,7 +101,8 @@ async fn main() -> anyhow::Result<()> {

let (change_stream_task_result, ..) = tokio::try_join!(
change_stream_task,
centrifugo_client_task,
tags_update_task,
health_task,
current_data_task
)
.context("error joining tasks")?;
Expand Down
11 changes: 5 additions & 6 deletions src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ use mongodb::bson::{Bson, DateTime};
use mongodb::Namespace;
use serde::ser::{self, SerializeMap, Serializer};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tracing::{error, info_span};

pub(crate) type TagsUpdateChannel = mpsc::Sender<UpdateEvent>;
pub(crate) type HealthChannel = mpsc::Sender<oneshot::Sender<bool>>;
pub(crate) type CurrentDataChannel = mpsc::Sender<(String, oneshot::Sender<CurrentDataResponse>)>;

#[derive(Deserialize)]
#[serde(remote = "Namespace")]
struct UpdateNamespace {
Expand Down Expand Up @@ -59,11 +63,6 @@ impl UpdateEvent {
}
}

pub(crate) enum CentrifugoClientRequest {
TagsUpdate(UpdateEvent),
Health(oneshot::Sender<bool>),
}

pub(crate) type CurrentDataResponse = Result<Option<MongoDBData>, ()>;

#[derive(Clone, Debug, Deserialize)]
Expand Down

0 comments on commit e1d3f49

Please sign in to comment.