Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http source): Add custom response header configuration #20811

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Allows configuring custom headers to be added to responses from the http_server when using the HttpSource.
ChrisCanCompute marked this conversation as resolved.
Show resolved Hide resolved

authors: chriscancompute
104 changes: 104 additions & 0 deletions src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ pub struct SimpleHttpConfig {
#[configurable(metadata(docs::examples = "*"))]
headers: Vec<String>,

/// Custom response headers to be added to the HTTP response
#[serde(default)]
#[configurable(metadata(docs::examples = "example_custom_response_headers()"))]
#[configurable(metadata(
docs::additional_props_description = "A custom response header key-value pair"
))]
custom_response_headers: HashMap<String, String>,

/// A list of URL query parameters to include in the log event.
///
/// These override any values included in the body with conflicting names.
Expand Down Expand Up @@ -170,6 +178,13 @@ pub struct SimpleHttpConfig {
keepalive: KeepaliveConfig,
}

fn example_custom_response_headers() -> HashMap<String, String> {
HashMap::<String, String>::from_iter([(
"Access-Control-Allow-Origin".to_string(),
"my-cool-server".to_string(),
)])
}

impl SimpleHttpConfig {
/// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
Expand Down Expand Up @@ -265,6 +280,7 @@ impl Default for SimpleHttpConfig {
address: "0.0.0.0:8080".parse().unwrap(),
encoding: None,
headers: Vec::new(),
custom_response_headers: HashMap::new(),
query_parameters: Vec::new(),
tls: None,
auth: None,
Expand Down Expand Up @@ -355,6 +371,7 @@ impl SourceConfig for SimpleHttpConfig {

let source = SimpleHttpSource {
headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
custom_response_headers: self.custom_response_headers.clone(),
query_parameters: remove_duplicates(self.query_parameters.clone(), "query_parameters"),
path_key: self.path_key.clone(),
host_key: self.host_key.clone(),
Expand Down Expand Up @@ -403,6 +420,7 @@ impl SourceConfig for SimpleHttpConfig {
#[derive(Clone)]
struct SimpleHttpSource {
headers: Vec<HttpConfigParamKind>,
custom_response_headers: HashMap<String, String>,
query_parameters: Vec<String>,
path_key: OptionalValuePath,
host_key: OptionalValuePath,
Expand Down Expand Up @@ -544,10 +562,23 @@ impl HttpSource for SimpleHttpSource {
fn enable_source_ip(&self) -> bool {
self.host_key.path.is_some()
}

/// Enriches the warp::reply::Reply with custom headers
///
/// This method adds the custom headers specified in the configuration
/// to the HTTP response.
fn enrich_reply<T: warp::Reply + 'static>(&self, reply: T) -> Box<dyn warp::Reply> {
let mut boxed_reply: Box<dyn warp::Reply> = Box::new(reply);
for (key, value) in &self.custom_response_headers {
boxed_reply = Box::new(warp::reply::with_header(boxed_reply, key, value));
}
boxed_reply
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::str::FromStr;
use std::{io::Write, net::SocketAddr};

Expand Down Expand Up @@ -591,6 +622,7 @@ mod tests {
#[allow(clippy::too_many_arguments)]
async fn source<'a>(
headers: Vec<String>,
custom_response_headers: HashMap<String, String>,
query_parameters: Vec<String>,
path_key: &'a str,
host_key: &'a str,
Expand Down Expand Up @@ -619,6 +651,7 @@ mod tests {
SimpleHttpConfig {
address,
headers,
custom_response_headers,
encoding: None,
query_parameters,
response_code,
Expand Down Expand Up @@ -730,6 +763,7 @@ mod tests {

let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -776,6 +810,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -815,6 +850,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -848,6 +884,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -886,6 +923,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -931,6 +969,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -982,6 +1021,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1068,6 +1108,7 @@ mod tests {
"X-*".to_string(),
"AbsentHeader".to_string(),
],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1112,6 +1153,7 @@ mod tests {

let (rx, addr) = source(
vec!["*".to_string()],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1145,11 +1187,65 @@ mod tests {
}
}

#[tokio::test]
async fn http_custom_response_headers() {
async fn send(address: SocketAddr, body: &str) -> reqwest::Response {
reqwest::Client::new()
.post(&format!("http://{}/", address))
.body(body.to_owned())
.send()
.await
.unwrap()
}

assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let mut custom_headers: HashMap<String, String> = HashMap::new();
custom_headers.insert(
"Access-Control-Allow-Origin".to_string(),
"example.com".to_string(),
);

let (rx, addr) = source(
vec!["*".to_string()],
custom_headers,
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;

spawn_collect_n(
async move {
let response = send(addr, "{\"key1\":\"value1\"}").await;
let response_headers = response.headers();
assert!(response_headers.contains_key("Access-Control-Allow-Origin"));
assert_eq!(
response_headers["Access-Control-Allow-Origin"],
"example.com"
);
},
rx,
1,
)
.await
})
.await;
}

#[tokio::test]
async fn http_query() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![
"source".to_string(),
"region".to_string(),
Expand Down Expand Up @@ -1206,6 +1302,7 @@ mod tests {

let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1237,6 +1334,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"vector_http_path",
"vector_remote_ip",
Expand Down Expand Up @@ -1278,6 +1376,7 @@ mod tests {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"vector_http_path",
"vector_remote_ip",
Expand Down Expand Up @@ -1339,6 +1438,7 @@ mod tests {
components::init_test();
let (_rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"vector_http_path",
"vector_remote_ip",
Expand All @@ -1364,6 +1464,7 @@ mod tests {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1398,6 +1499,7 @@ mod tests {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1429,6 +1531,7 @@ mod tests {
let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down Expand Up @@ -1462,6 +1565,7 @@ mod tests {
components::init_test();
let (_rx, addr) = source(
vec![],
HashMap::new(),
vec![],
"http_path",
"remote_ip",
Expand Down
39 changes: 28 additions & 11 deletions src/sources/util/http/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
decode(encoding_header, body)
}

// This function can be defined to enrich `warp::Reply`s.
fn enrich_reply<T: warp::Reply + 'static>(&self, reply: T) -> Box<dyn warp::Reply> {
Box::new(reply)
}

#[allow(clippy::too_many_arguments)]
fn run(
self,
Expand All @@ -90,6 +95,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
let path = path.to_owned();
let acknowledgements = cx.do_acknowledgements(acknowledgements);
let enable_source_ip = self.enable_source_ip();
let self_clone = self.clone();

Ok(Box::pin(async move {
let mut filter: BoxedFilter<()> = match method {
Expand Down Expand Up @@ -170,21 +176,32 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
events
});

handle_request(events, acknowledgements, response_code, cx.out.clone())
handle_request(events, acknowledgements, response_code, cx.out.clone()).map(
{
let self_clone = self.clone();
move |result| {
result.map(move |reply| self_clone.enrich_reply(reply))
}
},
)
},
);

let ping = warp::get().and(warp::path("ping")).map(|| "pong");
let routes = svc.or(ping).recover(|r: Rejection| async move {
if let Some(e_msg) = r.find::<ErrorMessage>() {
let json = warp::reply::json(e_msg);
Ok(warp::reply::with_status(json, e_msg.status_code()))
} else {
//other internal error - will return 500 internal server error
emit!(HttpInternalError {
message: &format!("Internal error: {:?}", r)
});
Err(r)
let routes = svc.or(ping).recover(move |r: Rejection| {
let self_clone = self_clone.clone();
async move {
if let Some(e_msg) = r.find::<ErrorMessage>() {
let json = warp::reply::json(e_msg);
Ok(self_clone
.enrich_reply(warp::reply::with_status(json, e_msg.status_code())))
} else {
//other internal error - will return 500 internal server error
emit!(HttpInternalError {
message: &format!("Internal error: {:?}", r)
});
Err(r)
}
}
});

Expand Down
14 changes: 14 additions & 0 deletions website/cue/reference/components/sources/base/http.cue
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ base: components: sources: http: configuration: {
}
}
}
custom_response_headers: {
description: "Custom response headers to be added to the HTTP response"
required: false
type: object: {
examples: [{
"Access-Control-Allow-Origin": "my-cool-server"
}]
options: "*": {
description: "A custom response header key-value pair"
required: true
type: string: {}
}
}
}
decoding: {
description: "Configures how events are decoded from raw bytes."
required: false
Expand Down
Loading
Loading