Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor access-front-door #5

Merged
merged 8 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.venv/
.vscode/

access-front-door/src/env.py
69 changes: 68 additions & 1 deletion access-front-door/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,70 @@
# Access Front Door

This is the code required for opening the front door latch.
This is the code required for opening the front door latch.


## Setting up environment variables

1. Copy or rename `access-front-door/src/env.example.py` to `access-front-door/src/env.py`
North101 marked this conversation as resolved.
Show resolved Hide resolved
2. Set `WIFI_SSID` and `WIFI_PASSWORD` to your wifi ssid and password
3. Update any other variables


## Micropython type hints in VSCode

### Install these extensions:
* https://marketplace.visualstudio.com/items?itemName=ms-python.python
* https://marketplace.visualstudio.com/items?itemName=ms-python.vscode-pylance


### Setup virtualenv and install stubs
```bash
# setup virtualenv
python3 -m venv .venv

# activate virtualenv
source ./.venv/bin/activate

North101 marked this conversation as resolved.
Show resolved Hide resolved
# install micropython stubs
pip3 install -U micropython-esp32-stubs

# install mpremote (optional, lets you connect via command line)
pip3 install -U mpremote
alistairjcbrown marked this conversation as resolved.
Show resolved Hide resolved
```

### Configure vscode
`.vscode/settings.json`
```json
{
"python.languageServer": "Pylance",
"python.analysis.typeCheckingMode": "basic",
"python.analysis.diagnosticSeverityOverrides": {
"reportMissingModuleSource": "none"
},
"python.analysis.typeshedPaths": [
// Replace <python_version> with whatever the folder name is in .venv/lib/
".venv/lib/<python_version>/site-packages",
],
"python.analysis.extraPaths": [
// Allow importing from lib/
"access-front-door/src/lib",
],
"pylint.args": [
// Fixes imports
"--init-hook 'import sys; sys.path.append(\".\")'",
],
}
```


### Copy code to device via command line (requires mpremote)
```bash
# make sure you are in the access-front-door directory
cd access-front-door

# list connected devices
./run.sh

# copy code and run main.py on device
./run.sh [device_id]
```
68 changes: 0 additions & 68 deletions access-front-door/main.py

This file was deleted.

10 changes: 10 additions & 0 deletions access-front-door/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# bin/bash
if [[ -z "$1" ]]; then
echo "usage: $0 [device_id]"
mpremote devs
exit 1;
fi

