Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and optimize worker task scheduling #10417

Merged
merged 9 commits into from
Mar 11, 2021
99 changes: 39 additions & 60 deletions src/source/geojson_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type Actor from '../util/actor.js';
import type {Callback} from '../types/callback.js';
import type {GeoJSON, GeoJSONFeature} from '@mapbox/geojson-types';
import type {GeoJSONSourceSpecification, PromoteIdSpecification} from '../style-spec/types.js';
import type {Cancelable} from '../types/cancelable.js';

/**
* A source containing GeoJSON.
Expand Down Expand Up @@ -79,9 +80,10 @@ class GeoJSONSource extends Evented implements Source {
map: Map;
actor: Actor;
_loaded: boolean;
_coalesce: ?boolean;
_metadataFired: ?boolean;
_collectResourceTiming: boolean;
_resourceTiming: Array<PerformanceResourceTiming>;
_removed: boolean;
_pendingLoad: ?Cancelable;

/**
* @private
Expand All @@ -100,7 +102,6 @@ class GeoJSONSource extends Evented implements Source {
this.tileSize = 512;
this.isTileClipped = true;
this.reparseOverscaled = true;
this._removed = false;
this._loaded = false;

this.actor = dispatcher.getActor();
Expand All @@ -110,7 +111,6 @@ class GeoJSONSource extends Evented implements Source {
this._options = extend({}, options);

this._collectResourceTiming = options.collectResourceTiming;
this._resourceTiming = [];

if (options.maxzoom !== undefined) this.maxzoom = options.maxzoom;
if (options.type) this.type = options.type;
Expand Down Expand Up @@ -147,29 +147,9 @@ class GeoJSONSource extends Evented implements Source {
}, options.workerOptions);
}

load() {
this.fire(new Event('dataloading', {dataType: 'source'}));
this._updateWorkerData((err) => {
if (err) {
this.fire(new ErrorEvent(err));
return;
}

const data: Object = {dataType: 'source', sourceDataType: 'metadata'};
if (this._collectResourceTiming && this._resourceTiming && (this._resourceTiming.length > 0)) {
data.resourceTiming = this._resourceTiming;
this._resourceTiming = [];
}

// although GeoJSON sources contain no metadata, we fire this event to let the SourceCache
// know its ok to start requesting tiles.
this.fire(new Event('data', data));
});
}

onAdd(map: Map) {
this.map = map;
this.load();
this.setData(this._data);
}

/**
Expand All @@ -180,21 +160,7 @@ class GeoJSONSource extends Evented implements Source {
*/
setData(data: GeoJSON | string) {
this._data = data;
this.fire(new Event('dataloading', {dataType: 'source'}));
this._updateWorkerData((err) => {
if (err) {
this.fire(new ErrorEvent(err));
return;
}

const data: Object = {dataType: 'source', sourceDataType: 'content'};
if (this._collectResourceTiming && this._resourceTiming && (this._resourceTiming.length > 0)) {
data.resourceTiming = this._resourceTiming;
this._resourceTiming = [];
}
this.fire(new Event('data', data));
});

this._updateWorkerData();
return this;
}

