Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for sending realtime UDP frames #71

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions xled/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from xled.control import ControlInterface, HighControlInterface # noqa: F401
from xled.device import Device # noqa: F401
from xled.discover import DiscoveryInterface # noqa: F401
from xled.realtime import RealtimeChannel # noqa: F401

from .__version__ import __title__, __description__, __version__ # noqa: F401
from .__version__ import __author__, __author_email__ # noqa: F401
Expand Down
4 changes: 2 additions & 2 deletions xled/control.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,11 @@ def set_mode(self, mode):
"""
Sets new LED operation mode.

:param str mode: Mode to set. One of 'movie', 'demo', 'off'.
:param str mode: Mode to set. One of 'movie', 'rt', 'demo', 'off'.
:raises ApplicationError: on application error
:rtype: None
"""
assert mode in ("movie", "demo", "off")
assert mode in ("movie", "rt", "demo", "off")
json_payload = {"mode": mode}
url = urljoin(self.base_url, "led/mode")
response = self.session.post(url, json=json_payload)
Expand Down
68 changes: 68 additions & 0 deletions xled/realtime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# -*- coding: utf-8 -*-

"""
xled.realtime
~~~~~~~~~~~~~

Functions to support the realtime mode of the device.
"""

from __future__ import absolute_import

import base64
import math
import socket

from xled.control import ControlInterface

#: UDP port to send realtime frames to
REALTIME_UDP_PORT_NUMBER = 7777


class RealtimeChannel(object):
"""
Main interface to send realtime frames to device.

:param control: An activated ControlInterface for the device to control
:param int leds_number: the number of leds in a frame
:param int bytes_per_led: the number of bytes per led (3 or 4)
"""

def __init__(self, control, leds_number, bytes_per_led):
self.control = control
self.leds_number = leds_number
self.bytes_per_led = bytes_per_led

def start_realtime(self):
self.control.set_mode('rt')

def send_frame(self, data):
"""
Sends a realtime frame. Before calling this, start_realtime() must have
been called.

:param bytearray data: byte array containing the raw frame data
:rtype: None
"""
data_size = self.leds_number*self.bytes_per_led
assert len(data) == data_size
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
if data_size < 900 and self.leds_number < 256:
# Send single frame
packet = bytearray(b'\x01')
packet.extend(base64.b64decode(self.control.session.access_token))
packet.extend(bytes([self.leds_number]))
packet.extend(data)
sock.sendto(packet, (self.control.host, REALTIME_UDP_PORT_NUMBER))
else:
# Send multi frame
packet_size = 900//self.bytes_per_led
for i in range(0, math.ceil(data_size/packet_size)):
packet_data = data[:(900//self.bytes_per_led)]
data = data[(900//self.bytes_per_led):]
packet = [ b'\x03', base64.b64decode(self.control.session.access_token),
b'\x00\x00', bytes([i])]
packet.append(packet_data)
sock.sendto(packet, (self.control.host, REALTIME_UDP_PORT_NUMBER))