Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a notification endpoint to the json-delta protocol. #863

Merged
merged 7 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 135 additions & 198 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ rand = "0.8.1"
reqwest = { version = "0.11.0", default-features = false, features = ["blocking", "rustls-tls" ] }
ring = "0.16.12"
routecore = "0.3.1"
rpki = { version = "0.16.1", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rpki = { git = "https://github.com/NLnetLabs/rpki-rs.git", features = [ "repository", "rrdp", "rtr", "serde", "slurm" ] }
rustls-pemfile = "1"
serde = { version = "1.0.95", features = [ "derive" ] }
serde_json = "1.0.57"
Expand All @@ -55,9 +55,9 @@ syslog = "6"
[features]
default = [ "socks", "ui"]
arbitrary = [ "dep:arbitrary", "chrono/arbitrary", "rpki/arbitrary" ]
socks = [ "reqwest/socks" ]
rta = []
native-tls = [ "reqwest/native-tls", "tls" ]
rta = []
socks = [ "reqwest/socks" ]
tls = []
ui = [ "routinator-ui" ]

Expand Down
10 changes: 10 additions & 0 deletions doc/manual/source/manual-page.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,16 @@ The service only supports GET requests with the following paths:
provided session and serial. If *reset* is *true*, the *withdrawn*
member is not present.

/json-delta/notify, /json-delta/notify?session=session&serial=serial
Waits with a response until a new update is available and then returns
a JSON object with two members *session* and *serial* which contain the
session ID and serial number of the updated data set.

If the *session* and *serial* query parameters are provided, the JSON
object is returned immediately if the session ID and serial number of
partim marked this conversation as resolved.
Show resolved Hide resolved
the current data set differ from the provided values and only waits
for an update if they are identical.

In addition, the current set of VRPs is available for each output format at a
partim marked this conversation as resolved.
Show resolved Hide resolved
path with the same name as the output format. E.g., the CSV output is
available at ``/csv``.
Expand Down
101 changes: 78 additions & 23 deletions src/http/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ use futures::stream;
use hyper::{Body, Method, Request};
use rpki::rtr::Serial;
use rpki::rtr::payload::{Action, PayloadRef};
use rpki::rtr::server::PayloadDiff;
use rpki::rtr::server::{NotifySender, PayloadDiff};
use crate::payload::{
DeltaArcIter, PayloadDelta, PayloadSnapshot, SharedHistory, SnapshotArcIter
};
use crate::utils::fmt::WriteOrPanic;
use crate::utils::json::JsonBuilder;
use super::response::{ContentType, Response, ResponseBuilder};

//------------ handle_get ----------------------------------------------------
//------------ handle_get_or_head --------------------------------------------

pub fn handle_get_or_head(
req: &Request<Body>,
Expand Down Expand Up @@ -57,6 +58,81 @@ pub fn handle_get_or_head(
Some(handle_reset(history.session(), history.serial(), snapshot))
}

fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta)
.map(Result::<_, Infallible>::Ok)
)))
}

fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
SnapshotStream::new(session, to_serial, snapshot)
.map(Result::<_, Infallible>::Ok)
)))
}


//------------ handle_get_or_head --------------------------------------------
partim marked this conversation as resolved.
Show resolved Hide resolved

pub async fn handle_notify_get_or_head(
req: &Request<Body>,
history: &SharedHistory,
notify: &NotifySender,
) -> Option<Response> {
if req.uri().path() != "/json-delta/notify" {
return None
}

let wait = match need_wait(req, history) {
Ok(wait) => wait,
Err(resp) => return Some(resp),
};

if wait {
notify.subscribe().recv().await;
}

if *req.method() == Method::HEAD {
Some(
ResponseBuilder::ok().content_type(ContentType::JSON).empty()
)
}
else {
let (session, serial) = history.read().session_and_serial();
Some(
ResponseBuilder::ok().content_type(ContentType::JSON).body(
JsonBuilder::build(|json| {
json.member_raw("session", session);
json.member_raw("serial", serial);
})
)
)
}
}

fn need_wait(
req: &Request<Body>,
history: &SharedHistory,
) -> Result<bool, Response> {
let version = match version_from_query(req.uri().query())? {
Some(version) => version,
None => return Ok(true),
};

Ok(history.read().session_and_serial() == version)
}


//------------ Helpers -------------------------------------------------------

fn version_from_query(
query: Option<&str>
) -> Result<Option<(u64, Serial)>, Response> {
Expand Down Expand Up @@ -95,27 +171,6 @@ fn version_from_query(
}
}

fn handle_delta(
session: u64, from_serial: Serial, to_serial: Serial,
delta: Arc<PayloadDelta>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
DeltaStream::new(session, from_serial, to_serial, delta)
.map(Result::<_, Infallible>::Ok)
)))
}

fn handle_reset(
session: u64, to_serial: Serial, snapshot: Arc<PayloadSnapshot>
) -> Response {
ResponseBuilder::ok().content_type(ContentType::JSON)
.body(Body::wrap_stream(stream::iter(
SnapshotStream::new(session, to_serial, snapshot)
.map(Result::<_, Infallible>::Ok)
)))
}


//------------ DeltaStream ---------------------------------------------------

