import os
import pathlib
import fnmatch
import tempfile
import argparse
from csv import DictReader
from textwrap import dedent
from itertools import cycle
from bisect import bisect_right
from collections import OrderedDict
from typing import Callable, Tuple, Dict, Optional, Any, Sequence, Union

import h5py
import tree
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from typing_extensions import TypedDict

from . import core as jiminy
from .state import State


FieldNested = Union[
    Dict[str, 'FieldNested'], Sequence['FieldNested'],
    str]  # type: ignore[misc]


class TrajectoryDataType(TypedDict, total=False):
    # List of `State` objects of increasing time.
    evolution_robot: Sequence[State]
    # Jiminy robot. None if omitted.
    robot: Optional[jiminy.Robot]
    # Whether to use the theoretical or actual model.
    use_theoretical_model: bool


def _is_log_binary(fullpath: str) -> bool:
    """Return True if the given fullpath appears to be a binary log file.

    A file is considered to be a binary log if it contains a NULL byte.

    See https://stackoverflow.com/a/11301631/4820605 for reference.
    """
    try:
        with open(fullpath, 'rb') as f:
            for block in f:
                if b'\0' in block:
                    return True
    except IOError:
        pass
    return False


def read_log(fullpath: str,
             file_format: Optional[str] = None
             ) -> Tuple[Dict[str, np.ndarray], Dict[str, str]]:
    """Read a logfile from jiminy.

    This function supports text (csv), binary, and hdf5 logs.

    :param fullpath: Name of the file to load.
    :param file_format: Format of the file to load, among 'binary', 'csv'
                        and 'hdf5'. If None, it is inferred from the file
                        extension, falling back to a binary-content check.
                        Optional: None by default.

    :returns: Pair of dictionaries containing respectively the logged values,
              and the constants.
    """
    # Handle the file format
    if file_format is None:
        file_ext = pathlib.Path(fullpath).suffix
        if file_ext == '.data':
            file_format = 'binary'
        elif file_ext == '.csv' or file_ext == '.txt':
            file_format = 'csv'
        elif file_ext == '.h5' or file_ext == '.hdf5':
            file_format = 'hdf5'
        if file_format is None and not _is_log_binary(fullpath):
            file_format = 'csv'
    if file_format is None:
        raise ValueError(
            "Impossible to determine the file format automatically. Please "
            "specify it manually.")
    if file_format not in ['binary', 'csv', 'hdf5']:
        raise ValueError(
            "Invalid 'file_format' argument. It must be either 'binary', "
            "'csv' or 'hdf5'.")

    if file_format == 'binary':
        # Read the binary file using the C++ parser
        data_dict, const_dict = jiminy.Engine.read_log_binary(fullpath)
    elif file_format == 'csv':
        # Read the text csv file
        const_dict = {}
        with open(fullpath, 'r') as log:
            const_str = next(log).split(', ')
            for c in const_str:
                # Extract the name and value of the constant, stripping the
                # newline at the end of the value, then gather the constants
                # in a dictionary.
                name, value = c.split('=')
                const_dict[name] = value.strip('\n')

            # Read the data from the log file, skipping the first line,
            # namely the constants definition.
            data = {}
            reader = DictReader(log)
            for key, value in next(reader).items():
                data[key] = [value]
            for row in reader:
                for key, value in row.items():
                    data[key].append(value)

        # Convert every element to an array to provide the same API as the
        # C++ parser, removing the spaces before the keys along the way.
        data_dict = {k.strip(): np.array(v, dtype=np.float64)
                     for k, v in data.items()}
    elif file_format == 'hdf5':
        def parse_constant(key: str, value: Any) -> Any:
            """Process some particular constants based on their name or type.
            """
            if isinstance(value, bytes):
                return value.decode()
            return value

        with h5py.File(fullpath, 'r') as f:
            # Load the constants
            const_dict = {}
            for key, dataset in f['constants'].items():
                const_dict[key] = parse_constant(key, dataset[()])
            for key, value in dict(f['constants'].attrs).items():
                const_dict[key] = parse_constant(key, value)

            # Extract the time
            time = f['Global.Time'][()] / f['Global.Time'].attrs['unit']

            # Load the variables (1D time-series)
            data_dict = {'Global.Time': time}
            for key, value in f['variables'].items():
                data_dict[key] = value['value'][()]

    return data_dict, const_dict
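
# Example usage of `read_log` (a minimal sketch kept as a comment to stay
# import-safe; "simulation.hdf5" is a hypothetical log path, any supported
# extension would work the same way):
#
#     log_data, log_constants = read_log("simulation.hdf5")
#     print(log_data["Global.Time"][-1])  # Final time of the simulation
#     print(sorted(log_data.keys()))      # Every logged variable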


def extract_data_from_log(log_data: Dict[str, np.ndarray],
                          fieldnames: FieldNested,
                          namespace: Optional[str] = 'HighLevelController',
                          *,
                          as_dict: bool = False) -> Optional[Union[
                              Tuple[Optional[np.ndarray], ...],
                              Dict[str, Optional[np.ndarray]]]]:
    """Extract values associated with a set of fieldnames in a specific
    namespace.

    :param log_data: Data from the log file, in a dictionary.
    :param fieldnames: Structured fieldnames.
    :param namespace: Namespace of the fieldnames. None to disable.
                      Optional: 'HighLevelController' by default.
    :param as_dict: Whether or not to return a dictionary mapping flattened
                    fieldnames to values.
                    Optional: False by default.

    :returns: `np.ndarray` or None for each fieldname individually, depending
              on whether or not it is found in the log. None altogether if no
              value was found for any of the fieldnames.
    """
    # Keys are the concatenation of the path and value
    keys = [
        ".".join(map(str, filter(
            lambda key: isinstance(key, str), (*fieldname_path, fieldname))))
        for fieldname_path, fieldname in tree.flatten_with_path(fieldnames)]

    # Extract the values from the log if they exist
    values = [
        log_data.get(".".join(filter(None, (namespace, key))), None)
        for key in keys]

    # Return None if no value was found at all
    if not values or all(elem is None for elem in values):
        return None

    if as_dict:
        # Return a flat mapping from fieldnames without prefix to values
        values = OrderedDict(zip(keys, values))

    return values
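
# Example usage of `extract_data_from_log` (a sketch; the fieldnames below
# are hypothetical and must actually exist in the log for values to be
# returned):
#
#     values = extract_data_from_log(
#         log_data, ["currentPositionLeg0", "currentVelocityLeg0"],
#         namespace="HighLevelController", as_dict=True)
#     if values is not None:
#         for fieldname, value in values.items():
#             print(fieldname, None if value is None else value.shape)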


def extract_trajectory_data_from_log(log_data: Dict[str, np.ndarray],
                                     robot: jiminy.Model
                                     ) -> TrajectoryDataType:
    """Extract the minimal required information from raw log data in order to
    replay the simulation in a viewer.

    .. note::
        It extracts the data required for replay, namely the temporal
        evolution of:
          - the robot configuration: to display the robot on the scene,
          - the robot velocity: to update velocity-dependent markers such as
            the DCM,
          - the external forces: to update force-dependent markers.

    :param log_data: Data from the log file, in a dictionary.
    :param robot: Jiminy robot.

    :returns: Trajectory dictionary. The actual trajectory corresponds to the
              field "evolution_robot", which is a list of `State` objects.
              The other fields are additional information.
    """
    times = log_data["Global.Time"]
    try:
        # Extract the joint positions, velocities and external forces over
        # time.
        positions = np.stack([
            log_data.get(".".join(("HighLevelController", field)), [])
            for field in robot.logfile_position_headers], axis=-1)
        velocities = np.stack([
            log_data.get(".".join(("HighLevelController", field)), [])
            for field in robot.logfile_velocity_headers], axis=-1)
        forces = np.stack([
            log_data.get(".".join(("HighLevelController", field)), [])
            for field in robot.logfile_f_external_headers], axis=-1)

        # Determine which data are available
        has_positions = len(positions) > 0
        has_velocities = len(velocities) > 0
        has_forces = len(forces) > 0

        # Determine whether to use the theoretical or the flexible model
        use_theoretical_model = not robot.is_flexible

        # Create the state sequence
        evolution_robot = []
        q, v, f_ext = None, None, None
        for i, t in enumerate(times):
            if has_positions:
                q = positions[i]
            if has_velocities:
                v = velocities[i]
            if has_forces:
                f_ext = [forces[i, (6 * (j - 1)):(6 * j)]
                         for j in range(1, robot.pinocchio_model.njoints)]
            evolution_robot.append(State(t=t, q=q, v=v, f_ext=f_ext))

        traj_data = {"evolution_robot": evolution_robot,
                     "robot": robot,
                     "use_theoretical_model": use_theoretical_model}
    except KeyError:  # The current options are inconsistent with log data
        # Toggle the flexibilities
        model_options = robot.get_model_options()
        dyn_options = model_options["dynamics"]
        dyn_options["enableFlexibleModel"] = not robot.is_flexible
        robot.set_model_options(model_options)

        # Get the viewer data
        traj_data = extract_trajectory_data_from_log(log_data, robot)

        # Restore the original flexibilities
        dyn_options["enableFlexibleModel"] = not robot.is_flexible
        robot.set_model_options(model_options)

    return traj_data
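
# Example usage of `extract_trajectory_data_from_log` (a sketch; it assumes
# `log_data` comes from `read_log` and that `robot` matches the logged
# model):
#
#     traj = extract_trajectory_data_from_log(log_data, robot)
#     first_state = traj["evolution_robot"][0]
#     print(first_state.t, first_state.q)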


def build_robot_from_log(
        log_constants: Dict[str, str],
        mesh_package_dirs: Union[str, Sequence[str]] = ()) -> jiminy.Robot:
    """Build a robot from log constants.

    .. note::
        The model options and `robot.pinocchio_model` are guaranteed to be
        the same as during the simulation until the next call to the `reset`
        method.

    .. note::
        It returns a valid and fully initialized robot, that can be used to
        perform a new simulation if added to a Jiminy Engine, but the
        original controller is lost.

    .. warning::
        It does not require the original URDF file to exist, but the original
        mesh paths (if any) must be valid, since they are not bundled in the
        log archive for now.

    :param log_constants: Constants of the simulation log file, as returned
                          by the `jiminy_py.log.read_log` method.
    :param mesh_package_dirs: Prepend custom mesh package search path
                              directories to the ones provided by the log
                              file. It may be necessary to specify it to read
                              a log generated on a different environment.

    :returns: Reconstructed robot.
    """
    # Make sure the provided 'mesh_package_dirs' is a list
    mesh_package_dirs = list(mesh_package_dirs)

    # Extract the robot info
    pinocchio_model_str = log_constants[
        "HighLevelController.pinocchio_model"]
    urdf_file = log_constants["HighLevelController.urdf_file"]
    has_freeflyer = int(log_constants["HighLevelController.has_freeflyer"])
    if "HighLevelController.mesh_package_dirs" in log_constants.keys():
        mesh_package_dirs += log_constants[
            "HighLevelController.mesh_package_dirs"].split(";")
    all_options = jiminy.load_config_json_string(
        log_constants["HighLevelController.options"])

    # Create a temporary URDF file
    fd, urdf_path = tempfile.mkstemp(suffix=".urdf")
    os.write(fd, urdf_file.encode())
    os.close(fd)

    # Build the robot
    robot = jiminy.Robot()
    robot.initialize(urdf_path, has_freeflyer, mesh_package_dirs)
    robot.set_options(all_options["system"]["robot"])
    robot.pinocchio_model.loadFromString(pinocchio_model_str)
    robot.pinocchio_data.loadFromString(
        robot.pinocchio_model.createData().saveToString())

    return robot
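
# Example usage of `build_robot_from_log` (a sketch; the mesh directory is a
# hypothetical path, only needed if the original mesh paths are no longer
# valid on the current environment):
#
#     log_data, log_constants = read_log("simulation.hdf5")
#     robot = build_robot_from_log(
#         log_constants, mesh_package_dirs=["/path/to/mesh/packages"])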


def emulate_sensors_data_from_log(log_data: Dict[str, np.ndarray],
                                  robot: jiminy.Model
                                  ) -> Callable[
                                      [float, np.ndarray, np.ndarray], None]:
    """Helper to make it easy to emulate sensor data updates based on log
    data.

    .. note::
        It returns an update hook that can be forwarded to `Viewer.replay` to
        display sensor information such as contact forces for instance.

    :param log_data: Data from the log file, in a dictionary.
    :param robot: Jiminy robot.

    :returns: Callable taking the update time, configuration and velocity as
              arguments, and returning nothing. Note that it does not throw
              an exception if the time is out-of-range, but rather clips it
              to the available data range.
    """
    # Extract the time from the log
    times = log_data['Global.Time']

    # Filter the sensors whose data is available
    sensors_set, sensors_log = [], []
    for sensor_type, sensors_names in robot.sensors_names.items():
        sensor_fieldnames = getattr(jiminy, sensor_type).fieldnames
        for name in sensors_names:
            sensor = robot.get_sensor(sensor_type, name)
            log_fieldnames = [
                '.'.join((sensor.name, field))
                for field in sensor_fieldnames]
            if log_fieldnames[0] in log_data.keys():
                sensor_log = np.stack([
                    log_data[field] for field in log_fieldnames], axis=-1)
                sensors_set.append(sensor)
                sensors_log.append(sensor_log)

    def update_hook(t: float, q: np.ndarray, v: np.ndarray) -> None:
        nonlocal times, sensors_set, sensors_log

        # Get the surrounding indices in the log data
        i = bisect_right(times, t)
        i_prev, i_next = max(i - 1, 0), min(i, len(times) - 1)

        # Early return if no more data is available
        if i_next == i_prev:
            return

        # Compute the current time ratio
        ratio = (t - times[i_prev]) / (times[i_next] - times[i_prev])

        # Update the sensors data by linear interpolation
        for sensor, sensor_log in zip(sensors_set, sensors_log):
            value_prev, value_next = sensor_log[i_prev], sensor_log[i_next]
            sensor.data = value_prev + (value_next - value_prev) * ratio

    return update_hook
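
# Example usage of `emulate_sensors_data_from_log` (a sketch; it assumes a
# viewer from `jiminy_py.viewer` whose `replay` method accepts an
# `update_hook` argument, consistently with the note in the docstring):
#
#     update_hook = emulate_sensors_data_from_log(log_data, robot)
#     viewer.replay(traj["evolution_robot"], update_hook=update_hook)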


def plot_log() -> None:
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter, description=dedent("""
            Plot data from a jiminy log file using matplotlib.
            Specify a list of fields to plot, separated by a colon for \
            plotting on the same subplot.
            Example: h1 h2:h3:h4 generates two subplots, one with h1, one \
            with h2, h3, and h4.
            Wildcard token '*' can be used. In such a case:
            - If *h2* matches several fields:
            each field will be plotted individually in subplots.
            - If :*h2* or :*h2*:*h3*:*h4* matches several fields:
            each field will be plotted in the same subplot.
            - If *h2*:*h3*:*h4* matches several fields:
            each match of h2, h3, and h4 will be plotted jointly in subplots.
            Note that if the number of matches for h2, h3, h4 differs, only \
            the minimum number will be plotted.
            Enter no plot command (only the file name) to view the list of \
            fields available inside the file.
            """))
    parser.add_argument("input", help="Input logfile.")
    parser.add_argument(
        "-c", "--compare", type=str, default=None, help=dedent("""
            Colon-separated list of comparison log files.
            The same data as the original log will be plotted in the same \
            subplot, with different line styles. These logfiles must be of \
            the same length and contain the same header as the original log \
            file.
            Note that you can click on the figure top legend to show / hide \
            data from specific files.
            """))
    main_arguments, plotting_commands = parser.parse_known_args()

    # Load the log file
    log_data, _ = read_log(main_arguments.input)

    # If there is no plotting command, display the list of headers instead
    if len(plotting_commands) == 0:
        print("Available data:", *map(
            lambda s: f"- {s}", log_data.keys()), sep="\n")
        exit(0)

    # Load the comparison logs, if any
    compare_data = OrderedDict()
    if main_arguments.compare is not None:
        for fullpath in main_arguments.compare.split(':'):
            compare_data[fullpath], _ = read_log(fullpath)

    # Define the linestyle cycle that will be used for the comparison logs
    linestyles = ["--", "-.", ":"]

    # Parse the plotting arguments
    plotted_elements = []
    for cmd in plotting_commands:
        # Check that the command is valid, i.e. that all elements exist.
        # If it is the case, add it to the list.
        same_subplot = (cmd[0] == ':')
        headers = cmd.strip(':').split(':')

        # Expand each element according to the wildcard expression
        matching_headers = []
        for h in headers:
            match = sorted(fnmatch.filter(log_data.keys(), h))
            if len(match) > 0:
                matching_headers.append(match)
            else:
                print(f"No matching headers for expression {h}")
        if len(matching_headers) == 0:
            continue

        # Compute the number of subplots
        if same_subplot:
            plotted_elements.append([
                e for l_sub in matching_headers for e in l_sub])
        else:
            n_subplots = min([len(header) for header in matching_headers])
            for i in range(n_subplots):
                plotted_elements.append(
                    [header[i] for header in matching_headers])

    # Create the figure
    n_plot = len(plotted_elements)
    if n_plot == 0:
        print("Nothing to plot. Exiting...")
        return
    fig = plt.figure()

    # Create the subplots, arranging them in a rectangular fashion.
    # Do not allow n_cols to be more than n_rows + 2.
    n_cols = n_plot
    n_rows = 1
    while n_cols > n_rows + 2:
        n_rows = n_rows + 1
        n_cols = np.ceil(n_plot / (1.0 * n_rows))

    axes = []
    for i in range(n_plot):
        ax = fig.add_subplot(int(n_rows), int(n_cols), i + 1)
        if i > 0:
            ax.get_shared_x_axes().join(axes[0], ax)
        axes.append(ax)

    # Store the lines in a dictionary {file_name: plotted lines}, to enable
    # toggling the visibility of the data related to each file individually.
    main_name = os.path.basename(main_arguments.input)
    plotted_lines = {main_name: []}
    for c in compare_data:
        plotted_lines[os.path.basename(c)] = []

    plt.gcf().canvas.set_window_title(main_arguments.input)
    t = log_data['Global.Time']

    # Plot each element
    for ax, plotted_elem in zip(axes, plotted_elements):
        for name in plotted_elem:
            line = ax.step(t, log_data[name], label=name)
            plotted_lines[main_name].append(line[0])

            linecycler = cycle(linestyles)
            for c in compare_data:
                line = ax.step(compare_data[c]['Global.Time'],
                               compare_data[c][name],
                               next(linecycler),
                               color=line[0].get_color())
                plotted_lines[os.path.basename(c)].append(line[0])

    # Add a legend and a grid to each plot
    for ax in axes:
        ax.set_xlabel('time (s)')
        ax.legend()
        ax.grid()

    # If a compare plot is present, add an overall legend specifying the
    # line types.
    plt.subplots_adjust(
        bottom=0.05,
        top=0.98,
        left=0.06,
        right=0.98,
        wspace=0.1,
        hspace=0.12)
    if len(compare_data) > 0:
        linecycler = cycle(linestyles)

        # Dictionary: line in legend to log name
        legend_lines = {Line2D([0], [0], color='k'): main_name}
        for data_str in compare_data:
            legend_line_object = Line2D(
                [0], [0], color='k', linestyle=next(linecycler))
            legend_lines[legend_line_object] = os.path.basename(data_str)
        legend = fig.legend(
            legend_lines.keys(), legend_lines.values(), loc='upper center',
            ncol=3)

        # Create a dict {picker: log name} for both the lines and the legend
        picker_to_name = {}
        for legline, name in zip(legend.get_lines(), legend_lines.values()):
            legline.set_picker(10)  # 10 pts tolerance
            picker_to_name[legline] = name
        for legline, name in zip(legend.get_texts(), legend_lines.values()):
            legline.set_picker(10)  # 10 pts tolerance
            picker_to_name[legline] = name

        # Increase the top margin to fit the legend
        fig.canvas.draw()
        legend_height = legend.get_window_extent().inverse_transformed(
            fig.transFigure).height
        plt.subplots_adjust(top=0.98 - legend_height)

        # Make the legend interactive
        def legend_clicked(event):
            file_name = picker_to_name[event.artist]
            for line in plotted_lines[file_name]:
                line.set_visible(not line.get_visible())
            fig.canvas.draw()

        fig.canvas.mpl_connect('pick_event', legend_clicked)

    plt.show()
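
# Example command-line usage of `plot_log` (a sketch; it assumes this
# function is exposed as a console-script entry point, called `jiminy_plot`
# here for illustration, and that the log actually contains fields matching
# these patterns):
#
#     jiminy_plot simulation.hdf5 '*currentPosition*'
#     jiminy_plot simulation.hdf5 -c other_run.hdf5 '*energy*:*power*'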