forked from dakhnod/fzfs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flipper_api.py
151 lines (114 loc) · 4.7 KB
/
flipper_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#!/usr/bin/env python3
# pylint: disable=protected-access
# pylint: disable=no-member
# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
import threading
from flipperzero_protobuf_py.flipperzero_protobuf_compiled import flipper_pb2, storage_pb2
from flipperzero_protobuf_py.flipper_protobuf import ProtoFlipper
from flipperzero_protobuf_py.cli_helpers import *
class FlipperAPI():
def __init__(self, flipper_serial) -> None:
self.serial_port = flipper_serial
self.proto = None
self.flipper = flipper_serial
self.mutex=threading.Lock()
def connect(self):
with self.mutex:
self.proto = ProtoFlipper(self.flipper)
print("Ping result: ")
print_hex(self.proto.cmd_system_ping())
def _cmd_storage_list_directory(self, path):
cmd_data = storage_pb2.ListRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_list_request')
def _cmd_storage_stat(self, path):
cmd_data = storage_pb2.StatRequest()
cmd_data.path = path
return self.proto._cmd_send_and_read_answer(cmd_data, 'storage_stat_request')
def _cmd_storage_read(self, path):
cmd_data = storage_pb2.ReadRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_read_request')
def _cmd_storage_mkdir(self, path):
cmd_data = storage_pb2.MkdirRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_mkdir_request')
def _cmd_storage_rmdir(self, path):
cmd_data = storage_pb2.RmdirRequest()
cmd_data.path = path
self.proto._cmd_send(cmd_data, 'storage_rmdir_request')
def _cmd_storage_rename(self, old_path, new_path):
cmd_data = storage_pb2.RenameRequest()
cmd_data.old_path = old_path
cmd_data.new_path = new_path
self.proto._cmd_send(cmd_data, 'storage_rename_request')
def _cmd_storage_delete(self, path, recursive):
cmd_data = storage_pb2.DeleteRequest()
cmd_data.path = path
cmd_data.recursive = recursive
self.proto._cmd_send(cmd_data, 'storage_delete_request')
def _cmd_storage_write(self, path, data):
cmd_data = storage_pb2.WriteRequest()
cmd_data.path = path
cmd_data.file.data = data
self.proto._cmd_send(cmd_data, 'storage_write_request')
def check_response_status(self, response):
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
raise InvalidNameError()
def list_directory(self, path, additional_data):
with self.mutex:
self._cmd_storage_list_directory(path)
files = []
while True:
packet = self.proto._cmd_read_answer()
self.check_response_status(packet)
for file in packet.storage_list_response.file:
files.append({**{
'name': file.name,
'type': storage_pb2.File.FileType.Name(file.type)
}, **additional_data})
if not packet.has_next:
break
return files
def stat(self, path):
with self.mutex:
response = self._cmd_storage_stat(path)
if response.command_status == flipper_pb2.CommandStatus.ERROR_STORAGE_INVALID_NAME:
raise InvalidNameError()
response = response.storage_stat_response
return {'size': response.file.size}
def read_file_contents(self, path):
with self.mutex:
self._cmd_storage_read(path)
contents = []
while True:
packet = self.proto._cmd_read_answer()
print(packet)
self.check_response_status(packet)
contents.extend(packet.storage_read_response.file.data)
if not packet.has_next:
break
return {'data': contents}
def mkdir(self, path):
with self.mutex:
print(f'mkdir {path}')
self._cmd_storage_mkdir(path)
def rmdir(self, path):
with self.mutex:
print(f'rmdir {path}')
self._cmd_storage_dirdir(path)
def rename(self, old_path, new_path):
with self.mutex:
self._cmd_storage_rename(old_path, new_path)
def delete(self, path, recursive):
with self.mutex:
self._cmd_storage_delete(path, recursive)
def write(self, path, data):
with self.mutex:
self._cmd_storage_write(path, data)
def close(self):
with self.mutex:
self.proto.cmd_flipper_stop_session()
class InvalidNameError(RuntimeError):
pass