-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
sonic_yang.py
664 lines (590 loc) · 22.1 KB
/
sonic_yang.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
import yang as ly
import syslog
from json import dump
from glob import glob
from sonic_yang_ext import SonicYangExtMixin, SonicYangException
"""
Yang schema and data tree python APIs based on libyang python
Here, sonic_yang_ext_mixin extends funtionality of sonic_yang,
i.e. it is mixin not parent class.
"""
class SonicYang(SonicYangExtMixin):
def __init__(self, yang_dir, debug=False):
self.yang_dir = yang_dir
self.ctx = None
self.module = None
self.root = None
# logging vars
self.SYSLOG_IDENTIFIER = "sonic_yang"
self.DEBUG = debug
# yang model files, need this map it to module
self.yangFiles = list()
# map from TABLE in config DB to container and module
self.confDbYangMap = dict()
# JSON format of yang model [similar to pyang conversion]
self.yJson = list()
# config DB json input, will be cropped as yang models
self.jIn = dict()
# YANG JSON, this is traslated from config DB json
self.xlateJson = dict()
# reverse translation from yang JSON, == config db json
self.revXlateJson = dict()
# below dict store the input config tables which have no YANG models
self.tablesWithOutYang = dict()
# below dict will store preProcessed yang objects, which may be needed by
# all yang modules, such as grouping.
self.preProcessedYang = dict()
try:
self.ctx = ly.Context(yang_dir)
except Exception as e:
self.fail(e)
return
def __del__(self):
pass
def sysLog(self, debug=syslog.LOG_INFO, msg=None, doPrint=False):
# log debug only if enabled
if self.DEBUG == False and debug == syslog.LOG_DEBUG:
return
if doPrint:
print("{}({}):{}".format(self.SYSLOG_IDENTIFIER, debug, msg))
syslog.openlog(self.SYSLOG_IDENTIFIER)
syslog.syslog(debug, msg)
syslog.closelog()
return
def fail(self, e):
print(e)
raise e
"""
load_schema_module(): load a Yang model file
input: yang_file - full path of a Yang model file
returns: Exception if error
"""
def _load_schema_module(self, yang_file):
try:
return self.ctx.parse_module_path(yang_file, ly.LYS_IN_YANG)
except Exception as e:
print("Failed to load yang module file: " + yang_file)
self.fail(e)
"""
load_schema_module_list(): load all Yang model files in the list
input: yang_files - a list of Yang model file full path
returns: Exception if error
"""
def _load_schema_module_list(self, yang_files):
for file in yang_files:
try:
self._load_schema_module(file)
except Exception as e:
self.fail(e)
"""
load_schema_modules(): load all Yang model files in the directory
input: yang_dir - the directory of the yang model files to be loaded
returns: Exception if error
"""
def _load_schema_modules(self, yang_dir):
py = glob(yang_dir+"/*.yang")
for file in py:
try:
self._load_schema_module(file)
except Exception as e:
self.fail(e)
"""
load_schema_modules_ctx(): load all Yang model files in the directory to context: ctx
input: yang_dir, context
returns: Exception if error, returrns context object if no error
"""
def _load_schema_modules_ctx(self, yang_dir=None):
if not yang_dir:
yang_dir = self.yang_dir
ctx = ly.Context(yang_dir)
py = glob(yang_dir+"/*.yang")
for file in py:
try:
ctx.parse_module_path(str(file), ly.LYS_IN_YANG)
except Exception as e:
print("Failed to parse yang module file: " + file)
self.fail(e)
return ctx
"""
load_data_file(): load a Yang data json file
input: data_file - the full path of the yang json data file to be loaded
returns: Exception if error
"""
def _load_data_file(self, data_file):
try:
data_node = self.ctx.parse_data_path(data_file, ly.LYD_JSON, ly.LYD_OPT_CONFIG | ly.LYD_OPT_STRICT)
except Exception as e:
print("Failed to load data file: " + str(data_file))
self.fail(e)
else:
self.root = data_node
"""
get module name from xpath
input: path
returns: module name
"""
def _get_module_name(self, schema_xpath):
module_name = schema_xpath.split(':')[0].strip('/')
return module_name
"""
get_module(): get module object from Yang module name
input: yang module name
returns: Schema_Node object
"""
def _get_module(self, module_name):
mod = self.ctx.get_module(module_name)
return mod
"""
load_data_model(): load both Yang module fileis and data json files
input: yang directory, list of yang files and list of data files (full path)
returns: returns (context, root) if no error, or Exception if failed
"""
def _load_data_model(self, yang_dir, yang_files, data_files, output=None):
if (self.ctx is None):
self.ctx = ly.Context(yang_dir)
try:
self._load_schema_module_list(yang_files)
if len(data_files) == 0:
return (self.ctx, self.root)
self._load_data_file(data_files[0])
for i in range(1, len(data_files)):
self._merge_data(data_files[i])
except Exception as e:
print("Failed to load data files")
self.fail(e)
return
if output is not None:
self._print_data_mem(output)
return (self.ctx, self.root)
"""
print_data_mem(): print the data tree
input: option: "JSON" or "XML"
"""
def _print_data_mem(self, option):
if (option == "JSON"):
mem = self.root.print_mem(ly.LYD_JSON, ly.LYP_WITHSIBLINGS | ly.LYP_FORMAT)
else:
mem = self.root.print_mem(ly.LYD_XML, ly.LYP_WITHSIBLINGS | ly.LYP_FORMAT)
return mem
"""
save_data_file_json(): save the data tree in memory into json file
input: outfile - full path of the file to save the data tree to
"""
def _save_data_file_json(self, outfile):
mem = self.root.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
with open(outfile, 'w') as out:
dump(mem, out, indent=4)
"""
get_module_tree(): get yang module tree in JSON or XMAL format
input: module name
returns: JSON or XML format of the input yang module schema tree
"""
def _get_module_tree(self, module_name, format):
result = None
try:
module = self.ctx.get_module(str(module_name))
except Exception as e:
print("Cound not get module: " + str(module_name))
self.fail(e)
else:
if (module is not None):
if (format == "XML"):
#libyang bug with format
result = module.print_mem(ly.LYD_JSON, ly.LYP_FORMAT)
else:
result = module.print_mem(ly.LYD_XML, ly.LYP_FORMAT)
return result
"""
validate_data(): validate data tree
input:
node: root of the data tree
ctx: context
returns: Exception if failed
"""
def _validate_data(self, node=None, ctx=None):
if not node:
node = self.root
if not ctx:
ctx = self.ctx
try:
node.validate(ly.LYD_OPT_CONFIG, ctx)
except Exception as e:
self.fail(e)
"""
validate_data_tree(): validate the data tree. (Public)
returns: Exception if failed
"""
def validate_data_tree(self):
try:
self._validate_data(self.root, self.ctx)
except Exception as e:
print("Failed to validate data tree")
raise SonicYangException("Failed to validate data tree\n{}".\
format(str(e)))
"""
find_parent_data_node(): find the parent node object
input: data_xpath - xpath of the data node
returns: parent node
"""
def _find_parent_data_node(self, data_xpath):
if (self.root is None):
print("data not loaded")
return None
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("Failed to find data node from xpath: " + str(data_xpath))
self.fail(e)
else:
if data_node is not None:
return data_node.parent()
return None
"""
get_parent_data_xpath(): find the parent data node's xpath
input: data_xpath - xpathof the data node
returns: - xpath of parent data node
- Exception if error
"""
def _get_parent_data_xpath(self, data_xpath):
path=""
try:
data_node = self._find_parent_data_node(data_xpath)
except Exception as e:
print("Failed to find parent node from xpath: " + str(data_xpath))
self.fail(e)
else:
if data_node is not None:
path = data_node.path()
return path
"""
new_data_node(): create a new data node in the data tree
input:
xpath: xpath of the new node
value: value of the new node
returns: new Data_Node object if success, Exception if falied
"""
def _new_data_node(self, xpath, value):
val = str(value)
try:
data_node = self.root.new_path(self.ctx, xpath, val, 0, 0)
except Exception as e:
print("Failed to add data node for path: " + str(xpath))
self.fail(e)
else:
return data_node
"""
find_data_node(): find the data node from xpath
input: data_xpath: xpath of the data node
returns - Data_Node object if found
- None if not exist
- Exception if there is error
"""
def _find_data_node(self, data_xpath):
try:
set = self.root.find_path(data_xpath)
except Exception as e:
print("Failed to find data node from xpath: " + str(data_xpath))
self.fail(e)
else:
if set is not None:
for data_node in set.data():
if (data_xpath == data_node.path()):
return data_node
return None
"""
find_schema_node(): find the schema node from schema xpath
example schema xpath:
"/sonic-port:sonic-port/sonic-port:PORT/sonic-port:PORT_LIST/sonic-port:port_name"
input: xpath of the node
returns: Schema_Node oject or None if not found
"""
def _find_schema_node(self, schema_xpath):
try:
schema_set = self.ctx.find_path(schema_xpath)
for schema_node in schema_set.schema():
if (schema_xpath == schema_node.path()):
return schema_node
except Exception as e:
self.fail(e)
return None
else:
for schema_node in schema_set.schema():
if schema_xapth == schema_node.path():
return schema_node
return None
"""
find_data_node_schema_xpath(): find the xpath of the schema node from data xpath
data xpath example:
"/sonic-port:sonic-port/PORT/PORT_LIST[port_name='Ethernet0']/port_name"
input: data_xpath - xpath of the data node
returns: - xpath of the schema node if success
- Exception if error
"""
def _find_data_node_schema_xpath(self, data_xpath):
path = ""
try:
set = self.root.find_path(data_xpath)
except Exception as e:
self.fail(e)
else:
for data_node in set.data():
if data_xpath == data_node.path():
return data_node.schema().path()
return path
"""
add_node(): add a node to Yang schema or data tree
input: xpath and value of the node to be added
returns: Exception if failed
"""
def _add_data_node(self, data_xpath, value):
try:
self._new_data_node(data_xpath, value)
#check if the node added to the data tree
self._find_data_node(data_xpath)
except Exception as e:
print("add_node(): Failed to add data node for xpath: " + str(data_xpath))
self.fail(e)
"""
merge_data(): merge a data file to the existing data tree
input: yang model directory and full path of the data json file to be merged
returns: Exception if failed
"""
def _merge_data(self, data_file, yang_dir=None):
#load all yang models to ctx
if not yang_dir:
yang_dir = self.yang_dir
try:
ctx = self._load_schema_modules_ctx(yang_dir)
#source data node
source_node = ctx.parse_data_path(str(data_file), ly.LYD_JSON, ly.LYD_OPT_CONFIG | ly.LYD_OPT_STRICT)
#merge
self.root.merge(source_node, 0)
except Exception as e:
self.fail(e)
"""
_deleteNode(): delete a node from the schema/data tree, internal function
input: xpath of the schema/data node
returns: True - success False - failed
"""
def _deleteNode(self, xpath=None, node=None):
if node is None:
node = self._find_data_node(xpath)
if (node):
node.unlink()
dnode = self._find_data_node(xpath)
if (dnode is None):
#deleted node not found
return True
else:
print('Could not delete Node')
return False
else:
print("failed to find node, xpath: " + xpath)
return False
"""
find_data_node_value(): find the value of a node from the data tree
input: data_xpath of the data node
returns: value string of the node
"""
def _find_data_node_value(self, data_xpath):
output = ""
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("find_data_node_value(): Failed to find data node from xpath: {}".format(data_xpath))
self.fail(e)
else:
if (data_node is not None):
subtype = data_node.subtype()
if (subtype is not None):
value = subtype.value_str()
return value
return output
"""
set the value of a node in the data tree
input: xpath of the data node
returns: Exception if failed
"""
def _set_data_node_value(self, data_xpath, value):
try:
self.root.new_path(self.ctx, data_xpath, str(value), ly.LYD_ANYDATA_STRING, ly.LYD_PATH_OPT_UPDATE)
except Exception as e:
print("set data node value failed for xpath: " + str(data_xpath))
self.fail(e)
"""
find_data_nodes(): find the set of nodes for the xpath
input: xpath of the data node
returns: list of xpath of the dataset
"""
def _find_data_nodes(self, data_xpath):
list = []
node = self.root.child()
try:
node_set = node.find_path(data_xpath);
except Exception as e:
self.fail(e)
else:
if node_set is None:
raise Exception('data node not found')
for data_set in node_set.data():
data_set.schema()
list.append(data_set.path())
return list
"""
find_schema_dependencies(): find the schema dependencies from schema xpath
input: schema_xpath of the schema node
returns: - list of xpath of the dependencies
- Exception if schema node not found
"""
def _find_schema_dependencies(self, schema_xpath):
ref_list = []
try:
schema_node = self._find_schema_node(schema_xpath)
except Exception as e:
print("Cound not find the schema node from xpath: " + str(schema_xpath))
self.fail(e)
return ref_list
schema_node = ly.Schema_Node_Leaf(schema_node)
backlinks = schema_node.backlinks()
if backlinks.number() > 0:
for link in backlinks.schema():
print("backlink schema: {}".format(link.path()))
ref_list.append(link.path())
return ref_list
"""
find_data_dependencies(): find the data dependencies from data xpath
input: data_xpath - xpath of data node. (Public)
returns: - list of xpath
- Exception if error
"""
def find_data_dependencies(self, data_xpath):
ref_list = []
node = self.root
try:
data_node = self._find_data_node(data_xpath)
except Exception as e:
print("find_data_dependencies(): Failed to find data node from xpath: {}".format(data_xapth))
return ref_list
try:
value = str(self._find_data_node_value(data_xpath))
schema_node = ly.Schema_Node_Leaf(data_node.schema())
backlinks = schema_node.backlinks()
if backlinks is not None and backlinks.number() > 0:
for link in backlinks.schema():
node_set = node.find_path(link.path())
for data_set in node_set.data():
data_set.schema()
casted = data_set.subtype()
if value == casted.value_str():
ref_list.append(data_set.path())
except Exception as e:
print('Failed to find node or dependencies for {}'.format(data_xpath))
raise SonicYangException("Failed to find node or dependencies for \
{}\n{}".format(data_xpath, str(e)))
return ref_list
"""
get_module_prefix: get the prefix of a Yang module
input: name of the Yang module
output: prefix of the Yang module
"""
def _get_module_prefix(self, module_name):
prefix = ""
try:
module = self._get_module(module_name)
except Exception as e:
self.fail(e)
return prefix
else:
return module.prefix()
"""
str_to_type: map string to type of node
input: string
output: type
"""
def _str_to_type(self, type_str):
mapped_type = {
"LY_TYPE_DER":ly.LY_TYPE_DER,
"LY_TYPE_BINARY":ly.LY_TYPE_BINARY,
"LY_TYPE_BITS":ly.LY_TYPE_BITS,
"LY_TYPE_BOOL":ly.LY_TYPE_BOOL,
"LY_TYPE_DEC64":ly.LY_TYPE_DEC64,
"LY_TYPE_EMPTY":ly.LY_TYPE_EMPTY,
"LY_TYPE_ENUM":ly.LY_TYPE_ENUM,
"LY_TYPE_IDENT":ly.LY_TYPE_IDENT,
"LY_TYPE_INST":ly.LY_TYPE_INST,
"LY_TYPE_LEAFREF":ly.LY_TYPE_LEAFREF,
"LY_TYPE_STRING":ly.LY_TYPE_STRING,
"LY_TYPE_UNION":ly.LY_TYPE_UNION,
"LY_TYPE_INT8":ly.LY_TYPE_INT8,
"LY_TYPE_UINT8":ly.LY_TYPE_UINT8,
"LY_TYPE_INT16":ly.LY_TYPE_INT16,
"LY_TYPE_UINT16":ly.LY_TYPE_UINT16,
"LY_TYPE_INT32":ly.LY_TYPE_INT32,
"LY_TYPE_UINT32":ly.LY_TYPE_UINT32,
"LY_TYPE_INT64":ly.LY_TYPE_INT64,
"LY_TYPE_UINT64":ly.LY_TYPE_UINT64,
"LY_TYPE_UNKNOWN":ly.LY_TYPE_UNKNOWN
}
if type_str not in mapped_type:
return ly.LY_TYPE_UNKNOWN
return mapped_type[type_str]
def _get_data_type(self, schema_xpath):
try:
schema_node = self._find_schema_node(schema_xpath)
except Exception as e:
print("get_data_type(): Failed to find schema node from xpath: {}".format(schema_xpath))
self.fail(e)
return None
if (schema_node is not None):
return schema_node.subtype().type().base()
return ly.LY_TYPE_UNKNOWN
"""
get_leafref_type: find the type of node that leafref references to
input: data_xpath - xpath of a data node
output: type of the node this leafref references to
"""
def _get_leafref_type(self, data_xpath):
data_node = self._find_data_node(data_xpath)
if (data_node is not None):
subtype = data_node.subtype()
if (subtype is not None):
if data_node.schema().subtype().type().base() != ly.LY_TYPE_LEAFREF:
print("get_leafref_type() node type for data xpath: {} is not LEAFREF".format(data_xpath))
return ly.LY_TYPE_UNKNOWN
else:
return subtype.value_type()
return ly.LY_TYPE_UNKNOWN
"""
get_leafref_path(): find the leafref path
input: schema_xpath - xpath of a schema node
output: path value of the leafref node
"""
def _get_leafref_path(self, schema_xpath):
schema_node = self._find_schema_node(schema_xpath)
if (schema_node is not None):
subtype = schema_node.subtype()
if (subtype is not None):
if subtype.type().base() != ly.LY_TYPE_LEAFREF:
return None
else:
return subtype.type().info().lref().path()
return None
"""
get_leafref_type_schema: find the type of node that leafref references to
input: schema_xpath - xpath of a schema node
output: type of the node this leafref references to
"""
def _get_leafref_type_schema(self, schema_xpath):
schema_node = self._find_schema_node(schema_xpath)
if (schema_node is not None):
subtype = schema_node.subtype()
if (subtype is not None):
if subtype.type().base() != ly.LY_TYPE_LEAFREF:
return None
else:
subtype.type().info().lref().path()
target = subtype.type().info().lref().target()
target_path = target.path()
target_type = self._get_data_type(target_path)
return target_type
return None