forked from rkoshak/sensorReporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dash.py
80 lines (63 loc) · 2.73 KB
/
dash.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""
Copyright 2015 Richard Koshak
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Script: dash.py
Author: Rich Koshak
Date: October 22, 2015
Purpose: Scans for ARP packets from Amazon Dash buttons
"""
# vim: tabstop=8 expandtab shiftwidth=4 softtabstop=4
import sys
import traceback
from scapy.all import *
import ConfigParser
class dash:
    """Scans for ARP packets from Amazon Dash buttons.

    Each configured button is identified by its MAC address and mapped to a
    destination. When an ARP packet from a known MAC is sniffed, the string
    "Pressed" is published to that button's destination.
    """

    def __init__(self, publisher, logger, params):
        """Reads the MAC-to-destination mapping and the poll period.

        Arguments:
        publisher -- object providing publish(message, destination)
        logger    -- object providing info() and error() logging methods
        params    -- callable that takes an option name and returns its
                     value; it must raise ConfigParser.NoOptionError when
                     the option does not exist

        Options read: Address1/Destination1, Address2/Destination2, ... up to
        the first index for which either option is missing, and Poll.
        """
        self.logger = logger
        self.logger.info("----------Configuring dash: ")

        # Maps lower-cased MAC address -> destination to publish to.
        self.devices = {}

        i = 1
        while True:
            try:
                # scapy renders hardware addresses in lower-case hex, so the
                # configured address is normalised to make the lookup in
                # checkState independent of how the user typed it.
                mac = params('Address%s' % (i)).lower()
                destination = params('Destination%s' % (i))
            except ConfigParser.NoOptionError:
                # The first missing AddressN/DestinationN ends the list.
                break
            self.devices[mac] = destination
            self.logger.info("Sniffing for %s to publish to %s" % (mac, destination))
            i += 1

        self.publish = publisher.publish
        self.poll = int(params("Poll"))

    def checkState(self):
        """Detects when the Dash button issues an ARP packet and publishes the fact to the topic

        Starts sniffing with count=0, so this call is not expected to return
        during normal operation. Errors raised while sniffing are logged and
        their traceback is printed to stdout; they are not re-raised.
        """
        def arp_display(pkt):
            """Publishes "Pressed" when pkt is an ARP packet from a known button."""
            if ARP not in pkt or pkt[ARP].op not in (1, 2):  # who-has or is-at
                return
            destination = self.devices.get(pkt[ARP].hwsrc.lower())
            if destination is None:
                # ARP traffic from unconfigured devices is ignored silently;
                # logging every such packet would flood the log.
                return
            self.logger.info("Dash button pressed for: " + destination)
            self.publish("Pressed", destination)

        try:
            self.logger.info("Dash: kicking off ARP sniffing")
            # Parenthesised single-argument print gives the same output on
            # Python 2 and is also valid syntax on Python 3.
            print(sniff(prn=arp_display, filter="arp", store=0, count=0))
            # Never returns
            self.logger.info("Dash: you should never see this")
        except Exception:
            # Deliberately not a bare except: SystemExit and KeyboardInterrupt
            # must be allowed to propagate so the process can be stopped.
            self.logger.error("Unexpected error in dash: %s", sys.exc_info()[0])
            traceback.print_exc(file=sys.stdout)

    def publishState(self):
        """Does nothing"""