Skip to content

Commit

Permalink
Streaming stoptime updater for Dutch KV8 CTX over ZeroMQ
Browse files Browse the repository at this point in the history
  • Loading branch information
abyrd committed Jun 21, 2012
1 parent 18ab411 commit 1fdb5a6
Show file tree
Hide file tree
Showing 6 changed files with 361 additions and 1 deletion.
6 changes: 5 additions & 1 deletion opentripplanner-updater/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
</repositories>

<dependencies>
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jzmq</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>opentripplanner-routing</artifactId>
Expand All @@ -38,7 +43,6 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package org.opentripplanner.updater.stoptime;

public class GTFSZMQUpdateStreamer {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package org.opentripplanner.updater.stoptime;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.zip.GZIPInputStream;

import org.onebusaway.gtfs.model.AgencyAndId;
import org.opentripplanner.common.CTX;
import org.opentripplanner.routing.core.ServiceDay;
import org.opentripplanner.routing.trippattern.Update;
import org.opentripplanner.routing.trippattern.UpdateList;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/** StoptimeUpdateStreamer for CTX-encoded Dutch KV8 realtime updates over ZeroMQ */
public class KV8ZMQUpdateStreamer implements UpdateStreamer {

private ZMQ.Context context;
private ZMQ.Socket subscriber;
private int count = 0;
private String defaultAgencyId = "";
private String address = "tcp://node01.post.openov.nl:7817";
private static String feed = "/GOVI/KV8";

public KV8ZMQUpdateStreamer() {
context = ZMQ.context(1);
subscriber = context.socket(ZMQ.SUB);
subscriber.connect(address);
subscriber.subscribe(feed.getBytes());
}

public UpdateList getUpdates() {
ZMsg msg = ZMsg.recvMsg(subscriber);
try {
Iterator<ZFrame> msgs = msg.iterator();
msgs.next();
ArrayList<Byte> receivedMsgs = new ArrayList<Byte>();
while (msgs.hasNext()) {
for (byte b : msgs.next().getData()) {
receivedMsgs.add(b);
}
}
byte[] fullMsg = new byte[receivedMsgs.size()];
for (int i = 0; i < fullMsg.length; i++) {
fullMsg[i] = receivedMsgs.get(i);
}
InputStream gzipped = new ByteArrayInputStream(fullMsg);
InputStream in = new GZIPInputStream(gzipped);
StringBuffer out = new StringBuffer();
byte[] b = new byte[4096];
for (int n; (n = in.read(b)) != -1;) {
out.append(new String(b, 0, n));
}
return parseCTX(out.toString());
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

public UpdateList parseCTX(String ctxString) {
System.out.println("CTX MSG " + count++);
CTX ctx = new CTX(ctxString);
UpdateList ret = new UpdateList(null); // indicate that updates may have mixed trip IDs
for (int i = 0; i < ctx.rows.size(); i++) {
HashMap<String, String> row = ctx.rows.get(i);
int arrival = secondsSinceMidnight(row.get("ExpectedArrivalTime"));
int departure = secondsSinceMidnight(row.get("ExpectedDepartureTime"));
Update u = new Update(
kv7TripId(row),
row.get("UserStopCode"),
Integer.parseInt(row.get("UserStopOrderNumber")),
arrival, departure);
ret.addUpdate(u);
}
return ret;
}

/** no good for DST */
private int secondsSinceMidnight(String hhmmss) {
String[] time = hhmmss.split(":");
int hours = Integer.parseInt(time[0]);
int minutes = Integer.parseInt(time[1]);
int seconds = Integer.parseInt(time[2]);
return (hours * 60 + minutes) * 60 + seconds;
}

/**
* convert KV7 fields into a GTFS trip_id
* trip_ids must be data set unique in GTFS, which is why we use the DataOwnerCode (~=agency_id)
* twice, in the trip_id itself and the enclosing AgencyAndId
*/
public AgencyAndId kv7TripId (HashMap<String, String> row) {
String tripId = String.format("%s_%s_%s_%s_%s",
row.get("DataOwnerCode"),
row.get("LinePlanningNumber"),
row.get("LocalServiceLevelCode"),
row.get("JourneyNumber"),
row.get("FortifyOrderNumber"));
return new AgencyAndId(row.get("DataOwnerCode"), tripId);
}

public void setAddress(String address) {
this.address = address;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package org.opentripplanner.updater.stoptime;

import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.onebusaway.gtfs.model.AgencyAndId;
import org.onebusaway.gtfs.model.Trip;
import org.opentripplanner.routing.edgetype.PatternBoard;
import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.routing.services.GraphService;
import org.opentripplanner.routing.trippattern.TripPattern;
import org.opentripplanner.routing.trippattern.UpdateList;
import org.opentripplanner.routing.vertextype.TransitStopDepart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import static org.opentripplanner.common.IterableLibrary.filter;

/**
* Update OTP stop time tables from some (realtime) source
* @author abyrd
*/
@Component
public class StoptimeUpdater implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(StoptimeUpdater.class);

public static final int NEVER = Integer.MIN_VALUE;
public static final int UPDATED = Integer.MAX_VALUE;

@Autowired
private GraphService graphService;
private UpdateStreamer updateStreamer;
private Map<AgencyAndId, TripPattern> patternForTripId;

@PostConstruct
public void setup () {
Graph g = graphService.getGraph();
// index trip patterns on trip ids they contain
patternForTripId = new HashMap<AgencyAndId, TripPattern>();
for (TransitStopDepart tsd : filter(g.getVertices(), TransitStopDepart.class)) {
for (PatternBoard pb : filter(tsd.getOutgoing(), PatternBoard.class)) {
TripPattern pattern = pb.getPattern();
for (Trip trip : pattern.getTrips()) {
patternForTripId.put(trip.getId(), pattern);
}
}
}
// System.out.println("Indexed trips:");
// for (AgencyAndId tripId : patternForTripId.keySet()) {
// System.out.println(tripId);
// }
}

@Override
public void run() {
while (true) {
// blocking call
for (UpdateList ul : updateStreamer.getUpdates().splitByTrip()) {
System.out.println(ul.toString());
if (! ul.isSane()) {
LOG.debug("incoherent stoptime UpdateList");
continue;
}
TripPattern pattern = patternForTripId.get(ul.tripId);
if (pattern != null) {
LOG.debug("pattern found for {}", ul.tripId);
pattern.update(ul);
} else {
LOG.debug("pattern not found {}", ul.tripId);
}
}
}
}

public void setUpdateStreamer (UpdateStreamer us) {
this.updateStreamer = us;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package org.opentripplanner.updater.stoptime;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import javax.annotation.PostConstruct;

import org.onebusaway.gtfs.model.AgencyAndId;
import org.onebusaway.gtfs.model.Trip;
import org.opentripplanner.common.model.T2;
import org.opentripplanner.routing.edgetype.PatternBoard;
import org.opentripplanner.routing.graph.Edge;
import org.opentripplanner.routing.graph.Graph;
import org.opentripplanner.routing.graph.Vertex;
import org.opentripplanner.routing.services.GraphService;
import org.opentripplanner.routing.trippattern.TripPattern;
import org.opentripplanner.routing.trippattern.TripPattern;
import org.opentripplanner.routing.vertextype.TransitStopDepart;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.google.transit.realtime.GtfsRealtime;
import com.google.transit.realtime.GtfsRealtime.FeedEntity;
import com.google.transit.realtime.GtfsRealtime.FeedHeader;
import com.google.transit.realtime.GtfsRealtime.FeedMessage;
import com.google.transit.realtime.GtfsRealtime.TripUpdate;
import com.google.transit.realtime.GtfsRealtime.TripUpdate.StopTimeUpdate;

import static org.opentripplanner.common.IterableLibrary.filter;

public class StoptimeUpdaterOld {

@Autowired
private GraphService graphService;

public static final int NEVER = Integer.MIN_VALUE;
public static final int UPDATED = Integer.MAX_VALUE;

private static class UpdatedTrip {
TripPattern pattern;
int tripIndex;
int[] scheduledArrivals;
int[] scheduledDepartures;
int[] arrivals;
int[] departures;
int[] updateTimes; // 0 for never
boolean dirty = false;

public UpdatedTrip (AgencyAndId tripId) {
TripPattern pattern = (TripPattern) patternForTripId.get(tripId);
tripIndex = pattern.getTrips().indexOf(tripId);
scheduledArrivals = pattern.getArrivals(tripIndex);
scheduledDepartures = pattern.getDepartures(tripIndex);
arrivals = scheduledArrivals;
departures = scheduledDepartures;
updateTimes = new int[scheduledDepartures.length];
Arrays.fill(updateTimes, NEVER);
}

public synchronized void update(long time, int stopIndex, int arrival, int departure) {
if ( ! dirty) {
arrivals = arrivals.clone();
departures = departures.clone();
dirty = true;
}
arrivals[stopIndex] = arrival;
departures[stopIndex] = departure;
updateTimes[stopIndex] = UPDATED;
}

public synchronized boolean flush() {
if (dirty) {
interpolate();
pattern.setArrivals(arrivals);
pattern.setDepartures(departures);
dirty = false;
return true;
}
return false;
}

}

private static Map<AgencyAndId, TripPattern> patternForTripId = new HashMap<AgencyAndId, TripPattern>();
private static Map<AgencyAndId, UpdatedTrip> updatedTrips = new HashMap<AgencyAndId, UpdatedTrip>();

@PostConstruct
public void setup () {
Graph g = graphService.getGraph();
// index trip patterns on trip ids they contain
for (TransitStopDepart tsd : filter(g.getVertices(), TransitStopDepart.class)) {
for (PatternBoard pb : filter(tsd.getOutgoing(), PatternBoard.class)) {
TripPattern pattern = pb.getPattern();
for (Trip trip : pattern.getTrips()) {
patternForTripId.put(trip.getId(), pattern);
}
}
}
}

public static void main(String[] params) {
File file = new File("/var/otp/data/nl/gtfs-rt.protobuf");
try {
InputStream is = new FileInputStream(file);
FeedMessage feed = GtfsRealtime.FeedMessage.parseFrom(is);
System.out.println(feed);

FeedHeader header = feed.getHeader();
long timestamp = header.getTimestamp();

for (FeedEntity entity : feed.getEntityList()) {
System.out.println(entity);
// TripUpdate tUpdate = entity.getTripUpdate();
// String trip = tUpdate.getTrip().getTripId();
// AgencyAndId tripId = new AgencyAndId("agency", trip);
// UpdatedTrip uTrip = getOrMakeUpdatedTrip(tripId);
// for (StopTimeUpdate sUpdate : tUpdate.getStopTimeUpdateList()) {
// uTrip.update(time, stopIndex, arrival, departure);
// }
}


} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

private static UpdatedTrip getOrMakeUpdatedTrip(AgencyAndId tripId) {
UpdatedTrip ut = updatedTrips.get(tripId);
if (ut == null) {
ut = new UpdatedTrip(tripId);
updatedTrips.put(tripId, ut);
}
return ut;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opentripplanner.updater.stoptime;

import org.opentripplanner.routing.trippattern.UpdateList;

public interface UpdateStreamer {

public UpdateList getUpdates();

}

0 comments on commit 1fdb5a6

Please sign in to comment.