mpremote connect id:$1 reset
mpremote connect id:$1 cp -r src/* :
mpremote connect id:$1 run ./src/main.py
alistairjcbrown marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 5 additions & 0 deletions access-front-door/src/env.example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
WIFI_SSID = ""
WIFI_PASSWORD = ""
SHARED_PASSWORD = "access-front-door-psk"
HOSTNAME = "access-front-door"
DEFAULT_UNLOCK_DURATION = 10
77 changes: 77 additions & 0 deletions access-front-door/src/lib/phew/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
__version__ = "0.0.2"

# highly recommended to set a lowish garbage collection threshold
# to minimise memory fragmentation as we sometimes want to
# allocate relatively large blocks of ram.
import gc, os, machine
gc.threshold(50000)

# phew! the Pico (or Python) HTTP Endpoint Wrangler
from . import logging

# determine if remotely mounted or not, changes some behaviours like
# logging truncation
remote_mount = False
try:
os.statvfs(".") # causes exception if remotely mounted (mpremote/pyboard.py)
except:
remote_mount = True

def get_ip_address():
import network
try:
return network.WLAN(network.STA_IF).ifconfig()[0]
except:
return None

def is_connected_to_wifi():
import network, time
wlan = network.WLAN(network.STA_IF)
return wlan.isconnected()

# helper method to quickly get connected to wifi
def connect_to_wifi(ssid, password, timeout_seconds=30):
import network, time

statuses = {
network.STAT_IDLE: "idle",
network.STAT_CONNECTING: "connecting",
network.STAT_WRONG_PASSWORD: "wrong password",
network.STAT_NO_AP_FOUND: "access point not found",
network.STAT_CONNECT_FAIL: "connection failed",
network.STAT_GOT_IP: "got ip address"
}

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)
start = time.ticks_ms()
status = wlan.status()

logging.debug(f" - {statuses[status]}")
while not wlan.isconnected() and (time.ticks_ms() - start) < (timeout_seconds * 1000):
new_status = wlan.status()
if status != new_status:
logging.debug(f" - {statuses[status]}")
status = new_status
time.sleep(0.25)

if wlan.status() == network.STAT_GOT_IP:
return wlan.ifconfig()[0]
return None


# helper method to put the pico into access point mode
def access_point(ssid, password = None):
import network

# start up network in access point mode
wlan = network.WLAN(network.AP_IF)
wlan.config(essid=ssid)
if password:
wlan.config(password=password)
else:
wlan.config(security=0) # disable password
wlan.active(True)

return wlan
alistairjcbrown marked this conversation as resolved.
Show resolved Hide resolved
32 changes: 32 additions & 0 deletions access-front-door/src/lib/phew/dns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import uasyncio, usocket
from . import logging

async def _handler(socket, ip_address):
while True:
try:
yield uasyncio.core._io_queue.queue_read(socket)
request, client = socket.recvfrom(256)
response = request[:2] # request id
response += b"\x81\x80" # response flags
response += request[4:6] + request[4:6] # qd/an count
response += b"\x00\x00\x00\x00" # ns/ar count
response += request[12:] # origional request body
response += b"\xC0\x0C" # pointer to domain name at byte 12
response += b"\x00\x01\x00\x01" # type and class (A record / IN class)
response += b"\x00\x00\x00\x3C" # time to live 60 seconds
response += b"\x00\x04" # response length (4 bytes = 1 ipv4 address)
response += bytes(map(int, ip_address.split("."))) # ip address parts
socket.sendto(response, client)
except Exception as e:
logging.error(e)

def run_catchall(ip_address, port=53):
logging.info("> starting catch all dns server on port {}".format(port))

_socket = usocket.socket(usocket.AF_INET, usocket.SOCK_DGRAM)
_socket.setblocking(False)
_socket.setsockopt(usocket.SOL_SOCKET, usocket.SO_REUSEADDR, 1)
_socket.bind(usocket.getaddrinfo(ip_address, port, 0, usocket.SOCK_DGRAM)[0][-1])

loop = uasyncio.get_event_loop()
loop.create_task(_handler(_socket, ip_address))
111 changes: 111 additions & 0 deletions access-front-door/src/lib/phew/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import machine, os, gc

log_file = "log.txt"

LOG_INFO = 0b00001
LOG_WARNING = 0b00010
LOG_ERROR = 0b00100
LOG_DEBUG = 0b01000
LOG_EXCEPTION = 0b10000
LOG_ALL = LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_DEBUG | LOG_EXCEPTION

_logging_types = LOG_ALL

# the log file will be truncated if it exceeds _log_truncate_at bytes in
# size. the defaults values are designed to limit the log to at most
# three blocks on the Pico
_log_truncate_at = 11 * 1024
_log_truncate_to = 8 * 1024

def datetime_string():
dt = machine.RTC().datetime()
return "{0:04d}-{1:02d}-{2:02d} {4:02d}:{5:02d}:{6:02d}".format(*dt)

def file_size(file):
try:
return os.stat(file)[6]
except OSError:
return None

def set_truncate_thresholds(truncate_at, truncate_to):
global _log_truncate_at
global _log_truncate_to
_log_truncate_at = truncate_at
_log_truncate_to = truncate_to

def enable_logging_types(types):
global _logging_types
_logging_types = _logging_types | types

def disable_logging_types(types):
global _logging_types
_logging_types = _logging_types & ~types

# truncates the log file down to a target size while maintaining
# clean line breaks
def truncate(file, target_size):
# get the current size of the log file
size = file_size(file)

# calculate how many bytes we're aiming to discard
discard = size - target_size
if discard <= 0:
return

with open(file, "rb") as infile:
with open(file + ".tmp", "wb") as outfile:
# skip a bunch of the input file until we've discarded
# at least enough
while discard > 0:
chunk = infile.read(1024)
discard -= len(chunk)

# try to find a line break nearby to split first chunk on
break_position = max(
chunk.find (b"\n", -discard), # search forward
chunk.rfind(b"\n", -discard) # search backwards
)
if break_position != -1: # if we found a line break..
outfile.write(chunk[break_position + 1:])

# now copy the rest of the file
while True:
chunk = infile.read(1024)
if not chunk:
break
outfile.write(chunk)

# delete the old file and replace with the new
os.remove(file)
os.rename(file + ".tmp", file)


def log(level, text):
datetime = datetime_string()
log_entry = "{0} [{1:8} /{2:>4}kB] {3}".format(datetime, level, round(gc.mem_free() / 1024), text)
print(log_entry)
with open(log_file, "a") as logfile:
logfile.write(log_entry + '\n')

if _log_truncate_at and file_size(log_file) > _log_truncate_at:
truncate(log_file, _log_truncate_to)

def info(*items):
if _logging_types & LOG_INFO:
log("info", " ".join(map(str, items)))

def warn(*items):
if _logging_types & LOG_WARNING:
log("warning", " ".join(map(str, items)))

def error(*items):
if _logging_types & LOG_ERROR:
log("error", " ".join(map(str, items)))

def debug(*items):
if _logging_types & LOG_DEBUG:
log("debug", " ".join(map(str, items)))

def exception(*items):
if _logging_types & LOG_EXCEPTION:
log("exception", " ".join(map(str, items)))
Loading