Skip to content

Commit

Permalink
SSE::Connected event to return status an headers
Browse files Browse the repository at this point in the history
  • Loading branch information
spolu committed Jun 10, 2024
1 parent d34896e commit a04669b
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 6 deletions.
17 changes: 14 additions & 3 deletions contract-tests/src/bin/sse-test-api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,25 @@ struct Config {
#[derive(Serialize, Debug)]
#[serde(tag = "kind", rename_all = "camelCase")]
enum EventType {
Event { event: Event },
Comment { comment: String },
Error { error: String },
Connected {
status: u16,
headers: HashMap<String, String>,
},
Event {
event: Event,
},
Comment {
comment: String,
},
Error {
error: String,
},
}

impl From<es::SSE> for EventType {
fn from(event: es::SSE) -> Self {
match event {
es::SSE::Connected((status, headers)) => Self::Connected { status, headers },
es::SSE::Event(evt) => Self::Event {
event: Event {
event_type: evt.event_type,
Expand Down
3 changes: 3 additions & 0 deletions eventsource-client/examples/tail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ fn tail_events(client: impl es::Client) -> impl Stream<Item = Result<(), ()>> {
client
.stream()
.map_ok(|event| match event {
es::SSE::Connected((status, _)) => {
println!("got connected: \nstatus={}", status)
}
es::SSE::Event(ev) => {
println!("got an event: {}\n{}", ev.event_type, ev.data)
}
Expand Down
18 changes: 17 additions & 1 deletion eventsource-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use log::{debug, info, trace, warn};
use pin_project::pin_project;
use std::{
boxed,
collections::HashMap,
fmt::{self, Debug, Display, Formatter},
future::Future,
io::ErrorKind,
Expand Down Expand Up @@ -393,6 +394,7 @@ where
let this = self.as_mut().project();
if let Some(event) = this.event_parser.get_event() {
return match event {
SSE::Connected(_) => Poll::Ready(Some(Ok(event))),
SSE::Event(ref evt) => {
*this.last_event_id = evt.id.clone();

Expand Down Expand Up @@ -438,11 +440,25 @@ where
if resp.status().is_success() {
self.as_mut().project().retry_strategy.reset(Instant::now());
self.as_mut().reset_redirects();

let headers = resp.headers();
let mut map = HashMap::new();
for (key, value) in headers.iter() {
let key = key.to_string();
let value = match value.to_str() {
Ok(value) => value.to_string(),
Err(_) => String::from(""),
};
map.insert(key, value);
}
let status = resp.status().as_u16();

self.as_mut()
.project()
.state
.set(State::Connected(resp.into_body()));
continue;

return Poll::Ready(Some(Ok(SSE::Connected((status, map)))));
}

if resp.status() == 301 || resp.status() == 307 {
Expand Down
7 changes: 6 additions & 1 deletion eventsource-client/src/event_parser.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
use std::{
collections::{HashMap, VecDeque},
convert::TryFrom,
str::from_utf8,
};

use hyper::body::Bytes;
use log::{debug, log_enabled, trace};
Expand Down Expand Up @@ -32,6 +36,7 @@ impl EventData {

#[derive(Debug, Eq, PartialEq)]
pub enum SSE {
Connected((u16, HashMap<String, String>)),
Event(Event),
Comment(String),
}
Expand Down
3 changes: 2 additions & 1 deletion eventsource-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
//! let mut stream = Box::pin(client.stream())
//! .map_ok(|event| match event {
//! SSE::Comment(comment) => println!("got a comment event: {:?}", comment),
//! SSE::Event(evt) => println!("got an event: {}", evt.event_type)
//! SSE::Event(evt) => println!("got an event: {}", evt.event_type),
//! SSE::Connected(_) => println!("got connected")
//! })
//! .map_err(|e| println!("error streaming events: {:?}", e));
//! # while let Ok(Some(_)) = stream.try_next().await {}
Expand Down

0 comments on commit a04669b

Please sign in to comment.