-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.go
149 lines (116 loc) · 3.96 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package main
import (
"log"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go"
)
const (
// DBERROR is the prefix text for database error messages.
// NOTE(review): nothing in the visible part of this file references it;
// confirm whether it is used elsewhere before removing it.
DBERROR = "DB Error: "
)
// StoreMeasurement collects the environmental readings of sensorData that are
// flagged as valid in sensorData.ValidData and passes them to insert together
// with the sensor's MAC.
//
// Field names used: "Temperature", "Humidity", "Pressure", "AbsoluteHumidity"
// and "Dewpoint". A reading whose validity flag is not set is left out of the
// field map entirely.
func StoreMeasurement(sensorData SensorData) {
	valid := sensorData.ValidData
	measurement := make(map[string]interface{})

	if valid.Temp {
		measurement["Temperature"] = sensorData.Temp
	}

	// Humidity is sometimes reported incorrectly, so it is only stored when
	// it has been flagged valid.
	// NOTE(review): no range check happens here; presumably implausibly high
	// values are rejected where ValidData.Humidity is set - confirm.
	if valid.Humidity {
		measurement["Humidity"] = sensorData.Humidity
	}

	if valid.Pressure {
		// Pressure is converted to int before it is stored.
		measurement["Pressure"] = int(sensorData.Pressure)
	}

	// Absolute humidity and dew point are stored only when both the
	// temperature and the humidity readings are valid.
	if valid.Temp && valid.Humidity {
		measurement["AbsoluteHumidity"] = sensorData.AbsHumidity
		measurement["Dewpoint"] = sensorData.DewPoint
	}

	insert(measurement, sensorData.MAC)
}
// StoreHWmeasurement collects the hardware readings of sensorData that are
// flagged as valid in sensorData.ValidData and passes them to insertHW
// together with the sensor's MAC.
//
// Field names used: "Battery" and "TxPower". Both values are converted to int
// before they are stored; a reading whose validity flag is not set is left
// out of the field map.
func StoreHWmeasurement(sensorData SensorData) {
	valid := sensorData.ValidData
	hwFields := make(map[string]interface{})

	if valid.Battery {
		hwFields["Battery"] = int(sensorData.Battery)
	}
	if valid.TXPower {
		hwFields["TxPower"] = int(sensorData.TXPower)
	}

	insertHW(hwFields, sensorData.MAC)
}
//
//
//
// Insert points to database
// Uses: Measurement table
func insert(measurement map[string]interface{}, MAC string) {
// Create client and set batch size to 2
c := influxdb2.NewClientWithOptions(ConnectionString, DBToken, influxdb2.DefaultOptions().SetBatchSize(2))
defer c.Close()
// user blocking write client for writes to desired bucket
writeAPI := c.WriteAPI(DBOrg, DBBucket)
// Get errors channel
errorsCh := writeAPI.Errors()
// Create go proc for reading and logging errors
go func() {
for err := range errorsCh {
log.Printf("DB Error: %s\n", err.Error())
}
}()
//
tags := map[string]string{"Address": Address}
bFound := false
// Create a point and add to batch
for i := 0; i != len(aSensors); i++ {
if MAC == aSensors[i] {
tags["Location"] = aLocations[i]
bFound = true
}
}
/*Check if MAC was found from the list to be stored to DB*/
if bFound == false {
/*Not found. not storing*/
log.Printf("%s Ruuvitag is not listed in config.yml to be stored to database.\n", MAC)
return
}
tags["Device"] = MAC
fields := measurement
pt := influxdb2.NewPoint("measurements", tags, fields, time.Now())
writeAPI.WritePoint(pt)
// Force all unwritten data to be sent
writeAPI.Flush()
}
// Insert points to database
//Uses Hardware table
func insertHW(measurement map[string]interface{}, MAC string) {
// Create client and set batch size to 2
c := influxdb2.NewClientWithOptions(ConnectionString, DBToken, influxdb2.DefaultOptions().SetBatchSize(2))
defer c.Close()
// user blocking write client for writes to desired bucket
writeAPI := c.WriteAPI(DBOrg, DBBucket)
// Create a point and add to batch
tags := map[string]string{"Address": Address}
bFound := false
// Create a point and add to batch
for i := 0; i != len(aSensors); i++ {
if MAC == aSensors[i] {
tags["Location"] = aLocations[i]
bFound = true
}
}
/*Check if MAC was found from the list to be stored to DB*/
if bFound == false {
/*Not found, not storing*/
log.Printf("%s Ruuvitag is not listed in config.yml to be stored to database.\n", MAC)
return
}
tags["Device"] = MAC
fields := measurement
pt := influxdb2.NewPoint("HWmeasurements", tags, fields, time.Now())
writeAPI.WritePoint(pt)
// Force all unwritten data to be sent
writeAPI.Flush()
}