Skip to content
This repository has been archived by the owner on Feb 11, 2024. It is now read-only.

MQTT&Docker support basics #6

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
FROM python:3.6.8-slim-stretch

ADD . /app
WORKDIR /app
RUN cd /app && python setup.py install
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,31 @@ lw301_server --help

CLI options overrided config options.

### Using Docker image

#### How to pack

```bash
docker build . -t lw301-server
```

#### How to use

- mkdir /var/log/lw301
- Create and run shell script run_lw301.sh

```bash
#!/bin/sh
docker rm lw301 --force
docker run \
--name lw301 -d \
--restart always \
-p __OUTBOUND_IP_ADDR___:80:47265 \
-v /var/log/lw301:/log \
--enable-mqtt --mqtt-user=test --mqtt-password=test --mqtt-host=127.0.0.1 \
lw301-server lw301_server --address=0.0.0.0 --log_file_prefix=/log/server.log
```


## Triggers

Expand Down
14 changes: 13 additions & 1 deletion lw301_server
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ from lw301_server_app.handler.logging import LoggingHandler
from lw301_server_app.handler.update import UpdateHandler
from lw301_server_app.handler import api
from lw301_server_app.trigger.influxdb import InfluxDbTrigger
from lw301_server_app.trigger.mqtt import MqttTrigger

logger = logging.getLogger(os.path.basename(__file__).replace('.py', ''))

Expand All @@ -39,8 +40,10 @@ def define_options():

# trigger options
define('enable-influxdb', default=False, type=bool, help='enable influxdb trigger')
InfluxDbTrigger.add_options()
define('enable-mqtt', default=False, type=bool, help='enable mqtt trigger')

InfluxDbTrigger.add_options()
MqttTrigger.add_options()

def parse_options():
define_options()
Expand All @@ -60,6 +63,11 @@ def init_triggers(options: OptionParser, ioloop: IOLoop):
if options.enable_influxdb:
logger.info('influxdb trigger enabled')
triggers.append(InfluxDbTrigger(ioloop, options))

if options.enable_mqtt:
logger.info('mqtt trigger enabled')
triggers.append(MqttTrigger(ioloop, options))

return triggers


Expand All @@ -79,4 +87,8 @@ if __name__ == "__main__":
ioloop.start()
except KeyboardInterrupt:
logger.info('server stopped')

for t in application.settings['triggers']:
t.before_stop()

sys.exit(0)
3 changes: 3 additions & 0 deletions lw301_server_app/trigger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ def __init__(self, ioloop: IOLoop, app_options: OptionParser):
async def on_new_data(self, measurement, value):
pass

def before_stop(self):
pass

@staticmethod
def add_options():
pass
78 changes: 78 additions & 0 deletions lw301_server_app/trigger/mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# coding: utf-8
from logging import getLogger
import time
import json

from tornado.ioloop import IOLoop
from tornado.options import define, OptionParser
from tornado.httpclient import HTTPRequest, HTTPClientError

import paho.mqtt.client as mqtt

from lw301_server_app.trigger import Trigger


class MqttTrigger(Trigger):

_as_tags = ('mac', 'channel')

log = getLogger('mqtt_trigger')


@staticmethod
def add_options():
define('mqtt-host', type=str)
define('mqtt-port', type=int, default=1883)
define('mqtt-clientid', type=str, default='lw301')
define('mqtt-user', type=str, default=None)
define('mqtt-password', type=str, default=None)

def __init__(self, ioloop: IOLoop, app_options: OptionParser):
super().__init__(ioloop, app_options)
# self.http_client = AsyncHTTPClient()
self.client = mqtt.Client()

if self.app_options.mqtt_user is not None:
self.client.username_pw_set(
self.app_options.mqtt_user,
self.app_options.mqtt_password,
)

self.client.connect(
self.app_options.mqtt_host,
self.app_options.mqtt_port
)
self.client.loop_start()
# test self.client.publish("lw301/1/2", 10.2)

def before_stop(self):
self.client.loop_stop()

async def on_new_data(self, measurement, value):
v = value._asdict()

ch = "" if ('channel' not in v or v['channel'] is None) else "_{}".format(v['channel'])

self.client.publish("lw301/{}{}/{}".format(v['mac'], ch, measurement), json.dumps(v))

# # delay processing for next ioloop tick
# self.ioloop.add_callback(
# self._send_data,
# value=value,
# measurement=measurement,
# write_url=self.app_options.influxdb_writeurl,
# user=self.app_options.influxdb_user,
# password=self.app_options.influxdb_password,
# )
#
# def _value(self, value):
# if isinstance(value, int):
# return '{}i'.format(value)
# elif isinstance(value, float):
# return '{:0.2f}'.format(value)
# return '{}'.format(value)
#
# def _tag_value(self, value):
# if isinstance(value, int):
# value = str(value)
# return self._value(value)
15 changes: 15 additions & 0 deletions lw301_server_tests/test_mqtt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# coding: utf-8
from unittest import TestCase

from lw301_server_app import protocol


class TestMqttTrigger(TestCase):

def test_mqtt_send(self):
pass
# raw = b'mac=00001234abcd&id=c2&pv=0&lb=0&ac=0&reg=0001&lost=0000&baro=982&ptr=0&wfor=0&p=1'
# body = protocol.parse_body(raw)
# self.assertIsNone(body.channel)
# self.assertIsNone(body.sensor_values)
# self.assertEqual(body.global_values, {'pressure_hPa': 982})
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
setup(
name='lw301-server',
version='0.0.1',
install_requires=['tornado>=5.1, <6.0'],
install_requires=['tornado>=5.1, <6.0', 'paho-mqtt'],
tests_require=['nose', 'pycodestyle'],
test_suite='nose.collector',
scripts=['lw301_server'],
Expand Down