-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for sending realtime UDP frames #67
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -320,11 +320,11 @@ def set_mode(self, mode): | |
""" | ||
Sets new LED operation mode. | ||
|
||
:param str mode: Mode to set. One of 'movie', 'demo', 'off'. | ||
:param str mode: Mode to set. One of 'movie', 'rt', 'demo', 'off'. | ||
:raises ApplicationError: on application error | ||
:rtype: None | ||
""" | ||
assert mode in ("movie", "demo", "off") | ||
assert mode in ("movie", "rt", "demo", "off") | ||
json_payload = {"mode": mode} | ||
url = urljoin(self.base_url, "led/mode") | ||
response = self.session.post(url, json=json_payload) | ||
|
@@ -411,7 +411,7 @@ def set_timer(self, time_on, time_off, time_now=None): | |
assert all(key in app_response.keys() for key in required_keys) | ||
|
||
|
||
class HighControlInterface(ControlInterface): | ||
xclass HighControlInterface(ControlInterface): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto!!! |
||
""" | ||
High level interface to control specific device | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
# -*- coding: utf-8 -*- | ||
|
||
""" | ||
xled.realtime | ||
~~~~~~~~~~~~~ | ||
|
||
Functions to support the realtime mode of the device. | ||
""" | ||
|
||
from __future__ import absolute_import | ||
|
||
import base64 | ||
import math | ||
import socket | ||
|
||
from xled.control import ControlInterface | ||
|
||
#: UDP port to send realtime frames to | ||
REALTIME_UDP_PORT_NUMBER = 7777 | ||
|
||
|
||
class RealtimeChannel(object): | ||
""" | ||
Main interface to send realtime frames to device. | ||
|
||
:param control: An activated ControlInterface for the device to control | ||
:param int leds_number: the number of leds in a frame | ||
:param int bytes_per_led: the number of bytes per led (3 or 4) | ||
""" | ||
|
||
def __init__(self, control, leds_number, bytes_per_led): | ||
self.control = control | ||
self.leds_number = leds_number | ||
self.bytes_per_led = bytes_per_led | ||
|
||
def start_realtime(self): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure about this interface / API. I understand that it might be useful to have it here. On the other hand if we have start here we would need a stop here - otherwise we end up with a device that stops getting new data and displays just last frame for couple of seconds until it timeouts. And how would the stop look? In the case of a) one this would just duplicate corresponding methods from control one. Or leak underlying API. In the end - do we need b) might be interface that is easier for a caller. But in the end it would need much more logic for a class that is indended for RealtimeChannel. So I guess in that case it might be better to have all the logic in higher level interface where one would not call start nor stop and only send data. Maybe even define Context manager (with |
||
self.control.set_mode('rt') | ||
|
||
def send_frame(self, data): | ||
""" | ||
Sends a realtime frame. Before calling this, start_realtime() must have | ||
been called. | ||
|
||
:param bytearray data: byte array containing the raw frame data | ||
:rtype: None | ||
""" | ||
data_size = self.leds_number*self.bytes_per_led | ||
assert len(data) == data_size | ||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: | ||
if data_size < 900 and self.leds_number < 256: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As I have found recently (see https://github.com/scrool/xled-docs/pull/13 ) very first packet doesn't depend on the number of the LEDs but rather on the firmware version. I have Generation II device with 250 LEDs that initially used protocol version 2 and only later switched to protocol version three (like bellow). |
||
# Send single frame | ||
packet = bytearray(b'\x01') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would like to see construction of the packet in network-independent code. Ideally also together with tests. With this approach, in a single function, we would unecesarily need to mock networking just to test out building of a packet. |
||
packet.extend(base64.b64decode(self.control.session.access_token)) | ||
packet.extend(bytes([self.leds_number])) | ||
packet.extend(data) | ||
sock.sendto(packet, (self.control.host, REALTIME_UDP_PORT_NUMBER)) | ||
else: | ||
# Send multi frame | ||
packet_size = 900//self.bytes_per_led | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same reasoning as above with protocol version and separation of the packet construction. |
||
for i in range(0, math.ceil(data_size/packet_size)): | ||
packet_data = data[:(900//self.bytes_per_led)] | ||
data = data[(900//self.bytes_per_led):] | ||
packet = [ b'\x03', base64.b64decode(self.control.session.access_token), | ||
b'\x00\x00', bytes([i])] | ||
packet.append(packet_data) | ||
sock.sendto(packet, (self.control.host, REALTIME_UDP_PORT_NUMBER)) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
x
here is an typo.