Expand Down Expand Up @@ -262,7 +228,15 @@ class GeoJSONSource extends Evented implements Source {
* handles loading the geojson data and preparing to serve it up as tiles,
* using geojson-vt or supercluster as appropriate.
*/
_updateWorkerData(callback: Callback<void>) {
_updateWorkerData() {
// if there's an earlier loadData to finish, wait until it finishes and then do another update
if (this._pendingLoad) {
this._coalesce = true;
return;
}

this.fire(new Event('dataloading', {dataType: 'source'}));

this._loaded = false;
const options = extend({}, this.workerOptions);
const data = this._data;
Expand All @@ -276,24 +250,28 @@ class GeoJSONSource extends Evented implements Source {
// target {this.type}.loadData rather than literally geojson.loadData,
// so that other geojson-like source types can easily reuse this
// implementation
this.actor.send(`${this.type}.loadData`, options, (err, result) => {
if (this._removed || (result && result.abandoned)) {
return;
}

this._pendingLoad = this.actor.send(`${this.type}.loadData`, options, (err, result) => {
this._loaded = true;
this._pendingLoad = null;

if (result && result.resourceTiming && result.resourceTiming[this.id])
this._resourceTiming = result.resourceTiming[this.id].slice(0);
// Any `loadData` calls that piled up while we were processing
// this one will get coalesced into a single call when this
// 'coalesce' message is processed.
// We would self-send from the worker if we had access to its
// message queue. Waiting instead for the 'coalesce' to round-trip
// through the foreground just means we're throttling the worker
// to run at a little less than full-throttle.
this.actor.send(`${this.type}.coalesce`, {source: options.source}, null);
callback(err);
if (err) {
this.fire(new ErrorEvent(err));

} else {
// although GeoJSON sources contain no metadata, we fire this event at first
// to let the SourceCache know its ok to start requesting tiles.
const data: Object = {dataType: 'source', sourceDataType: this._metadataFired ? 'content' : 'metadata'};
if (this._collectResourceTiming && result && result.resourceTiming && result.resourceTiming[this.id]) {
data.resourceTiming = result.resourceTiming[this.id];
}
this.fire(new Event('data', data));
this._metadataFired = true;
}

if (this._coalesce) {
this._updateWorkerData();
this._coalesce = false;
}
});
}

Expand Down Expand Up @@ -333,7 +311,7 @@ class GeoJSONSource extends Evented implements Source {
tile.loadVectorData(data, this.map.painter, message === 'reloadTile');

return callback(null);
});
}, undefined, message === 'loadTile');
}

abortTile(tile: Tile) {
Expand All @@ -350,8 +328,9 @@ class GeoJSONSource extends Evented implements Source {
}

onRemove() {
this._removed = true;
this.actor.send('removeSource', {type: this.type, source: this.id});
if (this._pendingLoad) {
this._pendingLoad.cancel();
}
}

serialize() {
Expand Down
81 changes: 1 addition & 80 deletions src/source/geojson_worker_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ function loadGeoJSONTile(params: RequestedTileParameters, callback: LoadVectorDa
});
}

export type SourceState =
| 'Idle' // Source empty or data loaded
| 'Coalescing' // Data finished loading, but discard 'loadData' messages until receiving 'coalesced'
| 'NeedsLoadData'; // 'loadData' received while coalescing, trigger one more 'loadData' on receiving 'coalesced'

/**
* The {@link WorkerSource} implementation that supports {@link GeoJSONSource}.
* This class is designed to be easily reused to support custom source types
Expand All @@ -94,11 +89,6 @@ export type SourceState =
*/
class GeoJSONWorkerSource extends VectorTileWorkerSource {
loadGeoJSON: LoadGeoJSON;
_state: SourceState;
_pendingCallback: Callback<{
resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>},
abandoned?: boolean }>;
_pendingLoadDataParams: LoadGeoJSONParameters;
_geoJSONIndex: GeoJSONIndex

/**
Expand Down Expand Up @@ -131,39 +121,7 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
* @param callback
* @private
*/
loadData(params: LoadGeoJSONParameters, callback: Callback<{
resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>},
abandoned?: boolean }>) {
if (this._pendingCallback) {
// Tell the foreground the previous call has been abandoned
this._pendingCallback(null, {abandoned: true});
}
this._pendingCallback = callback;
this._pendingLoadDataParams = params;

if (this._state &&
this._state !== 'Idle') {
this._state = 'NeedsLoadData';
} else {
this._state = 'Coalescing';
this._loadData();
}
}

/**
* Internal implementation: called directly by `loadData`
* or by `coalesce` using stored parameters.
*/
_loadData() {
if (!this._pendingCallback || !this._pendingLoadDataParams) {
assert(false);
return;
}
const callback = this._pendingCallback;
const params = this._pendingLoadDataParams;
delete this._pendingCallback;
delete this._pendingLoadDataParams;

loadData(params: LoadGeoJSONParameters, callback: Callback<{resourceTiming?: {[_: string]: Array<PerformanceResourceTiming>}}>) {
const requestParam = params && params.request;
const perf = requestParam && requestParam.collectResourceTiming;

Expand Down Expand Up @@ -209,35 +167,6 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
});
}

/**
* While processing `loadData`, we coalesce all further
* `loadData` messages into a single call to _loadData
* that will happen once we've finished processing the
* first message. {@link GeoJSONSource#_updateWorkerData}
* is responsible for sending us the `coalesce` message
* at the time it receives a response from `loadData`
*
* State: Idle
* ↑ |
* 'coalesce' 'loadData'
* | (triggers load)
* | ↓
* State: Coalescing
* ↑ |
* (triggers load) |
* 'coalesce' 'loadData'
* | ↓
* State: NeedsLoadData
*/
coalesce() {
if (this._state === 'Coalescing') {
this._state = 'Idle';
} else if (this._state === 'NeedsLoadData') {
this._state = 'Coalescing';
this._loadData();
}
}

