Skip to content

Commit

Permalink
fix next stop calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanArns committed Nov 29, 2024
1 parent ba938ea commit fbd2852
Showing 1 changed file with 30 additions and 19 deletions.
49 changes: 30 additions & 19 deletions rust-crates/event-engine/src/stream_processor/delay_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,31 +37,42 @@ impl ProcessingStep for DelayProcessor {
}) => {
self.number_of_stops.lock().await.insert(trip_id.clone(), stops.len());
let key = String::from("delays:") + &trip_id;
if let Ok(delays) = redis_get::<Vec<i32>>(&key).await {
let now = FixedOffset::west_opt(3600).unwrap().from_utc_datetime(&Utc::now().naive_utc());
for (i, stop) in stops.iter().enumerate() {
let mut parts = stop.arrival_time.split(":");
let mut h: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
let m: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
let s: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
let mut stop_time = FixedOffset::west_opt(3600).unwrap().from_utc_datetime(&Utc::now().naive_utc());
if h > 24 {
if now.hour() > 12 {
stop_time += TimeDelta::days(1);
} else {
stop_time += TimeDelta::days(-1);
}
let delays = redis_get::<Vec<i32>>(&key).await.unwrap_or_else(|_| vec![0; stops.len()]);
let now = FixedOffset::east_opt(3600).unwrap().from_utc_datetime(&Utc::now().naive_utc());
let mut stop_time = now.clone();
let mut reached_previous = true;
for (i, stop) in stops.iter().enumerate() {
let mut parts = stop.arrival_time.split(":");
let mut h: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
let m: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
let s: u32 = parts.next().map(|x| x.parse::<u32>().unwrap_or(0)).unwrap_or(0);
stop_time = now.clone();
if h > 24 {
if now.hour() > 12 {
stop_time = stop_time.checked_add_signed(TimeDelta::days(1)).unwrap_or_else(|| stop_time);
}
h = h % 24;
stop_time.with_hour(h);
stop_time.with_minute(m);
stop_time.with_second(s);
if stop_time + TimeDelta::seconds(delays[i] as i64) > now {
}
h = h % 24;
stop_time = stop_time.with_hour(h).unwrap_or_else(|| stop_time);
stop_time = stop_time.with_minute(m).unwrap_or_else(|| stop_time);
stop_time = stop_time.with_second(s).unwrap_or_else(|| stop_time);
stop_time = stop_time.with_nanosecond(0).unwrap_or_else(|| stop_time);
stop_time = stop_time.checked_add_signed(TimeDelta::seconds(delays[i] as i64)).unwrap_or_else(|| stop_time);
if now > stop_time {
reached_previous = true;
} else {
if reached_previous {
reached_previous = false;
*delay = Some(delays[i]);
*stop_seq = Some(stop.stop_sequence);
}
}
}
if now > stop_time {
// vehicle has reached the end of the trip
*delay = delays.last().map(|x| x.to_owned()).or(Some(0));
*stop_seq = stops.last().map(|x| x.stop_sequence + 1).or(Some(1));
}
true
},
Event::TripUpdate(updates) => {
Expand Down

0 comments on commit fbd2852

Please sign in to comment.