Skip to content

Commit

Permalink
Custom PBF parsing (#82)
Browse files Browse the repository at this point in the history
Pull in pbf parsing to give more control over threading model and performance.
  • Loading branch information
msbarry authored Mar 1, 2022
1 parent 7e8596b commit 2f05f94
Show file tree
Hide file tree
Showing 34 changed files with 1,840 additions and 726 deletions.
2 changes: 2 additions & 0 deletions NOTICE.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ The `planetiler-core` module includes the following software:
- `VectorTileEncoder`
from [java-vector-tile](https://github.com/ElectronicChartCentre/java-vector-tile) (Apache license)
- `Imposm3Parsers` from [imposm3](https://github.com/omniscale/imposm3) (Apache license)
- `PbfDecoder` from [osmosis](https://github.com/openstreetmap/osmosis) (Public Domain)
- `PbfFieldDecoder` from [osmosis](https://github.com/openstreetmap/osmosis) (Public Domain)

Additionally, the `planetiler-basemap` module is based on [OpenMapTiles](https://github.com/openmaptiles/openmaptiles):

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package com.onthegomap.planetiler.benchmarks;

import com.graphhopper.reader.ReaderElementUtils;
import com.graphhopper.reader.ReaderNode;
import com.graphhopper.reader.ReaderRelation;
import com.graphhopper.reader.ReaderWay;
import com.onthegomap.planetiler.basemap.BasemapProfile;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.expression.MultiExpression;
import com.onthegomap.planetiler.reader.SourceFeature;
import com.onthegomap.planetiler.reader.osm.OsmElement;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.stats.ProgressLoggers;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.util.Translations;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -25,47 +22,53 @@
*/
public class BasemapMapping {

public static void main(String[] args) throws IOException {
public static void main(String[] args) throws Exception {
var profile = new BasemapProfile(Translations.nullProvider(List.of()), PlanetilerConfig.defaults(),
Stats.inMemory());
var random = new Random(0);
var input = new OsmInputFile(Path.of("data", "sources", "north-america_us_massachusetts.pbf"));
List<SourceFeature> inputs = new ArrayList<>();
input.readTo(readerElem -> {
if (random.nextDouble() < 0.2) {
if (inputs.size() % 1_000_000 == 0) {
System.err.println(inputs.size());
}
var props = ReaderElementUtils.getTags(readerElem);
inputs.add(new SourceFeature(props, "", "", null, readerElem.getId()) {
@Override
public Geometry latLonGeometry() {
return null;
}
var logger = ProgressLoggers.create()
.addRateCounter("inputs", inputs::size)
.addProcessStats();
try (var reader = OsmInputFile.readFrom(Path.of("data", "sources", "massachusetts.osm.pbf"))) {
reader.forEachBlock(block -> {
for (var element : block.decodeElements()) {
if (random.nextDouble() < 0.2) {
if (inputs.size() % 1_000_000 == 0) {
logger.log();
}
inputs.add(new SourceFeature(element.tags(), "", "", null, element.id()) {
@Override
public Geometry latLonGeometry() {
return null;
}

@Override
public Geometry worldGeometry() {
return null;
}
@Override
public Geometry worldGeometry() {
return null;
}

@Override
public boolean isPoint() {
return readerElem instanceof ReaderNode;
}
@Override
public boolean isPoint() {
return element instanceof OsmElement.Node;
}

@Override
public boolean canBePolygon() {
return readerElem instanceof ReaderWay || readerElem instanceof ReaderRelation;
}
@Override
public boolean canBePolygon() {
return element instanceof OsmElement.Way || element instanceof OsmElement.Relation;
}

@Override
public boolean canBeLine() {
return readerElem instanceof ReaderWay;
@Override
public boolean canBeLine() {
return element instanceof OsmElement.Way;
}
});
}
});
}
}, "reader", 3);
}
});
}

logger.log();
System.err.println("read " + inputs.size() + " elems");

long startStart = System.nanoTime();
Expand All @@ -79,8 +82,10 @@ public boolean canBeLine() {
}
if (count == 0) {
startStart = System.nanoTime();
logger.log();
System.err.println("finished warmup");
} else {
logger.log();
System.err.println(
"took:" + Duration.ofNanos(System.nanoTime() - start).toMillis() + "ms found:" + i + " avg:" + (Duration
.ofNanos(System.nanoTime() - startStart).toMillis() / count) + "ms");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.onthegomap.planetiler.benchmarks;

import com.onthegomap.planetiler.Profile;
import com.onthegomap.planetiler.collection.LongLongMap;
import com.onthegomap.planetiler.config.Arguments;
import com.onthegomap.planetiler.config.PlanetilerConfig;
import com.onthegomap.planetiler.reader.osm.OsmInputFile;
import com.onthegomap.planetiler.reader.osm.OsmReader;
import com.onthegomap.planetiler.stats.Stats;
import com.onthegomap.planetiler.stats.Timer;
import java.io.IOException;
import java.nio.file.Path;

public class BenchmarkOsmRead {

public static void main(String[] args) throws IOException {
OsmInputFile file = new OsmInputFile(Path.of("data/sources/northeast.osm.pbf"), true);
var profile = new Profile.NullProfile();
var stats = Stats.inMemory();
var config = PlanetilerConfig.from(Arguments.of());

while (true) {
Timer timer = Timer.start();
try (
var nodes = LongLongMap.noop();
var reader = new OsmReader("osm", file, nodes, profile, stats)
) {
reader.pass1(config);
}
System.err.println(timer.stop());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class LocalCounter {
}
LocalCounter counter = new LocalCounter();
ProgressLoggers loggers = ProgressLoggers.create()
.addRatePercentCounter("entries", entries, () -> counter.count)
.addRatePercentCounter("entries", entries, () -> counter.count, true)
.newLine()
.addProcessStats();
AtomicReference<String> writeRate = new AtomicReference<>();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public Planetiler addOsmSource(String name, Path defaultPath, String defaultUrl)
throw new IllegalArgumentException("Currently only one OSM input file is supported");
}
Path path = getPath(name, "OSM input file", defaultPath, defaultUrl);
var thisInputFile = new OsmInputFile(path);
var thisInputFile = new OsmInputFile(path, config.osmLazyReads());
osmInputFile = thisInputFile;
return appendStage(new Stage(
name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@ public interface IterableOnce<T> extends Iterable<T>, Supplier<T> {
@Override
default Iterator<T> iterator() {
return new Iterator<>() {
T next = get();
T next = null;
boolean stale = true;

private void advance() {
if (stale) {
next = get();
stale = false;
}
}

@Override
public boolean hasNext() {
advance();
return next != null;
}

@Override
public T next() {
T result = next;
next = get();
return result;
advance();
stale = true;
return next;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class SortedTable implements LongLongMap {
private final AppendStore.Longs keys;
private final AppendStore.Longs values;
private long lastChunk = -1;
private long lastKey = -1;

public SortedTable(AppendStore.Longs keys, AppendStore.Longs values) {
this.keys = keys;
Expand All @@ -165,6 +166,10 @@ public SortedTable(AppendStore.Longs keys, AppendStore.Longs values) {

@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = keys.size();
long chunk = key >>> 8;
if (chunk != lastChunk) {
Expand Down Expand Up @@ -236,13 +241,18 @@ class SparseArray implements LongLongMap {
private final AppendStore.Longs values;
private int lastChunk = -1;
private int lastOffset = 0;
private long lastKey = -1;

public SparseArray(AppendStore.Longs values) {
this.values = values;
}

@Override
public void put(long key, long value) {
if (key <= lastKey) {
throw new IllegalArgumentException("Nodes must be sorted ascending by ID, " + key + " came after " + lastKey);
}
lastKey = key;
long idx = values.size();
int chunk = (int) (key >>> 8);
int offset = (int) (key & 255);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public record PlanetilerConfig(
double minFeatureSizeAtMaxZoom,
double minFeatureSizeBelowMaxZoom,
double simplifyToleranceAtMaxZoom,
double simplifyToleranceBelowMaxZoom
double simplifyToleranceBelowMaxZoom,
boolean osmLazyReads
) {

public static final int MIN_MINZOOM = 0;
Expand Down Expand Up @@ -86,7 +87,10 @@ public static PlanetilerConfig from(Arguments arguments) {
256d / 4096),
arguments.getDouble("simplify_tolerance",
"Default value for the tile pixel tolerance to use when simplifying features below the maximum zoom level",
0.1d)
0.1d),
arguments.getBoolean("osm_lazy_reads",
"Read OSM blocks from disk in worker threads",
false)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public static void writeOutput(FeatureGroup features, Mbtiles output, DiskBacked
*/
WorkQueue<TileBatch> writerQueue = new WorkQueue<>("mbtiles_writer_queue", queueSize, 1, stats);
encodeBranch = pipeline
.<TileBatch>fromGenerator("reader", next -> {
.<TileBatch>fromGenerator("read", next -> {
writer.readFeaturesAndBatch(batch -> {
next.accept(batch);
writerQueue.accept(batch); // also send immediately to writer
Expand All @@ -130,29 +130,29 @@ public static void writeOutput(FeatureGroup features, Mbtiles output, DiskBacked
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
}, 1)
.addBuffer("reader_queue", queueSize)
.sinkTo("encoder", config.threads(), writer::tileEncoderSink);
.sinkTo("encode", config.threads(), writer::tileEncoderSink);

// the tile writer will wait on the result of each batch to ensure tiles are written in order
writeBranch = pipeline.readFromQueue(writerQueue)
// use only 1 thread since tileWriter needs to be single-threaded
.sinkTo("writer", 1, writer::tileWriter);
.sinkTo("write", 1, writer::tileWriter);
} else {
/*
* If we don't need to emit tiles in order, just send the features to the encoder, and when it finishes with
* a tile send that to the writer.
*/
encodeBranch = pipeline
// use only 1 thread since readFeaturesAndBatch needs to be single-threaded
.fromGenerator("reader", writer::readFeaturesAndBatch, 1)
.fromGenerator("read", writer::readFeaturesAndBatch, 1)
.addBuffer("reader_queue", queueSize)
.addWorker("encoder", config.threads(), writer::tileEncoder)
.addBuffer("writer_queue", queueSize)
// use only 1 thread since tileWriter needs to be single-threaded
.sinkTo("writer", 1, writer::tileWriter);
.sinkTo("write", 1, writer::tileWriter);
}

var loggers = ProgressLoggers.create()
.addRatePercentCounter("features", features.numFeaturesWritten(), writer.featuresProcessed)
.addRatePercentCounter("features", features.numFeaturesWritten(), writer.featuresProcessed, true)
.addFileSize(features)
.addRateCounter("tiles", writer::tilesEmitted)
.addFileSize(fileSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public final void process(FeatureGroup writer, PlanetilerConfig config) {
});

var loggers = ProgressLoggers.create()
.addRatePercentCounter("read", featureCount, featuresRead)
.addRatePercentCounter("read", featureCount, featuresRead, true)
.addRateCounter("write", featuresWritten)
.addFileSize(writer)
.newLine()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ default boolean hasTag(String key, Object value1, Object value2) {
return value1.equals(actual) || value2.equals(actual);
}

/** Returns true if the value for {@code key} is {@code value1} or {@code value2}. */
default boolean hasTag(String key, Object... values) {
Object actual = getTag(key);
for (Object value : values) {
if (value.equals(actual)) {
return true;
}
}
return false;
}

/** Returns the {@link Object#toString()} value for {@code key} or {@code null} if not present. */
default String getString(String key) {
Object value = getTag(key);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.onthegomap.planetiler.reader.osm;

import java.io.Closeable;
import java.util.function.Consumer;

/**
* An osm.pbf input file that iterates through {@link Block Blocks} of raw bytes that can be decompressed/parsed in
* worker threads using {@link Block#decodeElements()}.
*/
public interface OsmBlockSource extends Closeable {

/** Calls {@code consumer} for each block from the input file sequentially in a single thread. */
void forEachBlock(Consumer<Block> consumer);

@Override
default void close() {
}

/**
* An individual block of raw bytes from an osm.pbf file that can be decompressed/parsed with {@link
* #decodeElements()}.
*/
interface Block {

/** Create a fake block from existing elements - useful for tests. */
static Block of(Iterable<? extends OsmElement> items) {
return () -> items;
}

/** Decompress and parse OSM elements from this block. */
Iterable<? extends OsmElement> decodeElements();

default int id() {
return -1;
}
}
}
Loading

0 comments on commit 2f05f94

Please sign in to comment.