-
Notifications
You must be signed in to change notification settings - Fork 507
/
pssh-box.py
executable file
·448 lines (372 loc) · 14.7 KB
/
pssh-box.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
#!/usr/bin/python3
# Copyright 2016 Google LLC. All rights reserved.
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd
"""A utility to parse and generate PSSH boxes."""
# This file itself is considered an invalid module name because of the dash in
# the filename: pssh-box.py
# pylint: disable=invalid-name
import argparse
import base64
import itertools
import os
import struct
import sys
def to_code_point(value):
"""
Return the unicode code point with `int` passthrough
"""
if isinstance(value, int):
return value
return ord(value)
_script_dir = os.path.dirname(os.path.realpath(__file__))
_proto_path = os.path.join(_script_dir, 'pssh-box-protos')
_widevine_proto_path = os.path.join(_proto_path, 'packager/media/base')
assert os.path.exists(_proto_path), (
'Failed to find proto, please run built/installed version. ' +
' e.g. build/packager/pssh-box.py')
sys.path.insert(0, _proto_path)
sys.path.insert(0, _widevine_proto_path)
# pylint: disable=wrong-import-position
import widevine_pssh_data_pb2
COMMON_SYSTEM_ID = base64.b16decode('1077EFECC0B24D02ACE33C1E52E2FB4B')
WIDEVINE_SYSTEM_ID = base64.b16decode('EDEF8BA979D64ACEA3C827DCD51D21ED')
PLAYREADY_SYSTEM_ID = base64.b16decode('9A04F07998404286AB92E65BE0885F95')
class BinaryReader(object):
"""A helper class used to read binary data from an binary string."""
def __init__(self, data, little_endian):
self.data = data
self.little_endian = little_endian
self.position = 0
def has_data(self):
"""Returns whether the reader has any data left to read."""
return self.position < len(self.data)
def read_bytes(self, count):
"""Reads the given number of bytes into an array."""
if len(self.data) < self.position + count:
raise Exception('Invalid PSSH box, not enough data')
ret = self.data[self.position:self.position+count]
self.position += count
return ret
def read_int(self, size):
"""Reads an integer of the given size (in bytes)."""
data = self.read_bytes(size)
ret = 0
for i in range(0, size):
if self.little_endian:
ret |= (to_code_point(data[i]) << (8 * i))
else:
ret |= (to_code_point(data[i]) << (8 * (size - i - 1)))
return ret
class Pssh(object):
"""Defines a PSSH box and related functions."""
def __init__(self, version, system_id, key_ids, pssh_data):
"""Parses a PSSH box from the given data.
Args:
version: The version number of the box
system_id: A binary string of the System ID
key_ids: An array of binary strings for the key IDs
pssh_data: A binary string of the PSSH data
"""
self.version = version
self.system_id = system_id
self.key_ids = key_ids or []
self.pssh_data = pssh_data or ''
def binary_string(self):
"""Converts the PSSH box to a binary string."""
ret = b'pssh' + _create_bin_int(self.version << 24)
ret += self.system_id
if self.version == 1:
ret += _create_bin_int(len(self.key_ids))
for key in self.key_ids:
ret += key
ret += _create_bin_int(len(self.pssh_data))
ret += self.pssh_data
return _create_bin_int(len(ret) + 4) + ret
def human_string(self):
"""Converts the PSSH box to a human readable string."""
system_name = ''
convert_data = None
if self.system_id == WIDEVINE_SYSTEM_ID:
system_name = 'Widevine'
convert_data = _parse_widevine_data
elif self.system_id == PLAYREADY_SYSTEM_ID:
system_name = 'PlayReady'
convert_data = _parse_playready_data
elif self.system_id == COMMON_SYSTEM_ID:
system_name = 'Common'
lines = [
'PSSH Box v%d' % self.version,
' System ID: %s %s' % (system_name, _create_uuid(self.system_id))
]
if self.version == 1:
lines.append(' Key IDs (%d):' % len(self.key_ids))
lines.extend([' ' + _create_uuid(key) for key in self.key_ids])
lines.append(' PSSH Data (size: %d):' % len(self.pssh_data))
if self.pssh_data:
if convert_data:
lines.append(' ' + system_name + ' Data:')
try:
extra = convert_data(self.pssh_data)
lines.extend([' ' + x for x in extra])
# pylint: disable=broad-except
except Exception as e:
lines.append(' ERROR: ' + str(e))
else:
lines.extend([
' Raw Data (base64):',
' ' + base64.b64encode(self.pssh_data)
])
return '\n'.join(lines)
def _split_list_on(elems, sep):
"""Splits the given list on the given separator."""
return [list(g) for k, g in itertools.groupby(elems, lambda x: x == sep)
if not k]
def _create_bin_int(value):
"""Creates a binary string as 4-byte array from the given integer."""
return struct.pack('>i', value)
def _create_uuid(data):
"""Creates a human readable UUID string from the given binary string."""
ret = base64.b16encode(data).decode().lower()
return (ret[:8] + '-' + ret[8:12] + '-' + ret[12:16] + '-' + ret[16:20] +
'-' + ret[20:])
def _generate_widevine_data(key_ids, content_id, provider, protection_scheme):
"""Generate widevine pssh data."""
wv = widevine_pssh_data_pb2.WidevinePsshData()
wv.key_id.extend(key_ids)
if provider:
wv.provider = provider
if content_id:
wv.content_id = content_id
# 'cenc' is the default, so omitted to save bytes.
if protection_scheme and protection_scheme != 'cenc':
wv.protection_scheme = struct.unpack('>L', protection_scheme.encode())[0]
return wv.SerializeToString()
def _parse_widevine_data(data):
"""Parses Widevine PSSH box from the given binary string."""
wv = widevine_pssh_data_pb2.WidevinePsshData()
wv.ParseFromString(data)
ret = []
if wv.key_id:
ret.append('Key IDs (%d):' % len(wv.key_id))
ret.extend([' ' + _create_uuid(x) for x in wv.key_id])
if wv.HasField('provider'):
ret.append('Provider: ' + wv.provider)
if wv.HasField('content_id'):
ret.append('Content ID: ' + base64.b16encode(wv.content_id).decode())
if wv.HasField('policy'):
ret.append('Policy: ' + wv.policy)
if wv.HasField('crypto_period_index'):
ret.append('Crypto Period Index: %d' % wv.crypto_period_index)
if wv.HasField('protection_scheme'):
protection_scheme = struct.pack('>L', wv.protection_scheme)
ret.append('Protection Scheme: %s' % protection_scheme)
return ret
def _parse_playready_data(data):
"""Parses PlayReady PSSH data from the given binary string."""
reader = BinaryReader(data, little_endian=True)
size = reader.read_int(4)
if size != len(data):
raise Exception('Length incorrect')
ret = []
count = reader.read_int(2)
while count > 0:
count -= 1
record_type = reader.read_int(2)
record_len = reader.read_int(2)
record_data = reader.read_bytes(record_len)
ret.append('Record (size %d):' % record_len)
if record_type == 1:
xml = record_data.decode('utf-16 LE')
ret.extend([
' Record Type: Rights Management Header (1)',
' Record XML:',
' ' + xml
])
elif record_type == 3:
ret.extend([
' Record Type: License Store (1)',
' License Data:',
' ' + base64.b64encode(record_data)
])
else:
raise Exception('Invalid record type %d' % record_type)
if reader.has_data():
raise Exception('Extra data after records')
return ret
def _parse_boxes(data):
"""Parses one or more PSSH boxes for the given binary data."""
reader = BinaryReader(data, little_endian=False)
boxes = []
while reader.has_data():
start = reader.position
size = reader.read_int(4)
box_type = reader.read_bytes(4)
if box_type != b'pssh':
raise Exception(
'Invalid box type 0x%s, not \'pssh\'' % box_type.encode('hex'))
version_and_flags = reader.read_int(4)
version = version_and_flags >> 24
if version > 1:
raise Exception('Invalid PSSH version %d' % version)
system_id = reader.read_bytes(16)
key_ids = []
if version == 1:
count = reader.read_int(4)
while count > 0:
key = reader.read_bytes(16)
key_ids.append(key)
count -= 1
pssh_data_size = reader.read_int(4)
pssh_data = reader.read_bytes(pssh_data_size)
if start + size != reader.position:
raise Exception('Box size does not match size of data')
pssh = Pssh(version, system_id, key_ids, pssh_data)
boxes.append(pssh)
return boxes
def _create_argument_parser():
"""Creates an argument parser."""
def hex_16_bytes(string):
if not string or len(string) != 32:
raise argparse.ArgumentTypeError(
'Must be a 32-character hex string, %d given' % len(string))
return base64.b16decode(string.upper())
def hex_bytes(string):
return base64.b16decode(string.upper())
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
usage='[--base64 | --hex | --human] options [-- options [-- ...]',
epilog="""\
This utility can be used to generate one or more PSSH boxes. This is done by
passing a system ID and either --key-id or --pssh-data. Multiple boxes can be
generated by separating boxes with --. Using --key-id will generate v1 pssh
boxes, if none are given, it will generate v0.
You can also import PSSH boxes using --from-base64 and --from-hex. These must
be valid PSSH boxes, but can be multiple concatenated together. These arguments
can appear anywhere in the string. If it appears 'inside' another definition,
it will appear before the generated one.
An alternative to --pssh-data is to generate Widevine PSSH data. This is only
valid with --widevine-system-id. Passing --content-id will make it generate
Widevine PSSH data instead. You can optionally add --provider and/or
--protection-scheme. It will generate a v0 PSSH box for compatibility
reasons.""")
formats = parser.add_mutually_exclusive_group()
formats.add_argument('--base64',
dest='format',
action='store_const',
const='base64',
help='Output base64 encoded')
formats.add_argument('--hex',
dest='format',
action='store_const',
const='hex',
help='Output hexadecimal encoded')
formats.add_argument('--human',
dest='format',
action='store_const',
const='human',
help='Output a human readable string')
inputs = parser.add_mutually_exclusive_group()
inputs.add_argument('--from-base64',
metavar='<base64-string>',
dest='input',
type=base64.b64decode,
help='Parse the given base64 encoded PSSH box')
inputs.add_argument('--from-hex',
metavar='<hex-string>',
dest='input',
type=hex_bytes,
help='Parse the given hexadecimal encoded PSSH box')
system_ids = parser.add_mutually_exclusive_group()
system_ids.add_argument('--system-id',
metavar='<hex-string>',
dest='system_id',
type=hex_16_bytes,
help='Sets the system ID')
system_ids.add_argument('--common-system-id',
dest='system_id',
action='store_const',
const=COMMON_SYSTEM_ID,
help='Use the Common system ID')
system_ids.add_argument('--widevine-system-id',
dest='system_id',
action='store_const',
const=WIDEVINE_SYSTEM_ID,
help='Use the Widevine system ID')
extra = parser.add_argument_group()
extra.add_argument('--key-id',
metavar='<hex-string>',
action='append',
type=hex_16_bytes,
help='Adds a key ID (can appear multiple times)')
extra.add_argument('--pssh-data',
metavar='<base64-string>',
type=base64.b64decode,
help='Sets the extra data')
extra.add_argument('--content-id',
metavar='<hex-string>',
type=hex_bytes,
help='Sets the content ID of the Widevine PSSH data')
extra.add_argument('--provider',
metavar='<string>',
help='Sets the provider of the Widevine PSSH data')
extra.add_argument('--protection-scheme',
choices=['cenc', 'cbcs', 'cens', 'cbc1'],
help='Set the protection scheme of the Widevine PSSH data')
return parser
def main(all_args):
boxes = []
output_format = None
parser = _create_argument_parser()
if not all_args:
parser.print_help()
sys.exit(1)
arg_groups = _split_list_on(all_args, '--')
for args in arg_groups:
ns = parser.parse_args(args)
if ns.format:
if output_format:
raise Exception('Can only specify one of: --base64, --hex, --human')
else:
output_format = ns.format
if ns.input:
boxes.extend(_parse_boxes(ns.input))
pssh_data = ns.pssh_data
if pssh_data and ns.content_id:
raise Exception('Cannot specify both --pssh-data and --content-id')
if ns.protection_scheme:
if ns.system_id != WIDEVINE_SYSTEM_ID:
raise Exception(
'--protection-scheme only valid with Widevine system ID')
if ns.content_id:
if ns.system_id != WIDEVINE_SYSTEM_ID:
raise Exception('--content-id only valid with Widevine system ID')
# Ignore if we have no data.
if not pssh_data and not ns.key_id and not ns.system_id:
continue
if not ns.system_id:
raise Exception('System ID is required')
if ns.system_id == WIDEVINE_SYSTEM_ID:
# Always generate version 0 for Widevine for backward compatibility.
version = 0
if not pssh_data:
if not ns.key_id and not ns.content_id:
raise Exception('Widevine system needs key-id or content-id or both')
pssh_data = _generate_widevine_data(ns.key_id, ns.content_id,
ns.provider, ns.protection_scheme)
else:
version = 1 if ns.key_id else 0
boxes.append(Pssh(version, ns.system_id, ns.key_id, pssh_data))
if output_format == 'human' or not output_format:
for box in boxes:
print(box.human_string())
else:
box_data = b''.join([x.binary_string() for x in boxes])
if output_format == 'hex':
print(base64.b16encode(box_data).decode())
else:
print(base64.b64encode(box_data).decode())
if __name__ == '__main__':
main(sys.argv[1:])