Skip to content

Commit

Permalink
Use phew as a http server. Make code async
Browse files Browse the repository at this point in the history
  • Loading branch information
North101 committed Jun 30, 2024
1 parent f416fe1 commit cd77f40
Show file tree
Hide file tree
Showing 6 changed files with 878 additions and 68 deletions.
77 changes: 77 additions & 0 deletions access-front-door/src/lib/phew/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
__version__ = "0.0.2"

# highly recommended to set a lowish garbage collection threshold
# to minimise memory fragmentation as we sometimes want to
# allocate relatively large blocks of ram.
import gc, os, machine
gc.threshold(50000)

# phew! the Pico (or Python) HTTP Endpoint Wrangler
from . import logging

# determine if remotely mounted or not, changes some behaviours like
# logging truncation
remote_mount = False
try:
os.statvfs(".") # causes exception if remotely mounted (mpremote/pyboard.py)
except:
remote_mount = True

def get_ip_address():
import network
try:
return network.WLAN(network.STA_IF).ifconfig()[0]
except:
return None

def is_connected_to_wifi():
import network, time
wlan = network.WLAN(network.STA_IF)
return wlan.isconnected()

# helper method to quickly get connected to wifi
def connect_to_wifi(ssid, password, timeout_seconds=30):
import network, time

statuses = {
network.STAT_IDLE: "idle",
network.STAT_CONNECTING: "connecting",
network.STAT_WRONG_PASSWORD: "wrong password",
network.STAT_NO_AP_FOUND: "access point not found",
network.STAT_CONNECT_FAIL: "connection failed",
network.STAT_GOT_IP: "got ip address"
}

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
start = time.ticks_ms()
status = wlan.status()

logging.debug(f" - {statuses[status]}")
while not wlan.isconnected() and (time.ticks_ms() - start) < (timeout_seconds * 1000):
new_status = wlan.status()
if status != new_status:
logging.debug(f" - {statuses[status]}")
status = new_status
time.sleep(0.25)

if wlan.status() == network.STAT_GOT_IP:
return wlan.ifconfig()[0]
return None


# helper method to put the pico into access point mode
def access_point(ssid, password = None):
import network

# start up network in access point mode
wlan = network.WLAN(network.AP_IF)
wlan.config(essid=ssid)
if password:
wlan.config(password=password)
else:
wlan.config(security=0) # disable password
wlan.active(True)

return wlan
32 changes: 32 additions & 0 deletions access-front-door/src/lib/phew/dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import uasyncio, usocket
from . import logging

async def _handler(socket, ip_address):
while True:
try:
yield uasyncio.core._io_queue.queue_read(socket)
request, client = socket.recvfrom(256)
response = request[:2] # request id
response += b"\x81\x80" # response flags
response += request[4:6] + request[4:6] # qd/an count
response += b"\x00\x00\x00\x00" # ns/ar count
response += request[12:] # origional request body
response += b"\xC0\x0C" # pointer to domain name at byte 12
response += b"\x00\x01\x00\x01" # type and class (A record / IN class)
response += b"\x00\x00\x00\x3C" # time to live 60 seconds
response += b"\x00\x04" # response length (4 bytes = 1 ipv4 address)
response += bytes(map(int, ip_address.split("."))) # ip address parts
socket.sendto(response, client)
except Exception as e:
logging.error(e)

def run_catchall(ip_address, port=53):
logging.info("> starting catch all dns server on port {}".format(port))

_socket = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
_socket.setblocking(False)
_socket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
_socket.bind(usocket.getaddrinfo(ip_address, port, 0, usocket.SOCK_DGRAM)[0][-1])

loop = uasyncio.get_event_loop()
loop.create_task(_handler(_socket, ip_address))
111 changes: 111 additions & 0 deletions access-front-door/src/lib/phew/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import machine, os, gc

log_file = "log.txt"

LOG_INFO = 0b00001
LOG_WARNING = 0b00010
LOG_ERROR = 0b00100
LOG_DEBUG = 0b01000
LOG_EXCEPTION = 0b10000
LOG_ALL = LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_DEBUG | LOG_EXCEPTION

_logging_types = LOG_ALL

# the log file will be truncated if it exceeds _log_truncate_at bytes in
# size. the defaults values are designed to limit the log to at most
# three blocks on the Pico
_log_truncate_at = 11 * 1024
_log_truncate_to = 8 * 1024

def datetime_string():
dt = machine.RTC().datetime()
return "{0:04d}-{1:02d}-{2:02d} {4:02d}:{5:02d}:{6:02d}".format(*dt)

def file_size(file):
try:
return os.stat(file)[6]
except OSError:
return None

def set_truncate_thresholds(truncate_at, truncate_to):
global _log_truncate_at
global _log_truncate_to
_log_truncate_at = truncate_at
_log_truncate_to = truncate_to

def enable_logging_types(types):
global _logging_types
_logging_types = _logging_types | types

def disable_logging_types(types):
global _logging_types
_logging_types = _logging_types & ~types

# truncates the log file down to a target size while maintaining
# clean line breaks
def truncate(file, target_size):
# get the current size of the log file
size = file_size(file)

# calculate how many bytes we're aiming to discard
discard = size - target_size
if discard <= 0:
return

with open(file, "rb") as infile:
with open(file + ".tmp", "wb") as outfile:
# skip a bunch of the input file until we've discarded
# at least enough
while discard > 0:
chunk = infile.read(1024)
discard -= len(chunk)

# try to find a line break nearby to split first chunk on
break_position = max(
chunk.find (b"\n", -discard), # search forward
chunk.rfind(b"\n", -discard) # search backwards
)
if break_position != -1: # if we found a line break..
outfile.write(chunk[break_position + 1:])

# now copy the rest of the file
while True:
chunk = infile.read(1024)
if not chunk:
break
outfile.write(chunk)

# delete the old file and replace with the new
os.remove(file)
os.rename(file + ".tmp", file)


def log(level, text):
datetime = datetime_string()
log_entry = "{0} [{1:8} /{2:>4}kB] {3}".format(datetime, level, round(gc.mem_free() / 1024), text)
print(log_entry)
with open(log_file, "a") as logfile:
logfile.write(log_entry + '\n')

if _log_truncate_at and file_size(log_file) > _log_truncate_at:
truncate(log_file, _log_truncate_to)

def info(*items):
if _logging_types & LOG_INFO:
log("info", " ".join(map(str, items)))

def warn(*items):
if _logging_types & LOG_WARNING:
log("warning", " ".join(map(str, items)))

def error(*items):
if _logging_types & LOG_ERROR:
log("error", " ".join(map(str, items)))

def debug(*items):
if _logging_types & LOG_DEBUG:
log("debug", " ".join(map(str, items)))

def exception(*items):
if _logging_types & LOG_EXCEPTION:
log("exception", " ".join(map(str, items)))
Loading

0 comments on commit cd77f40

Please sign in to comment.