/**
* Implements {@link WorkerSource#reloadTile}.
*
Expand Down Expand Up @@ -289,14 +218,6 @@ class GeoJSONWorkerSource extends VectorTileWorkerSource {
}
}

removeSource(params: {source: string}, callback: Callback<mixed>) {
if (this._pendingCallback) {
// Don't leak callbacks
this._pendingCallback(null, {abandoned: true});
}
callback();
}

getClusterExpansionZoom(params: {clusterId: number}, callback: Callback<number>) {
try {
callback(null, this._geoJSONIndex.getClusterExpansionZoom(params.clusterId));
Expand Down
2 changes: 1 addition & 1 deletion src/source/raster_dem_tile_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class RasterDEMTileSource extends RasterTileSource implements Source {

if (!tile.actor || tile.state === 'expired') {
tile.actor = this.dispatcher.getActor();
tile.actor.send('loadDEMTile', params, done.bind(this));
tile.actor.send('loadDEMTile', params, done.bind(this), undefined, true);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/source/vector_tile_source.js
Original file line number Diff line number Diff line change
Expand Up @@ -227,13 +227,13 @@ class VectorTileSource extends Evented implements Source {
expires: data.expires,
rawData: data.rawData.slice(0)
};
if (tile.actor) tile.actor.send('loadTile', params, done.bind(this));
if (tile.actor) tile.actor.send('loadTile', params, done.bind(this), undefined, true);
}
}, true);
tile.request = {cancel};

} else {
tile.request = tile.actor.send('loadTile', params, done.bind(this));
tile.request = tile.actor.send('loadTile', params, done.bind(this), undefined, true);
}

} else if (tile.state === 'loading') {
Expand Down Expand Up @@ -277,14 +277,14 @@ class VectorTileSource extends Evented implements Source {
delete tile.request;
}
if (tile.actor) {
tile.actor.send('abortTile', {uid: tile.uid, type: this.type, source: this.id}, undefined);
tile.actor.send('abortTile', {uid: tile.uid, type: this.type, source: this.id});
}
}

unloadTile(tile: Tile) {
tile.unloadVectorData();
if (tile.actor) {
tile.actor.send('removeTile', {uid: tile.uid, type: this.type, source: this.id}, undefined);
tile.actor.send('removeTile', {uid: tile.uid, type: this.type, source: this.id});
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/source/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ export default class Worker {
// use a wrapped actor so that we can attach a target mapId param
// to any messages invoked by the WorkerSource
const actor = {
send: (type, data, callback, mustQueue, _, metadata) => {
send: (type, data, callback, _, mustQueue, metadata) => {
this.actor.send(type, data, callback, mapId, mustQueue, metadata);
},
scheduler: this.actor.scheduler
Expand Down
6 changes: 3 additions & 3 deletions src/source/worker_tile.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class WorkerTile {
glyphMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, false, taskMetadata);
} else {
glyphMap = {};
}
Expand All @@ -182,7 +182,7 @@ class WorkerTile {
iconMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, false, taskMetadata);
} else {
iconMap = {};
}
Expand All @@ -195,7 +195,7 @@ class WorkerTile {
patternMap = result;
maybePrepare.call(this);
}
}, undefined, undefined, taskMetadata);
}, undefined, false, taskMetadata);
} else {
patternMap = {};
}
Expand Down
13 changes: 6 additions & 7 deletions src/util/actor.js
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,12 @@ class Actor {
cancel.cancel();
}
} else {
if (isWorker() || data.mustQueue) {
// In workers, store the tasks that we need to process before actually processing them. This
// is necessary because we want to keep receiving messages, and in particular,
// <cancel> messages. Some tasks may take a while in the worker thread, so before
// executing the next task in our queue, postMessage preempts this and <cancel>
// messages can be processed. We're using a MessageChannel object to get throttle the
// process() flow to one at a time.
if (data.mustQueue || isWorker()) {
// for worker tasks that are often cancelled, such as loadTile, store them before actually
// processing them. This is necessary because we want to keep receiving <cancel> messages.
// Some tasks may take a while in the worker thread, so before executing the next task
// in our queue, postMessage preempts this and <cancel> messages can be processed.
// We're using a MessageChannel object to get throttle the process() flow to one at a time.
const callback = this.callbacks[id];
const metadata = (callback && callback.metadata) || {type: "message"};
this.cancelCallbacks[id] = this.scheduler.add(() => this.processTask(id, data), metadata);
Expand Down
Loading