-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy path__init__.py
816 lines (702 loc) · 30.2 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
"""Python printer library for PrusaConnect.
Copyright (C) 2023 PrusaResearch
"""
import configparser
import os
import re
from logging import getLogger
from queue import Empty, Queue
from time import sleep, time
from typing import Any, Callable, Dict, List, Optional
from gcode_metadata import get_metadata
from requests import RequestException, Response, Session # type: ignore
# pylint: disable=redefined-builtin
from requests.exceptions import ConnectionError # type: ignore
from . import const, errors
from .camera_controller import CameraController
from .clock import ClockWatcher
from .command import Command, CommandFailed
from .conditions import API, HTTP, INTERNET, TOKEN, CondState
from .download import DownloadMgr, Transfer
from .files import Filesystem, InotifyHandler, delete
from .models import (
CameraRegister,
Event,
LoopObject,
Register,
Sheet,
Telemetry,
)
from .util import RetryingSession, get_timestamp
__version__ = "0.7.1"
__date__ = "9 Oct 2023" # version date
__copyright__ = "(c) 2023 Prusa 3D"
__author_name__ = "Prusa Link Developers"
__author_email__ = "link@prusa3d.cz"
__author__ = f"{__author_name__} <{__author_email__}>"
__description__ = "Python printer library for PrusaConnect"
__credits__ = "Ondřej Tůma, Martin Užák, Michal Zoubek, Tomáš Jozífek"
__url__ = "https://github.com/prusa3d/Prusa-Connect-SDK-Printer"
# pylint: disable=invalid-name
# pylint: disable=too-few-public-methods
# pylint: disable=too-many-arguments
# pylint: disable=too-many-instance-attributes
# NOTE: Temporary for pylint with python3.9
# pylint: disable=unsubscriptable-object
log = getLogger("connect-printer")
re_conn_reason = re.compile(r"] (.*)")
__all__ = ["Printer"]
CommandArgs = Optional[List[Any]]
def default_register_handler(token):
"""Default register handler.
It blocks communication with Connect in loop method!
"""
assert token
class Printer:
"""Printer representation object.
To process inotify_handler, please create your own thread,
calling printer.inotify_handler() in a loop.
"""
# pylint: disable=too-many-public-methods
queue: "Queue[LoopObject]"
server: Optional[str] = None
token: Optional[str] = None
conn: Session
NOT_INITIALISED_MSG = "Printer has not been initialized properly"
def __init__(self,
type_: Optional[const.PrinterType] = None,
sn: Optional[str] = None,
fingerprint: Optional[str] = None,
max_retries: int = 1):
self.__type = type_
self.__sn = sn
self.__fingerprint = fingerprint
self.firmware = None
self.network_info = {
"lan_mac": None,
"lan_ipv4": None,
"lan_ipv6": None,
"wifi_mac": None,
"wifi_ipv4": None,
"wifi_ipv6": None,
"wifi_ssid": None,
"hostname": None,
"username": None,
"digest": None,
}
self.api_key: Optional[str] = None
self.code: Optional[str] = None
self.__ready: bool = False
self.__state: const.State = const.State.BUSY
self.job_id: Optional[int] = None
self.mbl: Optional[List[float]] = None
self.sheet_settings: Optional[List[Sheet]] = None
self.active_sheet: Optional[int] = None # index
if max_retries > 1:
self.conn = RetryingSession(max_retries=max_retries)
else:
self.conn = Session()
self.queue = Queue()
self.command = Command(self.event_cb)
self.set_handler(const.Command.SEND_INFO, self.send_info)
self.set_handler(const.Command.SEND_FILE_INFO, self.get_file_info)
self.set_handler(const.Command.CREATE_FOLDER, self.create_folder)
self.set_handler(const.Command.CREATE_DIRECTORY, self.create_folder)
self.set_handler(const.Command.DELETE_FILE, self.delete_file)
self.set_handler(const.Command.DELETE_FOLDER, self.delete_folder)
self.set_handler(const.Command.DELETE_DIRECTORY, self.delete_folder)
self.set_handler(const.Command.START_URL_DOWNLOAD,
self.start_url_download)
self.set_handler(const.Command.START_CONNECT_DOWNLOAD,
self.start_connect_download)
self.set_handler(const.Command.STOP_TRANSFER, self.transfer_stop)
self.set_handler(const.Command.SEND_TRANSFER_INFO, self.transfer_info)
self.set_handler(const.Command.SET_PRINTER_READY,
self.set_printer_ready)
self.set_handler(const.Command.CANCEL_PRINTER_READY,
self.cancel_printer_ready)
self.fs = Filesystem(sep=os.sep, event_cb=self.event_cb)
self.inotify_handler = InotifyHandler(self.fs)
# Handler blocks communication with Connect in loop method!
self.register_handler = default_register_handler
self.__printed_file_cb = lambda: None
self.download_finished_cb = lambda transfer: None # noaq: ARG005
self.clock_watcher = ClockWatcher()
if self.token and not self.is_initialised():
log.warning(self.NOT_INITIALISED_MSG)
self.transfer = Transfer()
self.download_mgr = DownloadMgr(self.fs, self.transfer,
self.get_connection_details,
self.event_cb, self.__printed_file_cb,
self.download_finished_cb)
self.camera_controller = CameraController(self.conn, self.server,
self.send_cb)
self.__running_loop = False
@staticmethod
def connect_url(host: str, tls: bool, port: int = 0):
"""Format url from settings value.
>>> Printer.connect_url('connect', True)
'https://connect'
>>> Printer.connect_url('connect', False)
'http://connect'
>>> Printer.connect_url('connect', False, 8000)
'http://connect:8000'
"""
protocol = 'https' if tls else 'http'
if port:
return f"{protocol}://{host}:{port}"
return f"{protocol}://{host}"
@property
def printed_file_cb(self):
"""Returns path of currently printed file"""
return self.__printed_file_cb
@printed_file_cb.setter
def printed_file_cb(self, value):
"""Sets path of currently printed file"""
self.__printed_file_cb = value
self.download_mgr.printed_file_cb = value
@property
def ready(self):
"""Returns ready flag.
Ready flag can be set with set_state method. It is additional
flag for IDLE state, which has info about user confirmation
*ready to print*.
"""
return self.__ready
@property
def state(self):
"""Returns printer state."""
return self.__state
@property
def fingerprint(self):
"""Returns printer fingerprint."""
return self.__fingerprint
@fingerprint.setter
def fingerprint(self, value):
"""Set fingerprint if is not set."""
if self.__fingerprint is not None:
raise RuntimeError("Fingerprint is already set.")
self.__fingerprint = value
@property
def sn(self):
"""Returns printer serial number"""
return self.__sn
@sn.setter
def sn(self, value):
"""Set serial number if is not set."""
if self.__sn is not None:
raise RuntimeError("Serial number is already set.")
self.__sn = value
@property
def type(self):
"""Returns printer type"""
return self.__type
@type.setter
def type(self, value):
"""Set the printer type if is not set."""
if self.__type is not None:
raise RuntimeError("Printer type is already set.")
self.__type = value
def is_initialised(self):
"""Returns True if the printer is initialised"""
initialised = bool(self.__sn and self.__fingerprint
and self.__type is not None)
if not initialised:
errors.API.ok = False
API.state = CondState.NOK
return initialised
def make_headers(self, timestamp: Optional[float] = None) -> dict:
"""Returns request headers from connection variables."""
timestamp = get_timestamp(timestamp)
headers = {
"Fingerprint": self.fingerprint,
"Timestamp": str(timestamp),
}
if self.token:
headers['Token'] = self.token
if self.clock_watcher.clock_adjusted():
log.debug("Clock adjustment detected. Resetting watcher")
headers['Clock-Adjusted'] = "1"
self.clock_watcher.reset()
return headers
def set_state(self,
state: const.State,
source: const.Source,
ready: Optional[bool] = None,
**kwargs):
"""Set printer state and push event about that to queue.
:source: the initiator of printer state
:ready: If state is PRINTING, ready argument is ignored,
and flag is set to False.
"""
if state == const.State.PRINTING:
self.__ready = False
elif ready is not None:
self.__ready = ready
self.__state = state
self.event_cb(const.Event.STATE_CHANGED, source, state=state, **kwargs)
def event_cb(self,
event: const.Event,
source: const.Source,
timestamp: Optional[float] = None,
command_id: Optional[int] = None,
**kwargs) -> None:
"""Create event and push it to queue."""
if not self.token:
log.debug("Skipping event, no token: %s", event.value)
return
if self.job_id:
kwargs['job_id'] = self.job_id
if self.transfer.in_progress and self.transfer.start_ts:
kwargs['transfer_id'] = self.transfer.transfer_id
if 'state' not in kwargs:
kwargs['state'] = self.state
event_ = Event(event, source, timestamp, command_id, **kwargs)
log.debug("Putting event to queue: %s", event_)
if not self.is_initialised():
log.warning("Printer fingerprint and/or SN is not set")
self.queue.put(event_)
def telemetry(self,
state: Optional[const.State] = None,
timestamp: Optional[float] = None,
**kwargs) -> None:
"""Create telemetry end push it to queue."""
if state:
log.warning("State argument is deprecated. Use set_state method.")
if not self.token:
log.debug("Skipping telemetry, no token.")
return
if self.command.state is not None:
kwargs['command_id'] = self.command.command_id
if self.job_id:
kwargs['job_id'] = self.job_id
if self.transfer.in_progress and self.transfer.start_ts:
kwargs['transfer_id'] = self.transfer.transfer_id
kwargs['transfer_progress'] = self.transfer.progress
kwargs['transfer_time_remaining'] = self.transfer.time_remaining()
kwargs['transfer_transferred'] = self.transfer.transferred
kwargs['time_transferring'] = self.transfer.time_transferring()
if self.is_initialised():
telemetry = Telemetry(self.__state, timestamp, **kwargs)
else:
telemetry = Telemetry(self.__state, timestamp)
log.warning("Printer fingerprint and/or SN is not set")
self.queue.put(telemetry)
def send_cb(self, loop_object: LoopObject):
"""Enqueues any supported loop object for sending,
without modifying it"""
self.queue.put(loop_object)
def connection_from_config(self, path: str):
"""Loads connection details from config."""
if not os.path.exists(path):
raise FileNotFoundError(f"ini file: `{path}` doesn't exist")
config = configparser.ConfigParser()
config.read(path)
host = config['service::connect']['hostname']
tls = config['service::connect'].getboolean('tls')
port = config['service::connect'].getint('port', fallback=0)
server = Printer.connect_url(host, tls, port)
token = config['service::connect']['token']
self.set_connection(server, token)
def set_connection(self, server, token):
"""Sets the connection details"""
self.server = server
self.token = token
self.camera_controller.server = server
errors.TOKEN.ok = True
TOKEN.state = CondState.OK
def get_connection_details(self):
"""Returns currently set server and headers"""
return self.server, self.make_headers()
def get_info(self) -> Dict[str, Any]:
"""Returns kwargs for Command.finish method as reaction
to SEND_INFO."""
# pylint: disable=unused-argument
if self.__type is not None:
type_, ver, sub = self.__type.value
else:
type_, ver, sub = (None, None, None)
return {
"source": const.Source.CONNECT,
"event": const.Event.INFO,
"state": self.__state,
"type": type_,
"version": ver,
"subversion": sub,
"firmware": self.firmware,
"sdk": __version__,
"network_info": self.network_info,
"api_key": self.api_key,
"files": self.fs.to_dict_legacy(),
"sn": self.sn,
"fingerprint": self.fingerprint,
"mbl": self.mbl,
"sheet_settings": self.sheet_settings,
"active_sheet": self.active_sheet,
}
def send_info(self, caller: Command) -> Dict[str, Any]:
"""Accept command arguments and adapt the call for the getter"""
# pylint: disable=unused-argument
return self.get_info()
def start_url_download(self, caller: Command) -> Dict[str, Any]:
"""Download an URL specified by url, to_select and to_print flags
in `caller`"""
if not caller.kwargs:
raise ValueError(
f"{const.Command.START_URL_DOWNLOAD} requires kwargs")
try:
retval = self.download_mgr.start(
const.TransferType.FROM_WEB,
caller.kwargs["path"],
caller.kwargs["url"],
to_print=caller.kwargs.get("printing", False),
to_select=caller.kwargs.get("selecting", False),
start_cmd_id=caller.command_id)
retval['source'] = const.Source.CONNECT
return retval
except KeyError as err:
raise ValueError(f"{const.Command.START_URL_DOWNLOAD} requires "
f"kwarg {err}.") from None
def start_connect_download(self, caller: Command) -> Dict[str, Any]:
"""Download a gcode from Connect, compose an URL using
Connect config"""
if not caller.kwargs:
raise ValueError(
f"{const.Command.START_CONNECT_DOWNLOAD} requires kwargs")
if not self.server:
raise RuntimeError("Printer.server must be set!")
try:
uri = "/p/teams/{team_id}/files/{hash}/raw".format(**caller.kwargs)
retval = self.download_mgr.start(
const.TransferType.FROM_CONNECT,
caller.kwargs["path"],
self.server + uri,
to_print=caller.kwargs.get("printing", False),
to_select=caller.kwargs.get("selecting", False),
start_cmd_id=caller.command_id,
hash_=caller.kwargs["hash"],
team_id=caller.kwargs["team_id"])
retval['source'] = const.Source.CONNECT
return retval
except KeyError as err:
raise ValueError(
f"{const.Command.START_CONNECT_DOWNLOAD} requires "
f"kwarg {err}.") from None
def transfer_stop(self, caller: Command) -> Dict[str, Any]:
"""Stop current transfer, if any"""
# pylint: disable=unused-argument
transfer_id = (caller.kwargs or {}).get("transfer_id")
if transfer_id and transfer_id != self.transfer.transfer_id:
raise RuntimeError("Wrong transfer_id")
self.transfer.stop()
return {"source": const.Source.CONNECT}
def transfer_info(self, caller: Command) -> Dict[str, Any]:
"""Provide info of the running transfer"""
kwargs = caller.kwargs or {}
transfer_id = kwargs.get('transfer_id')
if transfer_id and transfer_id != self.transfer.transfer_id:
raise CommandFailed("Not current transfer.")
info = self.download_mgr.info()
info['source'] = const.Source.CONNECT
info['event'] = const.Event.TRANSFER_INFO
return info
def set_printer_ready(self, caller: Command) -> Dict[str, Any]:
"""Set READY state"""
# pylint: disable=unused-argument
self.set_state(const.State.READY, const.Source.CONNECT, ready=True)
return {'source': const.Source.CONNECT}
def cancel_printer_ready(self, caller: Command) -> Dict[str, Any]:
"""Cancel READY state and switch printer back to IDLE"""
# pylint: disable=unused-argument
if self.ready:
self.set_state(const.State.IDLE, const.Source.CONNECT, ready=False)
return {'source': const.Source.CONNECT}
raise ValueError("Can't cancel, printer isn't ready")
def get_file_info(self, caller: Command) -> Dict[str, Any]:
"""Returns file info for a given file, if it exists."""
# pylint: disable=unused-argument
if not caller.kwargs or "path" not in caller.kwargs:
raise ValueError("SEND_FILE_INFO requires kwargs")
path = caller.kwargs["path"]
node = self.fs.get(path)
if node is None:
raise ValueError(f"File does not exist: {path}")
if node.is_dir:
raise ValueError("FILE_INFO doesn't work for folders")
info = {
"source": const.Source.CONNECT,
"event": const.Event.FILE_INFO,
"path": path,
}
try:
path_ = os.path.split(self.fs.get_os_path(path))
if not path_[1].startswith("."):
meta = get_metadata(self.fs.get_os_path(path))
info.update(node.attrs)
info.update(meta.data)
# include the biggest thumbnail, if available
if meta.thumbnails:
biggest = b""
for _, data in meta.thumbnails.items():
if len(data) > len(biggest):
biggest = data
info['preview'] = biggest.decode()
except FileNotFoundError:
log.debug("File not found: %s", path)
return info
def delete_file(self, caller: Command) -> Dict[str, Any]:
"""Handler for delete a file."""
if not caller.kwargs or "path" not in caller.kwargs:
raise ValueError(f"{caller.command_name} requires kwargs")
if self.fs.get(caller.kwargs["path"]).to_dict()["read_only"]:
raise ValueError("File is read only")
if self.printed_file_cb() == caller.kwargs["path"]:
raise ValueError("This file is currently printed")
abs_path = self.inotify_handler.get_abs_os_path(caller.kwargs["path"])
delete(abs_path, False)
return {"source": const.Source.CONNECT}
def delete_folder(self, caller: Command) -> Dict[str, Any]:
"""Handler for delete a folder."""
if not caller.kwargs or "path" not in caller.kwargs:
raise ValueError(f"{caller.command_name} requires kwargs")
if self.fs.get(caller.kwargs["path"]).to_dict()["read_only"]:
raise ValueError("Folder is read only")
if self.printed_file_cb():
if caller.kwargs["path"] in self.printed_file_cb():
raise ValueError(
"The file inside of this folder is currently printed")
abs_path = self.inotify_handler.get_abs_os_path(caller.kwargs["path"])
delete(abs_path, True, force=caller.kwargs.get("force", False))
return {"source": const.Source.CONNECT}
def create_folder(self, caller: Command) -> Dict[str, Any]:
"""Handler for create a folder."""
if not caller.kwargs or "path" not in caller.kwargs:
raise ValueError(f"{caller.command_name} requires kwargs")
relative_path_parameter = caller.kwargs["path"]
abs_path = self.inotify_handler.get_abs_os_path(
relative_path_parameter)
os.makedirs(abs_path, exist_ok=True)
return {"source": const.Source.CONNECT}
def set_handler(self, command: const.Command,
handler: Callable[[Command], Dict[str, Any]]):
"""Set handler for the command.
Handler must return **kwargs dictionary for Command.finish method,
which means that source must be set at least.
"""
self.command.handlers[command] = handler
def handler(self, command: const.Command):
"""Wrap function to handle the command.
Handler must return **kwargs dictionary for Command.finish method,
which means that source must be set at least.
.. code:: python
@printer.command(const.GCODE)
def gcode(prn, gcode):
...
"""
def wrapper(handler: Callable[[Command], Dict[str, Any]]):
self.set_handler(command, handler)
return handler
return wrapper
def parse_command(self, res: Response):
"""Parse telemetry response.
When response from connect is command (HTTP Status: 200 OK), it
will set a command object, if the printer is initialized properly.
"""
if res.status_code == 200:
command_id: Optional[int] = None
try:
command_id_string = res.headers.get("Command-Id", default="")
command_id = int(command_id_string)
except (TypeError, ValueError):
log.error("Invalid Command-Id header. Headers: %s",
res.headers)
self.event_cb(const.Event.REJECTED,
const.Source.CONNECT,
reason="Invalid Command-Id header")
return res
if not self.is_initialised():
self.event_cb(const.Event.REJECTED,
const.Source.WUI,
command_id=command_id,
reason=self.NOT_INITIALISED_MSG)
return res
content_type = res.headers.get("content-type", default="")
log.debug("parse_command res: %s", res.text)
try:
if content_type.startswith("application/json"):
data = res.json()
command_name = data.get("command", "")
if self.command.check_state(command_id, command_name):
self.command.accept(command_id,
command_name=command_name,
args=data.get("args"),
kwargs=data.get('kwargs'))
elif content_type == "text/x.gcode":
command_name = const.Command.GCODE.value
if self.command.check_state(command_id, command_name):
force = ("Force" in res.headers
and res.headers["Force"] == "1")
self.command.accept(command_id,
command_name, [res.text],
{"gcode": res.text},
force=force)
else:
raise ValueError("Invalid command content type")
except Exception as e: # pylint: disable=broad-except
log.exception("")
self.event_cb(const.Event.REJECTED,
const.Source.CONNECT,
command_id=command_id,
reason=str(e))
elif res.status_code == 204: # no cmd in telemetry
pass
else:
log.info("Got unexpected telemetry response (%s): %s",
res.status_code, res.text)
return res
def register(self):
"""Register the printer with Connect and return a registration
temporary code, or fail with a RuntimeError."""
if not self.server:
raise RuntimeError("Server is not set")
# type-version-subversion is deprecated and replaced by printer_type
data = {
"sn": self.sn,
"fingerprint": self.fingerprint,
"printer_type": str(self.__type),
"firmware": self.firmware,
}
res = self.conn.post(self.server + "/p/register",
headers=self.make_headers(),
json=data,
timeout=const.CONNECTION_TIMEOUT)
if res.status_code != 200:
errors.API.ok = False
API.state = CondState.NOK
if res.status_code >= 500:
errors.HTTP.ok = False
HTTP.state = CondState.NOK
else:
errors.HTTP.ok = True
HTTP.state = CondState.OK
log.debug("Status code: {res.status_code}")
raise RuntimeError(res.text)
self.code = res.headers["Code"]
self.queue.put(Register(self.code))
errors.API.ok = True
API.state = CondState.OK
return self.code
def loop(self):
"""Calls loop_step in a loop. Handles any unexpected Exceptions"""
self.__running_loop = True
while self.__running_loop:
try:
self.camera_controller.tick()
# pylint: disable=broad-except
except Exception:
log.exception(
"Unexpected exception from the camera module caught in"
" SDK loop!")
try:
self.loop_step()
# pylint: disable=broad-except
except Exception:
log.exception("Unexpected exception caught in SDK loop!")
def loop_step(self):
"""
Gets an item LoopObject from queue, sends it and handles the response
The LoopObject is either an Event - in which case it's just sent,
a Telemetry, in which case the response might contain a command to
execute, a Register object in which case the response contains the
credentials for further communication.
"""
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
try:
# Get the item to send
item = self.queue.get(timeout=const.TIMESTAMP_PRECISION)
except Empty:
return
# Make sure we're able to send it
if not self.server:
log.warning("Server is not set, skipping item: %s", item)
return
if not issubclass(type(item), LoopObject):
log.warning("Enqueued an unknown item: %s", item)
return
if item.needs_token and not self.token:
errors.TOKEN.ok = False
TOKEN.state = CondState.NOK
log.warning("No token, skipping item: %s", item)
return
# Send it
headers = self.make_headers(item.timestamp)
try:
res = item.send(self.conn, self.server, headers)
except ConnectionError as err:
errors.HTTP.ok = False
HTTP.state = CondState.NOK
log.error(err)
except RequestException as err:
errors.INTERNET.ok = False
INTERNET.state = CondState.NOK
log.error(err)
except Exception: # pylint: disable=broad-except
errors.INTERNET.ok = False
INTERNET.state = CondState.NOK
log.exception('Unhandled error')
else:
# Handle the response
if isinstance(item, Telemetry):
self.parse_command(res)
elif isinstance(item, Register):
if res.status_code == 200:
self.token = res.headers["Token"]
errors.TOKEN.ok = True
TOKEN.state = CondState.OK
log.info("New token was set.")
self.register_handler(self.token)
self.code = None
elif res.status_code == 202 and item.timeout > time():
self.queue.put(item)
sleep(1)
elif isinstance(item, CameraRegister):
camera = item.camera
# pylint: disable=unused-argument
if res.status_code == 200:
camera_token = res.headers["Token"]
camera.set_token(camera_token)
else:
log.warning(res.text)
self.deduce_state_from_code(res.status_code)
if res.status_code > 400:
log.warning(res.text)
elif res.status_code == 400:
log.debug(res.text)
@staticmethod
def deduce_state_from_code(status_code):
"""Deduce our state from the HTTP status code"""
if 299 >= status_code >= 200:
errors.API.ok = True
API.state = CondState.OK
elif status_code == 403:
errors.TOKEN.ok = False
TOKEN.state = CondState.NOK
elif status_code > 400:
errors.API.ok = False
API.state = CondState.NOK
def stop_loop(self):
"""Set internal variable, to stop the loop method."""
self.__running_loop = False
def attach(self, folderpath: str, storage: str):
"""Create a listing of `folderpath` and attach it under `storage`.
This requires linux kernel with inotify support enabled to work.
"""
self.fs.from_dir(folderpath, storage)
self.inotify_handler = InotifyHandler(self.fs)
def detach(self, storage: str):
"""Detach `storage`.
This requires linux kernel with inotify support enabled to work.
"""
self.fs.detach(storage)
self.inotify_handler = InotifyHandler(self.fs)