-
Notifications
You must be signed in to change notification settings - Fork 1
/
index.js
100 lines (96 loc) · 2.8 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
var combine = require('stream-combiner2')
var through = require('through2')
var {Readable} = require('readable-stream')
var BlobParser = require('osm-pbf-parser/lib/blob_parser')
var BlobEncoder = require('osm-pbf-parser/lib/blob_encoder')
var BlobDecompressor = require('osm-pbf-parser/lib/decompress')
var PrimitivesParser = require('osm-pbf-parser/lib/primitives')
var dataType = Buffer.from([0x07, 0x4f, 0x53, 0x4d, 0x44, 0x61, 0x74, 0x61])
module.exports = function (opts) {
var primitive = new PrimitivesParser()
if (opts.header) {
primitive._osmheader = opts.header
find()
} else {
getHeader(opts.read, function (err, header) {
if (err) return console.error(err)
primitive._osmheader = header
combineStream.emit('header', header)
find()
})
}
var combineStream = combine.obj([
new BlobParser(),
new BlobDecompressor(),
primitive
])
return combineStream
function find () {
var start, end, pending = 2
findAlignment(opts.read, opts.start, opts.size, function (err, offset) {
if (err) return combineStream.emit('error', err)
start = offset
if (--pending === 0) parse(start, end)
})
findAlignment(opts.read, opts.end, opts.size, function (err, offset) {
if (err) return combineStream.emit('error', err)
end = offset
if (--pending === 0) parse(start, end)
})
}
function parse (start, end) {
var offset = start
var len = 4096
var r = new Readable({
read: function (size) {
if (offset >= end) return r.push(null)
opts.read(offset, Math.min(len, opts.size - offset), function (err, buf) {
if (err) return combineStream.emit('error', err)
offset += len
r.push(buf)
})
}
})
r.pipe(combineStream)
}
}
function getHeader (read, cb) {
var primitive = new PrimitivesParser()
var combineStream = combine.obj([
new BlobParser(),
new BlobDecompressor(),
primitive
])
var stop = false
combineStream.once('error', function (err) {
stop = true
cb(err)
})
combineStream.pipe(through.obj(function (items, enc, next) {
stop = true
cb(null, primitive._osmheader)
}))
var offset = 0
var len = 4096
read(offset, len, function onRead (err, buf) {
if (stop) return
if (err) return cb(err)
combineStream.write(buf)
offset += len
read(offset, len, onRead)
})
}
function findAlignment (read, offset, size, cb) {
var len = 4096
if (offset + len >= size) return cb(null, size)
read(offset, len, function onRead (err, buf) {
if (err) return cb(err)
var index = buf.indexOf(dataType)
if (index < 5) {
offset += len - dataType.length
if (offset + len >= size) return cb(null, size)
return read(offset, len, onRead)
}
cb(null, offset + index - 5)
})
}