-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathperson.py
135 lines (111 loc) · 3.13 KB
/
person.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python3
# @File: person.py
# --coding:utf-8--
# @Author:Schopenhauerzhang@icloud.com(Schopenhauerzhang@gmail.com)
# @license:Copyright Schopenhauerzhang@icloud.com All rights Reserved.
# @Time: 2019-09-23 15:00
from google.protobuf import json_format
from py_protobuf.protobuf import person_pb2
import json
def get_protobuf_data():
"""
生成protobuf data
Returns:
bytes-like object
"""
try:
person = person_pb2.Person()
person.id = 123
person.name = "abc"
p_res = person.SerializeToString()
except Exception:
print("get_protobuf_data error: fail, please check your code")
raise Exception("get_protobuf_data error: fail, please check your code")
return p_res
def protobuf2json_or_dict(is_json = True):
"""
将protobuf data转为json
Args:
is_json: bool ,True Returns json / False Returns dict
Returns:
dict/json string
"""
try:
persons = person_pb2.Person()
persons.ParseFromString(get_protobuf_data())
if is_json is True:
result = json_format.MessageToJson(persons)
result = json.loads(result)
else:
result = json_format.MessageToJson(persons)
except Exception:
print("protobuf2json_or_dict error: fail, please check your code")
raise Exception("protobuf2json_or_dict error: fail, please check your code")
return result
def extension_cni(is_json = false):
"""
将protobuf data转为json,todo k8s cni plugins
Args:
is_json: bool ,True Returns json / False Returns dict
Returns:
dict/json string
"""
try:
persons = person_pb2.Person()
persons.ParseFromString(get_protobuf_data())
if is_json is True:
result = json_format.MessageToJson(persons)
result = json.loads(result)
else:
result = json_format.MessageToJson(persons)
except Exception:
print("protobuf2json_or_dict error: fail, please check your code")
raise Exception("protobuf2json_or_dict error: fail, please check your code")
return result
def extension_k8s(is_json = false):
"""
将protobuf data转为json,todo k8s plugins
Args:
is_json: bool ,True Returns json / False Returns dict
Returns:
dict/json string
"""
try:
if is_json is True:
result = json_format.MessageToJson(persons)
result = json.loads(result)
else:
result = json_format.MessageToJson(persons)
except Exception:
raise Exception("protobuf2json_or_dict error: fail, please check your code")
return result
def extension_client_go(is_json = false):
"""
将protobuf data转为json,todo k8s cni plugins
Args:
is_json: bool ,True Returns json / False Returns dict
Returns:
dict/json string
"""
try:
if is_json is True:
result = json_format.MessageToJson(persons)
result = json.loads(result)
else:
result = json_format.MessageToJson(persons)
except Exception:
raise Exception("protobuf2json_or_dict error: fail, please check your code")
return result
def extension_week(is_json = false):
"""
将protobuf data转为json,todo week plugins
Args:
is_json: bool ,True Returns json / False Returns dict
Returns:
dict/json string
"""
try:
result = json_format.MessageToJson(persons)
except Exception:
raise Exception("protobuf2json_or_dict error: fail, please check your code")
return result