Expand Down
13 changes: 11 additions & 2 deletions src/http/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use hyper::Server;
use hyper::server::accept::Accept;
use hyper::service::{make_service_fn, service_fn};
use log::error;
use rpki::rtr::server::NotifySender;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpListener;
use tokio_rustls::TlsAcceptor;
Expand All @@ -34,6 +35,7 @@ pub fn http_listener(
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
config: &Config,
notify: NotifySender,
) -> Result<impl Future<Output = ()>, ExitError> {
let metrics = Arc::new(HttpServerMetrics::default());

Expand All @@ -51,7 +53,7 @@ pub fn http_listener(
);
}
}
Ok(_http_listener(origins, metrics, rtr_metrics, log, listeners))
Ok(_http_listener(origins, metrics, rtr_metrics, log, notify, listeners))
}

fn create_tls_config(
Expand Down Expand Up @@ -79,6 +81,7 @@ async fn _http_listener(
metrics: Arc<HttpServerMetrics>,
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
notify: NotifySender,
listeners: Vec<(SocketAddr, Option<Arc<tls::ServerConfig>>, StdListener)>,
) {
// If there are no listeners, just never return.
Expand All @@ -93,6 +96,7 @@ async fn _http_listener(
addr, tls_config, listener,
origins.clone(), metrics.clone(),
rtr_metrics.clone(), log.clone(),
notify.clone(),
))
})
).await;
Expand All @@ -104,6 +108,7 @@ async fn _http_listener(
/// listener, in which case it will print an error and resolve the error case.
/// It will listen bind a Hyper server onto `addr` and produce any data
/// served from `origins`.
#[allow(clippy::too_many_arguments)]
async fn single_http_listener(
addr: SocketAddr,
tls_config: Option<Arc<tls::ServerConfig>>,
Expand All @@ -112,22 +117,26 @@ async fn single_http_listener(
metrics: Arc<HttpServerMetrics>,
rtr_metrics: SharedRtrServerMetrics,
log: Option<Arc<LogOutput>>,
notify: NotifySender,
) {
let make_service = make_service_fn(|_conn| {
let origins = origins.clone();
let metrics = metrics.clone();
let rtr_metrics = rtr_metrics.clone();
let log = log.clone();
let notify = notify.clone();
async move {
Ok::<_, Infallible>(service_fn(move |req| {
let origins = origins.clone();
let metrics = metrics.clone();
let rtr_metrics = rtr_metrics.clone();
let log = log.clone();
let notify = notify.clone();
async move {
Ok::<_, Infallible>(handle_request(
req, &origins, &metrics, &rtr_metrics,
log.as_ref().map(|x| x.as_ref())
log.as_ref().map(|x| x.as_ref()),
&notify,
).await.into_hyper())
}
}))
Expand Down
7 changes: 7 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod validity;
//------------ handle_request ------------------------------------------------

use hyper::{Body, Method, Request};
use rpki::rtr::server::NotifySender;
use crate::metrics::{HttpServerMetrics, SharedRtrServerMetrics};
use crate::payload::SharedHistory;
use crate::process::LogOutput;
Expand All @@ -36,6 +37,7 @@ async fn handle_request(
metrics: &HttpServerMetrics,
rtr_metrics: &SharedRtrServerMetrics,
log: Option<&LogOutput>,
notify: &NotifySender,
) -> Response {
metrics.inc_requests();
if *req.method() != Method::GET && *req.method() != Method::HEAD {
Expand All @@ -45,6 +47,11 @@ async fn handle_request(
if let Some(response) = payload::handle_get_or_head(&req, origins) {
return response
}
if let Some(response) = delta::handle_notify_get_or_head(
&req, origins, notify,
).await {
return response
}
if let Some(response) = delta::handle_get_or_head(&req, origins) {
return response
}
Expand Down
8 changes: 5 additions & 3 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,14 @@ impl Server {
);

let history = SharedHistory::from_config(process.config());
let (mut notify, rtr) = rtr_listener(
let mut notify = NotifySender::new();
let rtr = rtr_listener(
history.clone(), rtr_metrics.clone(), process.config(),
process.get_listen_fd()?
notify.clone(), process.get_listen_fd()?
)?;
let http = http_listener(
history.clone(), rtr_metrics, log.clone(), process.config()
history.clone(), rtr_metrics, log.clone(), process.config(),
notify.clone(),
)?;

process.drop_privileges()?;
Expand Down
5 changes: 5 additions & 0 deletions src/payload/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ impl PayloadHistory {
self.session
}

/// Returns the session and serial number of the current data set.
pub fn session_and_serial(&self) -> (u64, Serial) {
(self.session(), self.serial())
}

/// Returns the RTR version of the session ID.
///
/// This is the last 16 bits of the full session ID.
Expand Down
9 changes: 4 additions & 5 deletions src/rtr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ pub fn rtr_listener(
history: SharedHistory,
metrics: SharedRtrServerMetrics,
config: &Config,
sender: NotifySender,
extra_listener: Option<StdListener>,
) -> Result<(NotifySender, impl Future<Output = ()>), ExitError> {
let sender = NotifySender::new();

) -> Result<impl Future<Output = ()>, ExitError> {
// Binding needs to have happened before dropping privileges
// during detach. So we do this here synchronously.
let mut listeners = Vec::new();
Expand All @@ -54,9 +53,9 @@ pub fn rtr_listener(
));
}
}
Ok((sender.clone(), _rtr_listener(
Ok(_rtr_listener(
history, metrics, sender, listeners, config.rtr_tcp_keepalive,
)))
))
}

fn create_tls_config(
Expand Down