-
Notifications
You must be signed in to change notification settings - Fork 1
/
obdfeeder.py
executable file
·105 lines (87 loc) · 3.84 KB
/
obdfeeder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#!/usr/bin/env python
########################################################################
# Copyright (c) 2020 Robert Bosch GmbH
#
# This program and the accompanying materials are made
# available under the terms of the Eclipse Public License 2.0
# which is available at https://www.eclipse.org/legal/epl-2.0/
#
# SPDX-License-Identifier: EPL-2.0
########################################################################
import configparser
import sys, os
import time, threading
import obdconnector as obdC
import obd2vssmapper
from kuksa_viss_client import KuksaClientThread
# Directory that holds this script (symlinks resolved); used below as the base
# for the default mapping file and the bundled configuration file.
scriptDir = os.path.split(os.path.realpath(__file__))[0]
class OBD_Client():
    """Poll an OBD connection and publish the mapped values to a VSS server.

    Constructing an instance starts the VSS client thread, opens the OBD
    device and launches a background thread running ``loop()``. Call
    ``shutdown()`` to stop that thread and the VSS client.
    """

    def __init__(self, config):
        """Create the VSS client, read the settings and start polling.

        Args:
            config: Sectioned configuration (a ``configparser.ConfigParser``
                in this file). ``config['kuksa_val']`` is handed to
                ``KuksaClientThread``. ``config['obd']`` may provide
                ``timeout`` (default 1), ``baudrate`` (default 2000000),
                ``device`` (default ``/dev/ttyAMA0``) and ``mapping``
                (default ``mapping.yml`` next to this script).

        Calls ``sys.exit(-1)`` when the ``obd`` section is missing.
        """
        print("Init obd client...")
        if "obd" not in config:
            print("obd section missing from configuration, exiting")
            sys.exit(-1)

        self.vssClient = KuksaClientThread(config['kuksa_val'])
        self.vssClient.start()
        self.vssClient.authorize()

        provider_config = config['obd']
        self.cfg = {
            'timeout': provider_config.getint('timeout', 1),
            'baudrate': provider_config.getint('baudrate', 2000000),
            'device': provider_config.get('device', '/dev/ttyAMA0'),
            'mapping': provider_config.get(
                'mapping', os.path.join(scriptDir, 'mapping.yml')),
        }

        print("Configuration")
        print("Device : {}".format(self.cfg['device']))
        print("Baudrate : {} baud".format(self.cfg['baudrate']))
        print("Timeout : {} s".format(self.cfg['timeout']))
        print("Mapping file : {}".format(self.cfg['mapping']))

        self.mapping = obd2vssmapper.mapper(self.cfg['mapping'])

        # Must be set before the thread starts: loop() polls this flag and
        # shutdown() clears it to make the thread exit.
        self.running = True
        self.thread = threading.Thread(target=self.loop, args=())
        self.connection = obdC.openOBD(self.cfg['device'], self.cfg['baudrate'])
        self.thread.start()

    def publishData(self):
        """Send every mapped OBD value that currently has data to its targets.

        Entries whose ``value`` is ``None`` are skipped. For the others the
        value's ``magnitude`` is written to each path listed in ``targets``.
        """
        print("Publish data")
        for obdval, entry in self.mapping.map():
            if entry['value'] is None:
                continue
            print("Publish {}: to ".format(obdval), end='')
            for path in entry['targets']:
                self.vssClient.setValue(path, entry['value'].magnitude)
                print(path, end=' ')
            print("")

    def loop(self):
        """Collect and publish OBD data until ``shutdown()`` clears ``running``.

        Sleeps ``cfg['timeout']`` seconds between iterations.
        """
        print("obd loop started")
        # Checking the flag (instead of looping forever) is what allows
        # shutdown() to join this thread.
        while self.running:
            obdC.collectData(self.mapping, self.connection)
            self.publishData()
            time.sleep(self.cfg['timeout'])

    def shutdown(self):
        """Stop the polling thread, wait for it to finish, stop the VSS client."""
        self.running = False
        self.thread.join()
        self.vssClient.stop()
def findConfigFile(candidates):
    """Return the first path in *candidates* that is an existing file.

    Args:
        candidates: Iterable of file paths, checked in order.

    Returns:
        The first path for which ``os.path.isfile`` is true, or ``None``
        when none of the candidates exists.
    """
    return next((path for path in candidates if os.path.isfile(path)), None)


if __name__ == "__main__":
    # signal is not among the module-level imports, so bring it in here;
    # without it the handler registration below raises NameError.
    import signal

    print("kuksa.val OBD example feeder")
    config_candidates = [
        '/config/obdfeeder.ini',
        '/etc/obdfeeder.ini',
        os.path.join(scriptDir, 'config/obdfeeder.ini'),
    ]
    # findConfigFile yields None when nothing matches, so the check below
    # reports the problem instead of failing on an unbound name.
    configfile = findConfigFile(config_candidates)
    if configfile is None:
        print("No configuration file found. Exiting")
        sys.exit(-1)

    print("read config file" + configfile)
    config = configparser.ConfigParser()
    config.read(configfile)

    client = OBD_Client(config)

    def terminationSignalreceived(signalNumber, frame):
        """Signal handler: shut the client down."""
        print("Received termination signal. Shutting down")
        client.shutdown()

    signal.signal(signal.SIGINT, terminationSignalreceived)
    signal.signal(signal.SIGQUIT, terminationSignalreceived)
    signal.signal(signal.SIGTERM, terminationSignalreceived)