-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathbag.js
133 lines (113 loc) Β· 4.65 KB
/
bag.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Copyright (c) 2018-present, Cruise LLC
// This source code is licensed under the Apache License, Version 2.0,
// found in the LICENSE file in the root directory of this source tree.
// You may not use this file except in compliance with the License.
// @flow
import BagReader, { type Decompress } from "./BagReader";
import { MessageReader } from "./MessageReader";
import ReadResult from "./ReadResult";
import { BagHeader, ChunkInfo, Connection, MessageData } from "./record";
import type { Time } from "./types";
import * as TimeUtil from "./TimeUtil";
import { parseMessageDefinition } from "./parseMessageDefinition";
export type ReadOptions = {|
decompress?: Decompress,
noParse?: boolean,
topics?: string[],
startTime?: Time,
endTime?: Time,
freeze?: ?boolean,
|};
// the high level rosbag interface
// create a new bag by calling:
// `const bag = await Bag.open('./path-to-file.bag')` in node or
// `const bag = await Bag.open(files[0])` in the browser
//
// after that you can consume messages by calling
// `await bag.readMessages({ topics: ['/foo'] },
// (result) => console.log(result.topic, result.message))`
export default class Bag {
reader: BagReader;
header: BagHeader;
connections: { [conn: number]: Connection };
chunkInfos: ChunkInfo[];
startTime: ?Time;
endTime: ?Time;
// you can optionally create a bag manually passing in a bagReader instance
constructor(bagReader: BagReader) {
this.reader = bagReader;
}
// eslint-disable-next-line no-unused-vars
static open = (file: File | string) => {
throw new Error(
"This method should have been overridden based on the environment. Make sure you are correctly importing the node or web version of Bag."
);
};
// if the bag is manually created with the constructor, you must call `await open()` on the bag
// generally this is called for you if you're using `const bag = await Bag.open()`
async open() {
this.header = await this.reader.readHeaderAsync();
const { connectionCount, chunkCount, indexPosition } = this.header;
const result = await this.reader.readConnectionsAndChunkInfoAsync(indexPosition, connectionCount, chunkCount);
this.connections = {};
result.connections.forEach((connection) => {
this.connections[connection.conn] = connection;
});
this.chunkInfos = result.chunkInfos;
if (chunkCount > 0) {
// Get the earliest startTime among all chunks
this.startTime = this.chunkInfos
.map((x) => x.startTime)
.reduce((prev, current) => (TimeUtil.compare(prev, current) <= 0 ? prev : current));
// Get the latest endTime among all chunks
this.endTime = this.chunkInfos
.map((x) => x.endTime)
.reduce((prev, current) => (TimeUtil.compare(prev, current) > 0 ? prev : current));
}
}
async readMessages(opts: ReadOptions, callback: (msg: ReadResult<any>) => void) {
const connections = this.connections;
const startTime = opts.startTime || { sec: 0, nsec: 0 };
const endTime = opts.endTime || { sec: Number.MAX_VALUE, nsec: Number.MAX_VALUE };
const topics =
opts.topics ||
Object.keys(connections).map((id: any) => {
return connections[id].topic;
});
const filteredConnections = Object.keys(connections)
.filter((id: any) => {
return topics.indexOf(connections[id].topic) !== -1;
})
.map((id) => +id);
const { decompress = {} } = opts;
// filter chunks to those which fall within the time range we're attempting to read
const chunkInfos = this.chunkInfos.filter((info) => {
return TimeUtil.compare(info.startTime, endTime) <= 0 && TimeUtil.compare(startTime, info.endTime) <= 0;
});
function parseMsg(msg: MessageData, chunkOffset: number): ReadResult<any> {
const connection = connections[msg.conn];
const { topic, type } = connection;
const { data, time: timestamp } = msg;
let message = null;
if (!opts.noParse) {
// lazily create a reader for this connection if it doesn't exist
connection.reader =
connection.reader ||
new MessageReader(parseMessageDefinition(connection.messageDefinition, type), type, { freeze: opts.freeze });
message = connection.reader.readMessage(data);
}
return new ReadResult(topic, message, timestamp, data, chunkOffset, chunkInfos.length, opts.freeze);
}
for (let i = 0; i < chunkInfos.length; i++) {
const info = chunkInfos[i];
const messages = await this.reader.readChunkMessagesAsync(
info,
filteredConnections,
startTime,
endTime,
decompress
);
messages.forEach((msg) => callback(parseMsg(msg, i)));
}
}
}