diff --git a/python_banyan/banyan_base_aio/banyan_base_aio.py b/python_banyan/banyan_base_aio/banyan_base_aio.py index 544bb4c..f2d4c58 100644 --- a/python_banyan/banyan_base_aio/banyan_base_aio.py +++ b/python_banyan/banyan_base_aio/banyan_base_aio.py @@ -103,7 +103,8 @@ def __init__(self, back_plane_ip_address=None, subscriber_port='43125', # fix for "not implemented" bugs in Python 3.8 if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - self.event_loop = asyncio.get_event_loop() + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) # if using numpy apply the msgpack_numpy monkey patch if numpy: diff --git a/python_banyan/utils/tcp_gateway/Running_Demos.md b/python_banyan/utils/tcp_gateway/Running_Demos.md new file mode 100644 index 0000000..88bb1d9 --- /dev/null +++ b/python_banyan/utils/tcp_gateway/Running_Demos.md @@ -0,0 +1,26 @@ +# Running The Demos + +There are two demos available. Currently, data can only be sent in one +direction at a time since I have not figured out how to perform concurrency in +MicroPython. The threading +library is said to be experimental. However, I did locate [this code](https://github.com/fadushin/esp8266/blob/790958fa332592c80a0f81f25cdaa9513d596f64/micropython/uhttpd/uhttpd/__init__.py#L354) which may solve +the concurrency issue. I have yet to have a chance to see if the code works. + +## Sending Messages To The Local Simulated Server From The Pico +Here are the steps used: + +1. Start the backplane. +2. Start the [messages_from_pico.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_from_pico.py) MicroPython script. +3. Start [sim_messages_from_pico.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_from_pico.py) +4. Start the [tcp_gateway.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/tcp_gateway.py) + +You should see the messages if you look at the console window for sim_messages_from_pico.py. + + +## Sending Messages From the Local Simulated Server To The Pico +1. Start the backplane. +2. Start the [messages_to_pico.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_to_pico.py) MicroPython script. +3. Start the [tcp_gateway.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/tcp_gateway.py) +4. Start the [sim_messages_to_pico.py](https://github.com/MrYsLab/python_banyan/blob/tcp_gateway/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_to_pico.py) script. + +Check the Thonny shell for incoming messages. \ No newline at end of file diff --git a/python_banyan/utils/tcp_gateway/__init__.py b/python_banyan/utils/tcp_gateway/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python_banyan/utils/tcp_gateway/micropython_script.py b/python_banyan/utils/tcp_gateway/micropython_script.py new file mode 100644 index 0000000..844eebc --- /dev/null +++ b/python_banyan/utils/tcp_gateway/micropython_script.py @@ -0,0 +1,43 @@ +import network, rp2, time +import socket +import umsgpack +from machine import Pin + +led = Pin("LED", Pin.OUT) +led.on() + +# set your WiFi Country +rp2.country('US') + +wlan = network.WLAN(network.STA_IF) +wlan.active(True) + +# set power mode to get WiFi power-saving off (if needed) +wlan.config(pm = 0xa11140) + +wlan.connect('YOUR_SSID', 'YOUR_PASSWORD') + + +while not wlan.isconnected() and wlan.status() >= 0: + print("Waiting to connect:") + time.sleep(1) + +print(wlan.ifconfig()) + +# create a socket server +serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +addr = ("",31335) +serversocket.bind(addr) +serversocket.listen() +(clientsocket, address) = serversocket.accept() +print((clientsocket, address)) +led.off() +while True: + data = clientsocket.recv(1024) + if data: + z = umsgpack.loads(data) + print(z) + + + diff --git a/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_from_pico.py b/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_from_pico.py new file mode 100644 index 0000000..233d47f --- /dev/null +++ b/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_from_pico.py @@ -0,0 +1,63 @@ +import network +import rp2 +import time +import socket +import umsgpack +from machine import Pin + +led = Pin("LED", Pin.OUT) +led.on() + +# set your Wi-Fi Country +rp2.country('US') + +wlan = network.WLAN(network.STA_IF) +wlan.active(True) + +# set power mode to get Wi-Fi power-saving off (if needed) +wlan.config(pm=0xa11140) + +wlan.connect('A-Net', 'Sam2Curly') + +while not wlan.isconnected() and wlan.status() >= 0: + print("Waiting to connect:") + time.sleep(1) + +print(wlan.ifconfig()) + +# create a socket server +server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +addr = ("", 31335) +server_socket.bind(addr) +server_socket.listen() +(client_socket, address) = server_socket.accept() +print((client_socket, address)) +led.off() + +count = 0 +while True: + payload = {'pico_data': count} + count += 1 + packed = umsgpack.dumps(payload) + + # get the length of the payload and express as a bytearray + p_length = bytearray(len(packed).to_bytes(1, 'big')) + + # append the length to the packed bytarray + p_length.extend(packed) + + # convert from bytearray to bytes + packed = bytes(p_length) + + print(f'packed: {packed} length: {len(packed)}') + # bpacked = bytearray(packed) + # l = len(packed) + # print(f'l = {l}') + # lenn = bytearray(l) + # z = lenn.extend(bpacked) + # q = bytes(z) + # print(f'z = {z}') + client_socket.sendall(packed) + time.sleep(.01) + print(f'sending {count}') diff --git a/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_to_pico.py b/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_to_pico.py new file mode 100644 index 0000000..7a9e749 --- /dev/null +++ b/python_banyan/utils/tcp_gateway/pico_micropython_scripts/messages_to_pico.py @@ -0,0 +1,50 @@ +import network, rp2, time +import socket +import umsgpack +from machine import Pin + +led = Pin("LED", Pin.OUT) +led.on() + +# set your WiFi Country +rp2.country('US') + +wlan = network.WLAN(network.STA_IF) +wlan.active(True) + +# set power mode to get WiFi power-saving off (if needed) +wlan.config(pm = 0xa11140) + +wlan.connect('A-Net', 'Sam2Curly') + + +while not wlan.isconnected() and wlan.status() >= 0: + print("Waiting to connect:") + time.sleep(1) + +print(wlan.ifconfig()) + +# create a socket server +serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + +addr = ("",31335) +serversocket.bind(addr) +serversocket.listen() +(clientsocket, address) = serversocket.accept() +print((clientsocket, address)) +led.off() + +count = 0 +while True: + data = clientsocket.recv(1024) + if data: + z = umsgpack.loads(data) + print(z) + # payload = {'from_the_pico': count} + # count += 1 + # packed = umsgpack.dumps(payload) + # clientsocket.sendall(packed) + # print('sending') + + + diff --git a/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_from_pico.py b/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_from_pico.py new file mode 100644 index 0000000..b917b1b --- /dev/null +++ b/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_from_pico.py @@ -0,0 +1,74 @@ +# zmq_banyan_local_station_simulator.py + +# requires Banyan 'backplane' + +""" +Python-Banyan Providence: + +Copyright (c) 2018-2019 Alan Yorinks All right reserved. + +Python Banyan is free software; you can redistribute it and/or +modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE +Version 3 as published by the Free Software Foundation; either +or (at your option) any later version. +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE +along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +""" + +import time + +from python_banyan.banyan_base import BanyanBase + + +class Bpub(BanyanBase): + + def __init__(self): + + # initialize the base class + super(Bpub, self).__init__(process_name='Bpub') + self.set_subscriber_topic('from_pico') + + # send a message + # self.publish_payload({'from_banyan': 'hello_world'}, 'figura') + # self.count = 0 + + # get the reply messages + try: + self.receive_loop() + except KeyboardInterrupt: + self.clean_up() + sys.exit(0) + + while True: + # send command to turn Led_0 ON + # self.publish_payload({'Led_0': self.count}, 'figura') + # self.count += 1 + time.sleep(0.01) + # print('published ON') + + # send command to turn Led_0 OFF + # self.publish_payload({'Led_0': self.count}, 'figura') + # self.count += 1 + + # time.sleep(0.01) + # print('published OFF') + + def incoming_message_processing(self, topic, payload): + """ + Process incoming messages received from the echo client + :param topic: Message Topic string + :param payload: Message Data + """ + + # When a message is received and its number is zero, finish up. + print(f'topic: {topic} payload: {payload}') + + +b = Bpub() diff --git a/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_to_pico.py b/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_to_pico.py new file mode 100644 index 0000000..d64da0e --- /dev/null +++ b/python_banyan/utils/tcp_gateway/simulated_local_station/sim_messages_to_pico.py @@ -0,0 +1,62 @@ +# zmq_banyan_local_station_simulator.py + +# requires Banyan 'backplane' + +""" +Python-Banyan Providence: + +Copyright (c) 2018-2019 Alan Yorinks All right reserved. + +Python Banyan is free software; you can redistribute it and/or +modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE +Version 3 as published by the Free Software Foundation; either +or (at your option) any later version. +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +General Public License for more details. + +You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE +along with this library; if not, write to the Free Software +Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +""" + + +import time + +from python_banyan.banyan_base import BanyanBase + + +class Bpub(BanyanBase): + + + def __init__(self): + + # initialize the base class + super(Bpub, self).__init__(process_name='Bpub') + + # send a message + self.publish_payload({'from_banyan': 'hello_world'}, 'figura') + self.count = 0 + + while True: + + # send command to turn Led_0 ON + self.publish_payload({'Led_0': self.count}, 'figura') + self.count += 1 + time.sleep(0.01) + #print('published ON') + + # send command to turn Led_0 OFF + self.publish_payload({'Led_0': self.count}, 'figura') + self.count += 1 + + time.sleep(0.01) + #print('published OFF') + + # exit + self.clean_up() + + +b = Bpub() diff --git a/python_banyan/utils/tcp_gateway/tcp_gateway.py b/python_banyan/utils/tcp_gateway/tcp_gateway.py new file mode 100644 index 0000000..a1d93a0 --- /dev/null +++ b/python_banyan/utils/tcp_gateway/tcp_gateway.py @@ -0,0 +1,212 @@ +""" +tcp_gateway.py + + Copyright (c) 2022 Alan Yorinks All right reserved. + + The tcp_gateway is free software; you can redistribute it and/or + modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE + Version 3 as published by the Free Software Foundation; either + or (at your option) any later version. + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE + along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + +""" + +import argparse +import asyncio +import json +import signal +import sys +import zmq + +from python_banyan.banyan_base_aio import BanyanBaseAIO +from python_banyan.utils.tcp_gateway.tcp_socket import TcpSocket + + +class TcpGateWay(BanyanBaseAIO): + """ + This class is a gateway between a python-banyan network and a TCP socket. + + The TcpGateway serves as a simple connector between a python-banyan network and a TCP + server. It acts as a TCP client and connects to a TCP server. + The connected TCP server must have the capability to both MessagePack encode and + decode messages. The TcpGateway does not provide any encoding or decoding of its own. + """ + + # noinspection PyUnusedLocal + def __init__(self, tcp_server_ip_address=None, tcp_server_ip_port=31335, + subscriber_list=None, banyan_pub_topic=None, back_plane_ip_address=None, + subscriber_port='43125', publisher_port='43124', + process_name='TCPGateWay', event_loop=None, auto_start=True): + """ + :param tcp_server_ip_address: TCP IP address of server in string format + :param tcp_server_ip_port: TCP IP port of server in numerical format + :param subscriber_list: A list of banyan topics + :param banyan_pub_topic: published message topic + :param back_plane_ip_address: Banyan backplane IP address - will be auto-detected + :param subscriber_port: Banyan subscriber port + :param publisher_port: Banyan publisher port + :param process_name: TCP Gateway + :param event_loop: asyncio event loop + :param auto_start: automatically start the tasks + """ + # initialize the base class + super(TcpGateWay, self).__init__(back_plane_ip_address, subscriber_port, + publisher_port, process_name=process_name) + + if not tcp_server_ip_address: + raise RuntimeError("A TCP IP address for the TCP server must be provided.") + + self.tcp_server_ip_address = tcp_server_ip_address + + self.tcp_server_ip_port = tcp_server_ip_port + + # save the publication topic + if not banyan_pub_topic: + raise RuntimeError('A publishing topic must be provided') + + # encode the topic + self.banyan_pub_topic = banyan_pub_topic.encode() + + if not subscriber_list: + raise RuntimeError('A list of subscription topics must be provided') + + self.subscriber_list = subscriber_list + + if not self.back_plane_ip_address: + self.back_plane_ip_address = back_plane_ip_address + + self.subscriber_port = subscriber_port + + self.publisher_port = publisher_port + + # if the user passes in a single topic, convert the topic to a list of topics + if type(self.subscriber_list) is not list: + self.subscriber_list = [self.subscriber_list] + + if not event_loop: + self.event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.event_loop) + + # tcp client socket + self.sock = None + + self.tcp_recv_task = None + + if auto_start: + self.event_loop.run_until_complete(self.start_aio()) + + async def start_aio(self): + """ + Start up the tasks + """ + + await self.begin() + + self.sock = TcpSocket(self.tcp_server_ip_address, self.tcp_server_ip_port, + self.event_loop) + await self.sock.start() + + self.the_task = self.event_loop.create_task(self._receive_tcp_data()) + + while True: + await asyncio.sleep(.1) + + async def _receive_tcp_data(self): + while True: + # get the length of the next packet + byte_val = await self.sock.read(1) + length = int.from_bytes(byte_val, "big") + + pico_packet = await self.sock.read(length) + + await self.publisher.send_multipart([self.banyan_pub_topic, pico_packet]) + + async def receive_loop(self): + """ + This is the receive loop for Banyan messages. + + This method may be overwritten to meet the needs + of the application before handling received messages. + + """ + while True: + data = await self.subscriber.recv_multipart() + msg = await self.unpack(data[1]) + # print(f'Message from PC host: {msg}') + await self.sock.write(data[1]) + + +def tcp_gateway(): + # allow user to bypass the IP address auto-discovery. This is necessary if the component resides on a computer + # other than the computing running the backplane. + + parser = argparse.ArgumentParser() + + parser.add_argument("-a", dest="tcp_ip_address", default="None", + help="IP address TCP Server") + parser.add_argument("-b", dest="back_plane_ip_address", default="None", + help="None or IP address used by Back Plane", ) + parser.add_argument("-e", dest="banyan_pub_topic", default="from_pico", + help="Topic for messages to the host PC") + parser.add_argument("-g", dest="subscription_list", nargs='+', default="figura", + help="Banyan topics space delimited: topic1 topic2 topic3"), + parser.add_argument("-l", dest="event_loop", default="None", + help="asyncio event loop") + parser.add_argument("-n", dest="tcp_port", default=31335, + help="TCP Server Port Number") + parser.add_argument("-p", dest="publisher_port", default='43124', + help="Publisher IP port") + parser.add_argument("-s", dest="subscriber_port", default='43125', + help="Subscriber IP port") + parser.add_argument("-z", dest="process_name", default="TcpGateway", + help="Name of this gateway") + + args = parser.parse_args() + kw_options = {} + + if args.back_plane_ip_address != 'None': + kw_options['back_plane_ip_address'] = args.back_plane_ip_address + else: + args.back_plane_ip_address = None + + if args.event_loop == 'None': + args.event_loop = None + else: + kw_options['event_loop'] = args.event_loop + + kw_options = { + 'back_plane_ip_address': args.back_plane_ip_address, + 'tcp_server_ip_address': args.tcp_ip_address, + 'tcp_server_ip_port': args.tcp_port, + 'publisher_port': args.publisher_port, + 'subscriber_port': args.subscriber_port, + 'process_name': args.process_name, + 'subscriber_list': args.subscription_list, + 'banyan_pub_topic': args.banyan_pub_topic, + 'event_loop': args.event_loop + } + + TcpGateWay(**kw_options) + + +# noinspection PyShadowingNames,PyUnusedLocal,PyUnusedLocal +# signal handler function called when Control-C occurs +# noinspection PyShadowingNames,PyUnusedLocal,PyUnusedLocal +def signal_handler(sig, frame): + print('Exiting Through Signal Handler') + sys.exit(0) + + +# listen for SIGINT +signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGTERM, signal_handler) + +if __name__ == '__main__': + tcp_gateway() diff --git a/python_banyan/utils/tcp_gateway/tcp_socket.py b/python_banyan/utils/tcp_gateway/tcp_socket.py new file mode 100644 index 0000000..200ed1a --- /dev/null +++ b/python_banyan/utils/tcp_gateway/tcp_socket.py @@ -0,0 +1,79 @@ + +""" + Copyright (c) 2020 Alan Yorinks All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU AFFERO GENERAL PUBLIC LICENSE + Version 3 as published by the Free Software Foundation; either + or (at your option) any later version. + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + General Public License for more details. + + You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE + along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +""" + + +import asyncio +import sys + + +# noinspection PyStatementEffect,PyUnresolvedReferences,PyUnresolvedReferences +class TcpSocket: + """ + This class encapsulates management of a tcp/ip connection that communicates + with a TCP server + """ + def __init__(self, ip_address, ip_port, loop): + self.ip_address = ip_address + self.ip_port = ip_port + self.loop = loop + self.reader = None + self.writer = None + + async def start(self): + """ + This method opens an IP connection on the IP device + + :return: None + """ + try: + self.reader, self.writer = await asyncio.open_connection( + self.ip_address, self.ip_port) + print(f'Successfully connected to: {self.ip_address}:{self.ip_port}') + except OSError: + print("Can't open connection to " + self.ip_address) + sys.exit(0) + + async def write(self, data): + """ + This method writes sends data to the IP device + :param data: + + :return: None + """ + # we need to convert data formats, + # so all of the below. + output_list = [] + + # create an array of integers from the data to be sent + for x in data: + # output_list.append((ord(x))) + output_list.append(x) + + # now convert the integer list to a bytearray + to_wifi = bytearray(output_list) + self.writer.write(to_wifi) + await self.writer.drain() + + async def read(self, num_bytes=1): + """ + This method reads one byte of data from IP device + + :return: Next byte + """ + buffer = await self.reader.read(num_bytes) + return buffer diff --git a/setup.py b/setup.py index 292ea3c..e7892fc 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( name='python-banyan', - version='3.11', + version='3.13', packages=[ 'python_banyan', 'python_banyan.banyan_base', @@ -17,6 +17,7 @@ 'python_banyan.utils.monitor', 'python_banyan.utils.banyan_launcher', 'python_banyan.utils.mqtt_gateway', + 'python_banyan.utils.tcp_gateway', 'python_banyan.backplane', ], install_requires=[ @@ -36,7 +37,8 @@ 'bls = python_banyan.utils.banyan_launcher.bls:bls', 'blc = python_banyan.utils.banyan_launcher.blc:blc', 'blk = python_banyan.utils.banyan_launcher.blk:blk', - 'mgw = python_banyan.utils.mqtt_gateway.mqtt_gateway:mqtt_gateway' + 'mgw = python_banyan.utils.mqtt_gateway.mqtt_gateway:mqtt_gateway', + 'tgw = python_banyan.utils.tcp_gateway.tcp_gateway:tcp_gateway', ] },