-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
259 lines (188 loc) · 7.55 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import network
import socket
import time
from uselect import select
from machine import Pin
# Turn the Raspberry Pi LED on to see that it's running
led = Pin('LED', Pin.OUT)
led.on()
# WiFi Settings. Change these before uploading to the Pi Pico
WIFI_SSID = 'YOUR_WIFI_SSID'
WIFI_PASSWORD = 'YOUR_WIFI_PASSWORD'
# Set up pins for the first garage door
OPEN_PIN = 6
CLOSED_PIN = 12
RELAY_PIN = 16
# Set up pins for the second garage door
CLOSED_PIN_EXTERIOR = 0
RELAY_PIN_EXTERIOR = 20
# Pulse length in ms
PULSE_LENGTH = 500
# Homekit target and current states for the first garage door
TARGET_DOOR_STATE_OPEN = 0
TARGET_DOOR_STATE_CLOSED = 1
CURRENT_DOOR_STATE_OPEN = 0
CURRENT_DOOR_STATE_CLOSED = 1
CURRENT_DOOR_STATE_OPENING = 2
CURRENT_DOOR_STATE_CLOSING = 3
CURRENT_DOOR_STATE_STOPPED = 4
# Homekit target and current states for the second garage door
TARGET_DOOR_STATE_OPEN_EXTERIOR = 0
TARGET_DOOR_STATE_CLOSED_EXTERIOR = 1
CURRENT_DOOR_STATE_OPEN_EXTERIOR = 0
CURRENT_DOOR_STATE_CLOSED_EXTERIOR = 1
CURRENT_DOOR_STATE_OPENING_EXTERIOR = 2
CURRENT_DOOR_STATE_CLOSING_EXTERIOR = 3
CURRENT_DOOR_STATE_STOPPED_EXTERIOR = 4
IGNORE_SENSORS_AFTER_ACTION_FOR_SECONDS = 5
#Setup pins for relay and sensors for interior door
relay = Pin(RELAY_PIN, Pin.OUT)
openSensor=Pin(OPEN_PIN, Pin.IN, Pin.PULL_UP)
closedSensor=Pin(CLOSED_PIN, Pin.IN, Pin.PULL_UP)
#Setup pins for relay and sensors for exterior door
relayExterior = Pin(RELAY_PIN_EXTERIOR, Pin.OUT)
closedSensorExterior = Pin(CLOSED_PIN_EXTERIOR, Pin.IN, Pin.PULL_UP)
# Set initial target and current states for the first garage door
targetState = TARGET_DOOR_STATE_CLOSED
currentState = CURRENT_DOOR_STATE_STOPPED
# Set initial target and current states for the second garage door
targetStateExterior = TARGET_DOOR_STATE_CLOSED_EXTERIOR
currentStateExterior = CURRENT_DOOR_STATE_STOPPED_EXTERIOR
lastDoorAction = time.time()
# Setup WiFi connection
wifi = network.WLAN(network.STA_IF)
def connectWifi():
global wifi
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
wifi.connect(WIFI_SSID, WIFI_PASSWORD)
max_wait = 10
while wifi.status() != 3:
print('waiting for connection. Status: ' + str(wifi.status()))
time.sleep(1)
print('connected')
status = wifi.ifconfig()
ipAddress = status[0]
print('ip = ' + ipAddress)
connectWifi()
# Set up socket and start listening on port 80
addr = socket.getaddrinfo(wifi.ifconfig()[0], 80)[0][-1]
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(addr)
s.listen(1)
print('listening on', addr)
def startDoor(newTargetState):
global targetState
global currentState
global lastDoorAction
targetState = newTargetState
lastDoorAction = time.time()
relay.value(0)
time.sleep_ms(PULSE_LENGTH)
relay.value(1)
setCurrentState()
print('startDoor', targetState, currentState)
return getDoorStatus()
def startDoorExterior(newTargetStateExterior):
global targetStateExterior
global currentStateExterior
global lastDoorAction
targetStateExterior = newTargetStateExterior
lastDoorAction = time.time()
relayExterior.value(0)
time.sleep_ms(PULSE_LENGTH)
relayExterior.value(1)
setCurrentStateExterior()
print('startDoorExterior', targetStateExterior, currentStateExterior)
return getDoorStatusExterior()
def setCurrentState():
global targetState
global currentState
global openSensor
global closedSensor
# Ignore sensors after having started the door for a few seconds to give the door enough time to move away from the sensor
actionThresholdReached = time.time() > lastDoorAction + IGNORE_SENSORS_AFTER_ACTION_FOR_SECONDS
# If threshold is reached and door is fully open
if actionThresholdReached and openSensor.value() == 0:
currentState = CURRENT_DOOR_STATE_OPEN
targetState = TARGET_DOOR_STATE_OPEN
# If threshold is reached and door is fully closed
elif actionThresholdReached and closedSensor.value() == 0:
currentState = CURRENT_DOOR_STATE_CLOSED
targetState = TARGET_DOOR_STATE_CLOSED
# Threshold has not been reached or door is neither fully open or closed
else:
# Set current state based on intention (target state)
if targetState == TARGET_DOOR_STATE_OPEN:
currentState = CURRENT_DOOR_STATE_OPENING
elif targetState == TARGET_DOOR_STATE_CLOSED:
currentState = CURRENT_DOOR_STATE_CLOSING
def setCurrentStateExterior():
global targetStateExterior
global currentStateExterior
global openSensorExterior
global closedSensorExterior
# Ignore sensors after having started the door for a few seconds to give the door enough time to move away from the sensor
actionThresholdReached = time.time() > lastDoorAction + IGNORE_SENSORS_AFTER_ACTION_FOR_SECONDS
# If threshold is reached and door is fully open
if actionThresholdReached and closedSensorExterior.value() == 0:
currentStateExterior = CURRENT_DOOR_STATE_OPEN_EXTERIOR
targetStateExterior = TARGET_DOOR_STATE_OPEN_EXTERIOR
# If threshold is reached and door is fully closed
elif actionThresholdReached and closedSensorExterior.value() == 1:
currentStateExterior = CURRENT_DOOR_STATE_CLOSED_EXTERIOR
targetStateExterior = TARGET_DOOR_STATE_CLOSED_EXTERIOR
def getDoorStatus():
global targetState
global currentState
# Ensure current state is up to date
setCurrentState()
return '{"success": true, "currentState": ' + str(currentState) + ', "targetState": ' + str(targetState) + '}'
def getDoorStatusExterior():
global targetStateExterior
global currentStateExterior
# Ensure current state is up to date
setCurrentStateExterior()
return '{"success": true, "currentState": ' + str(currentStateExterior) + ', "targetState": ' + str(targetStateExterior) + '}'
def returnError(errcode):
return '{"success": false, "error": "' + errcode + '"}'
# Handle an incoming request
def handleRequest(conn, address):
print('Got a connection from %s' % str(addr))
request = conn.recv(1024)
request = str(request)
print(request)
if '/?open' in request:
response = startDoor(TARGET_DOOR_STATE_OPEN)
elif '/?close' in request:
response = startDoor(TARGET_DOOR_STATE_CLOSED)
elif '/?getstatus' in request:
response = getDoorStatus()
elif '/?oExterior' in request:
response = startDoorExterior(TARGET_DOOR_STATE_OPEN_EXTERIOR)
elif '/?cExterior' in request:
response = startDoorExterior(TARGET_DOOR_STATE_CLOSED_EXTERIOR)
elif '/?gsExterior' in request:
response = getDoorStatusExterior()
else:
response = returnError('UNKNOWN_COMMAND')
conn.send('HTTP/1.0 200 OK\r\nContent-type: application/json\r\n\r\n')
conn.send(response)
conn.close()
# Main Loop
while True:
# Check if WiFi is connected, if not, reconnect
if wifi.isconnected() == False:
print('Connecting wifi...')
connectWifi()
# Handle incoming HTTP requests in a non-blocking way
r, w, err = select((s,), (), (), 1)
# Is there an incoming request? If so, handle the request
if r:
for readable in r:
conn, addr = s.accept()
try:
handleRequest(conn, addr)
except OSError as e:
pass