From 1fdb5a6615945cce831d2a39055c4aa83525cb6c Mon Sep 17 00:00:00 2001 From: abyrd Date: Thu, 21 Jun 2012 12:25:11 +0200 Subject: [PATCH] Streaming stoptime updater for Dutch KV8 CTX over ZeroMQ --- opentripplanner-updater/pom.xml | 6 +- .../stoptime/GTFSZMQUpdateStreamer.java | 5 + .../stoptime/KV8ZMQUpdateStreamer.java | 111 +++++++++++++ .../updater/stoptime/StoptimeUpdater.java | 84 ++++++++++ .../updater/stoptime/StoptimeUpdaterOld.java | 147 ++++++++++++++++++ .../updater/stoptime/UpdateStreamer.java | 9 ++ 6 files changed, 361 insertions(+), 1 deletion(-) create mode 100644 opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/GTFSZMQUpdateStreamer.java create mode 100644 opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/KV8ZMQUpdateStreamer.java create mode 100644 opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdater.java create mode 100644 opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdaterOld.java create mode 100644 opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/UpdateStreamer.java diff --git a/opentripplanner-updater/pom.xml b/opentripplanner-updater/pom.xml index 1571acb65bb..974ffb9aabe 100644 --- a/opentripplanner-updater/pom.xml +++ b/opentripplanner-updater/pom.xml @@ -25,6 +25,11 @@ + + org.zeromq + jzmq + 1.0.0 + ${project.groupId} opentripplanner-routing @@ -38,7 +43,6 @@ org.slf4j slf4j-log4j12 - 1.5.8 junit diff --git a/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/GTFSZMQUpdateStreamer.java b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/GTFSZMQUpdateStreamer.java new file mode 100644 index 00000000000..9833e82f266 --- /dev/null +++ b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/GTFSZMQUpdateStreamer.java @@ -0,0 +1,5 @@ +package org.opentripplanner.updater.stoptime; + +public class GTFSZMQUpdateStreamer { + +} diff --git a/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/KV8ZMQUpdateStreamer.java b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/KV8ZMQUpdateStreamer.java new file mode 100644 index 00000000000..84da0e1e4f0 --- /dev/null +++ b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/KV8ZMQUpdateStreamer.java @@ -0,0 +1,111 @@ +package org.opentripplanner.updater.stoptime; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.zip.GZIPInputStream; + +import org.onebusaway.gtfs.model.AgencyAndId; +import org.opentripplanner.common.CTX; +import org.opentripplanner.routing.core.ServiceDay; +import org.opentripplanner.routing.trippattern.Update; +import org.opentripplanner.routing.trippattern.UpdateList; +import org.zeromq.ZFrame; +import org.zeromq.ZMQ; +import org.zeromq.ZMsg; + +/** StoptimeUpdateStreamer for CTX-encoded Dutch KV8 realtime updates over ZeroMQ */ +public class KV8ZMQUpdateStreamer implements UpdateStreamer { + + private ZMQ.Context context; + private ZMQ.Socket subscriber; + private int count = 0; + private String defaultAgencyId = ""; + private String address = "tcp://node01.post.openov.nl:7817"; + private static String feed = "/GOVI/KV8"; + + public KV8ZMQUpdateStreamer() { + context = ZMQ.context(1); + subscriber = context.socket(ZMQ.SUB); + subscriber.connect(address); + subscriber.subscribe(feed.getBytes()); + } + + public UpdateList getUpdates() { + ZMsg msg = ZMsg.recvMsg(subscriber); + try { + Iterator msgs = msg.iterator(); + msgs.next(); + ArrayList receivedMsgs = new ArrayList(); + while (msgs.hasNext()) { + for (byte b : msgs.next().getData()) { + receivedMsgs.add(b); + } + } + byte[] fullMsg = new byte[receivedMsgs.size()]; + for (int i = 0; i < fullMsg.length; i++) { + fullMsg[i] = receivedMsgs.get(i); + } + InputStream gzipped = new ByteArrayInputStream(fullMsg); + InputStream in = new GZIPInputStream(gzipped); + StringBuffer out = new StringBuffer(); + byte[] b = new byte[4096]; + for (int n; (n = in.read(b)) != -1;) { + out.append(new String(b, 0, n)); + } + return parseCTX(out.toString()); + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } + + public UpdateList parseCTX(String ctxString) { + System.out.println("CTX MSG " + count++); + CTX ctx = new CTX(ctxString); + UpdateList ret = new UpdateList(null); // indicate that updates may have mixed trip IDs + for (int i = 0; i < ctx.rows.size(); i++) { + HashMap row = ctx.rows.get(i); + int arrival = secondsSinceMidnight(row.get("ExpectedArrivalTime")); + int departure = secondsSinceMidnight(row.get("ExpectedDepartureTime")); + Update u = new Update( + kv7TripId(row), + row.get("UserStopCode"), + Integer.parseInt(row.get("UserStopOrderNumber")), + arrival, departure); + ret.addUpdate(u); + } + return ret; + } + + /** no good for DST */ + private int secondsSinceMidnight(String hhmmss) { + String[] time = hhmmss.split(":"); + int hours = Integer.parseInt(time[0]); + int minutes = Integer.parseInt(time[1]); + int seconds = Integer.parseInt(time[2]); + return (hours * 60 + minutes) * 60 + seconds; + } + + /** + * convert KV7 fields into a GTFS trip_id + * trip_ids must be data set unique in GTFS, which is why we use the DataOwnerCode (~=agency_id) + * twice, in the trip_id itself and the enclosing AgencyAndId + */ + public AgencyAndId kv7TripId (HashMap row) { + String tripId = String.format("%s_%s_%s_%s_%s", + row.get("DataOwnerCode"), + row.get("LinePlanningNumber"), + row.get("LocalServiceLevelCode"), + row.get("JourneyNumber"), + row.get("FortifyOrderNumber")); + return new AgencyAndId(row.get("DataOwnerCode"), tripId); + } + + public void setAddress(String address) { + this.address = address; + } + +} diff --git a/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdater.java b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdater.java new file mode 100644 index 00000000000..82f1ff6ef7d --- /dev/null +++ b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdater.java @@ -0,0 +1,84 @@ +package org.opentripplanner.updater.stoptime; + +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.onebusaway.gtfs.model.AgencyAndId; +import org.onebusaway.gtfs.model.Trip; +import org.opentripplanner.routing.edgetype.PatternBoard; +import org.opentripplanner.routing.graph.Graph; +import org.opentripplanner.routing.services.GraphService; +import org.opentripplanner.routing.trippattern.TripPattern; +import org.opentripplanner.routing.trippattern.UpdateList; +import org.opentripplanner.routing.vertextype.TransitStopDepart; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import static org.opentripplanner.common.IterableLibrary.filter; + +/** + * Update OTP stop time tables from some (realtime) source + * @author abyrd + */ +@Component +public class StoptimeUpdater implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(StoptimeUpdater.class); + + public static final int NEVER = Integer.MIN_VALUE; + public static final int UPDATED = Integer.MAX_VALUE; + + @Autowired + private GraphService graphService; + private UpdateStreamer updateStreamer; + private Map patternForTripId; + + @PostConstruct + public void setup () { + Graph g = graphService.getGraph(); + // index trip patterns on trip ids they contain + patternForTripId = new HashMap(); + for (TransitStopDepart tsd : filter(g.getVertices(), TransitStopDepart.class)) { + for (PatternBoard pb : filter(tsd.getOutgoing(), PatternBoard.class)) { + TripPattern pattern = pb.getPattern(); + for (Trip trip : pattern.getTrips()) { + patternForTripId.put(trip.getId(), pattern); + } + } + } +// System.out.println("Indexed trips:"); +// for (AgencyAndId tripId : patternForTripId.keySet()) { +// System.out.println(tripId); +// } + } + + @Override + public void run() { + while (true) { + // blocking call + for (UpdateList ul : updateStreamer.getUpdates().splitByTrip()) { + System.out.println(ul.toString()); + if (! ul.isSane()) { + LOG.debug("incoherent stoptime UpdateList"); + continue; + } + TripPattern pattern = patternForTripId.get(ul.tripId); + if (pattern != null) { + LOG.debug("pattern found for {}", ul.tripId); + pattern.update(ul); + } else { + LOG.debug("pattern not found {}", ul.tripId); + } + } + } + } + + public void setUpdateStreamer (UpdateStreamer us) { + this.updateStreamer = us; + } + +} diff --git a/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdaterOld.java b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdaterOld.java new file mode 100644 index 00000000000..da12552b863 --- /dev/null +++ b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/StoptimeUpdaterOld.java @@ -0,0 +1,147 @@ +package org.opentripplanner.updater.stoptime; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import javax.annotation.PostConstruct; + +import org.onebusaway.gtfs.model.AgencyAndId; +import org.onebusaway.gtfs.model.Trip; +import org.opentripplanner.common.model.T2; +import org.opentripplanner.routing.edgetype.PatternBoard; +import org.opentripplanner.routing.graph.Edge; +import org.opentripplanner.routing.graph.Graph; +import org.opentripplanner.routing.graph.Vertex; +import org.opentripplanner.routing.services.GraphService; +import org.opentripplanner.routing.trippattern.TripPattern; +import org.opentripplanner.routing.trippattern.TripPattern; +import org.opentripplanner.routing.vertextype.TransitStopDepart; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import com.google.transit.realtime.GtfsRealtime; +import com.google.transit.realtime.GtfsRealtime.FeedEntity; +import com.google.transit.realtime.GtfsRealtime.FeedHeader; +import com.google.transit.realtime.GtfsRealtime.FeedMessage; +import com.google.transit.realtime.GtfsRealtime.TripUpdate; +import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate; + +import static org.opentripplanner.common.IterableLibrary.filter; + +public class StoptimeUpdaterOld { + + @Autowired + private GraphService graphService; + + public static final int NEVER = Integer.MIN_VALUE; + public static final int UPDATED = Integer.MAX_VALUE; + + private static class UpdatedTrip { + TripPattern pattern; + int tripIndex; + int[] scheduledArrivals; + int[] scheduledDepartures; + int[] arrivals; + int[] departures; + int[] updateTimes; // 0 for never + boolean dirty = false; + + public UpdatedTrip (AgencyAndId tripId) { + TripPattern pattern = (TripPattern) patternForTripId.get(tripId); + tripIndex = pattern.getTrips().indexOf(tripId); + scheduledArrivals = pattern.getArrivals(tripIndex); + scheduledDepartures = pattern.getDepartures(tripIndex); + arrivals = scheduledArrivals; + departures = scheduledDepartures; + updateTimes = new int[scheduledDepartures.length]; + Arrays.fill(updateTimes, NEVER); + } + + public synchronized void update(long time, int stopIndex, int arrival, int departure) { + if ( ! dirty) { + arrivals = arrivals.clone(); + departures = departures.clone(); + dirty = true; + } + arrivals[stopIndex] = arrival; + departures[stopIndex] = departure; + updateTimes[stopIndex] = UPDATED; + } + + public synchronized boolean flush() { + if (dirty) { + interpolate(); + pattern.setArrivals(arrivals); + pattern.setDepartures(departures); + dirty = false; + return true; + } + return false; + } + + } + + private static Map patternForTripId = new HashMap(); + private static Map updatedTrips = new HashMap(); + + @PostConstruct + public void setup () { + Graph g = graphService.getGraph(); + // index trip patterns on trip ids they contain + for (TransitStopDepart tsd : filter(g.getVertices(), TransitStopDepart.class)) { + for (PatternBoard pb : filter(tsd.getOutgoing(), PatternBoard.class)) { + TripPattern pattern = pb.getPattern(); + for (Trip trip : pattern.getTrips()) { + patternForTripId.put(trip.getId(), pattern); + } + } + } + } + + public static void main(String[] params) { + File file = new File("/var/otp/data/nl/gtfs-rt.protobuf"); + try { + InputStream is = new FileInputStream(file); + FeedMessage feed = GtfsRealtime.FeedMessage.parseFrom(is); + System.out.println(feed); + + FeedHeader header = feed.getHeader(); + long timestamp = header.getTimestamp(); + + for (FeedEntity entity : feed.getEntityList()) { + System.out.println(entity); +// TripUpdate tUpdate = entity.getTripUpdate(); +// String trip = tUpdate.getTrip().getTripId(); +// AgencyAndId tripId = new AgencyAndId("agency", trip); +// UpdatedTrip uTrip = getOrMakeUpdatedTrip(tripId); +// for (StopTimeUpdate sUpdate : tUpdate.getStopTimeUpdateList()) { +// uTrip.update(time, stopIndex, arrival, departure); +// } + } + + + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + private static UpdatedTrip getOrMakeUpdatedTrip(AgencyAndId tripId) { + UpdatedTrip ut = updatedTrips.get(tripId); + if (ut == null) { + ut = new UpdatedTrip(tripId); + updatedTrips.put(tripId, ut); + } + return ut; + } +} diff --git a/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/UpdateStreamer.java b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/UpdateStreamer.java new file mode 100644 index 00000000000..650857ad7e0 --- /dev/null +++ b/opentripplanner-updater/src/main/java/org/opentripplanner/updater/stoptime/UpdateStreamer.java @@ -0,0 +1,9 @@ +package org.opentripplanner.updater.stoptime; + +import org.opentripplanner.routing.trippattern.UpdateList; + +public interface UpdateStreamer { + + public UpdateList getUpdates(); + +}