diff --git a/firmware/main/app_main.cc b/firmware/main/app_main.cc index d47760c..253a4ce 100644 --- a/firmware/main/app_main.cc +++ b/firmware/main/app_main.cc @@ -1,5 +1,6 @@ #include "driver/gpio.h" #include "driver/periph_ctrl.h" +#include "driver/uart.h" #include "esp_event.h" #include "esp_log.h" #include "esp_sntp.h" @@ -43,9 +44,24 @@ typedef struct { union { float temp; struct { - float pm1; - float pm2; - float pm10; + struct { + uint16_t pm1_ug; + uint16_t pm2_ug; + uint16_t pm10_ug; + } std; + struct { + uint16_t pm1_ug; + uint16_t pm2_ug; + uint16_t pm10_ug; + } atm; + struct { + uint16_t pm03_cnt; + uint16_t pm05_cnt; + uint16_t pm1_cnt; + uint16_t pm2_cnt; + uint16_t pm5_cnt; + uint16_t pm10_cnt; + } cnt; } pm; }; } measurement; @@ -83,7 +99,7 @@ static struct { typedef struct { esp_mqtt_client_handle_t handle; psk_hint_key_t *psk_hint; - char msg[256]; + char msg[512]; EventGroupHandle_t event; char *cmd_topic; const char *resp_topic; @@ -594,9 +610,114 @@ static void start_temp_tasks(const QueueHandle_t ms_queue) { } } +struct pms5003_response { + uint8_t magic0; + uint8_t magic1; + uint16_t frame_len; + uint16_t pm1_ug; + uint16_t pm2_ug; + uint16_t pm10_ug; + uint16_t pm1_ug_atm; + uint16_t pm2_ug_atm; + uint16_t pm10_ug_atm; + uint16_t pm03_cnt; + uint16_t pm05_cnt; + uint16_t pm1_cnt; + uint16_t pm2_cnt; + uint16_t pm5_cnt; + uint16_t pm10_cnt; + uint16_t reserved; + uint16_t check; +} __attribute__((packed)); + +static constexpr size_t pms_frame_len = + sizeof(pms5003_response) - sizeof(pms5003_response::magic0) - + sizeof(pms5003_response::magic1) - sizeof(pms5003_response::frame_len); + +static constexpr int pmUartPort = 1; + +[[noreturn]] static void task_collect_pm(void *const arg) { + auto *const queue = reinterpret_cast(arg); + + pms5003_response resp; + measurement ms{.type = MS_PARTICULATES, .sensor = CONFIG_DEV_NAME}; + + while (true) { + const int received = uart_read_bytes( + pmUartPort, &resp, sizeof(pms5003_response), portMAX_DELAY); + + if (received != sizeof(pms5003_response)) { + continue; + } + + if (resp.magic0 != 0x42 || resp.magic1 != 0x4d) { + ESP_LOGW(kTag, "invalid magic number 0x%x%x", resp.magic0, resp.magic1); + continue; + } + + // flip bytes from big-endian to host architecture's little endian + for (uint16_t *p = &resp.frame_len; p <= &resp.check; ++p) { + *p = ntohs(*p); + } + + if (resp.frame_len != pms_frame_len) { + ESP_LOGW(kTag, "invalid frame length %d", resp.frame_len); + continue; + } + + uint16_t checksum = 0; + for (uint8_t *p = &resp.magic0; + p < reinterpret_cast(&resp.check); ++p) { + checksum += *p; + } + if (checksum != resp.check) { + ESP_LOGW(kTag, "invalid check 0x%x, expected 0x%x", resp.check, checksum); + continue; + } + + ms.time = get_timestamp(); + + ms.pm.atm.pm1_ug = resp.pm1_ug_atm; + ms.pm.atm.pm2_ug = resp.pm2_ug_atm; + ms.pm.atm.pm10_ug = resp.pm10_ug_atm; + + ms.pm.std.pm1_ug = resp.pm1_ug; + ms.pm.std.pm2_ug = resp.pm2_ug; + ms.pm.std.pm10_ug = resp.pm10_ug; + + ms.pm.cnt.pm03_cnt = resp.pm03_cnt; + ms.pm.cnt.pm05_cnt = resp.pm05_cnt; + ms.pm.cnt.pm1_cnt = resp.pm1_cnt; + ms.pm.cnt.pm2_cnt = resp.pm2_cnt; + ms.pm.cnt.pm5_cnt = resp.pm5_cnt; + ms.pm.cnt.pm10_cnt = resp.pm10_cnt; + + queue_send_retrying(queue, &ms, sizeof(measurement)); + + ESP_LOGI(kTag, "received PM: PM1 %dµg, PM2.5 %dµg, PM10 %dµg", + resp.pm1_ug_atm, resp.pm2_ug_atm, resp.pm10_ug_atm); + } +} + static void start_pm_task(const QueueHandle_t ms_queue) { - // TODO: implement pollution collector, currently no-op - // xTaskCreate(task_collect_pm, "ms_pm", KiB(2), ms_queue, 2, NULL); + const uart_config_t conf{ + .baud_rate = 9600, + .data_bits = UART_DATA_8_BITS, + .parity = UART_PARITY_DISABLE, + .stop_bits = UART_STOP_BITS_1, + .flow_ctrl = UART_HW_FLOWCTRL_DISABLE, + .source_clk = UART_SCLK_APB, + }; + + const int rx_pin = 25; + const int tx_pin = 27; + + ESP_ERROR_CHECK(uart_driver_install(pmUartPort, KiB(2), 0, 0, nullptr, 0)); + ESP_ERROR_CHECK(uart_param_config(pmUartPort, &conf)); + ESP_ERROR_CHECK(uart_set_pin(pmUartPort, tx_pin, rx_pin, UART_PIN_NO_CHANGE, + UART_PIN_NO_CHANGE)); + + xTaskCreate(task_collect_pm, "ms_pm", KiB(4), ms_queue, 4, nullptr); } static bool is_valid_timestamp(const time_t ts) { @@ -616,19 +737,37 @@ static void format_temp_msg(char *const msg, const size_t len, app_settings.dev_name, ms->time, ms->sensor, ms->temp); } -static void format_part_msg(char *const msg, const size_t len, - const measurement *const ms) { +static void format_pm_msg(char *const msg, const size_t len, + const measurement *const ms) { snprintf(msg, len, "{" "\"dev\":\"%s\"," "\"time\":%ld," "\"sens\":\"%s\"," - "\"pm1\":%f," - "\"pm2\":%f," - "\"pm10\":%f," + "\"std\":{" + "\"pm1\":%u," + "\"pm2.5\":%u," + "\"pm10\":%u" + "}," + "\"atm\":{" + "\"pm1\":%u," + "\"pm2.5\":%u," + "\"pm10\":%u" + "}," + "\"cnt\":{" + "\"pm0.3\":%u," + "\"pm0.5\":%u," + "\"pm1\":%u," + "\"pm2.5\":%u," + "\"pm5\":%u," + "\"pm10\":%u" + "}" "}", - app_settings.dev_name, ms->time, ms->sensor, ms->pm.pm1, ms->pm.pm2, - ms->pm.pm10); + app_settings.dev_name, ms->time, ms->sensor, ms->pm.std.pm1_ug, + ms->pm.std.pm2_ug, ms->pm.std.pm10_ug, ms->pm.atm.pm1_ug, + ms->pm.atm.pm2_ug, ms->pm.atm.pm10_ug, ms->pm.cnt.pm03_cnt, + ms->pm.cnt.pm05_cnt, ms->pm.cnt.pm1_cnt, ms->pm.cnt.pm2_cnt, + ms->pm.cnt.pm5_cnt, ms->pm.cnt.pm10_cnt); } static bool mqtt_send_measurement(mqtt_client *const mq, @@ -642,7 +781,7 @@ static bool mqtt_send_measurement(mqtt_client *const mq, break; case MS_PARTICULATES: - format_part_msg(mq->msg, sizeof(mq->msg), ms); + format_pm_msg(mq->msg, sizeof(mq->msg), ms); type = "meas/part"; break; diff --git a/gateway/.gitignore b/gateway/.gitignore new file mode 100644 index 0000000..41d1086 --- /dev/null +++ b/gateway/.gitignore @@ -0,0 +1,2 @@ +airmon + diff --git a/gateway/main.go b/gateway/main.go index 87cc5ca..59af5ee 100644 --- a/gateway/main.go +++ b/gateway/main.go @@ -34,6 +34,51 @@ func (t *temperature) toPoint() *influxdb2Write.Point { time.Unix(t.Time, 0)) } +type pmConcentration struct { + Pm1 uint16 `json:"pm1"` + Pm2_5 uint16 `json:"pm2.5"` + Pm10 uint16 `json:"pm10"` +} + +type particulates struct { + Device string `json:"dev"` + Time int64 `json:"time"` + Sensor string `json:"sens"` + Standard pmConcentration `json:"std"` + Atmospheric pmConcentration `json:"atm"` + Count struct { + Pm03 uint16 `json:"pm0.3"` + Pm05 uint16 `json:"pm0.5"` + Pm1 uint16 `json:"pm1"` + Pm2_5 uint16 `json:"pm2.5"` + Pm5 uint16 `json:"pm5"` + Pm10 uint16 `json:"pm10"` + } `json:"cnt"` +} + +func (t *particulates) toPoint() *influxdb2Write.Point { + return influxdb2.NewPoint("particulates", + map[string]string{ + "device": t.Device, + "sensor": t.Sensor, + }, + map[string]interface{}{ + "ug_std_1": t.Standard.Pm1, + "ug_std_2_5": t.Standard.Pm2_5, + "ug_std_10": t.Standard.Pm10, + "ug_atm_1": t.Atmospheric.Pm1, + "ug_atm_2_5": t.Atmospheric.Pm2_5, + "ug_atm_10": t.Atmospheric.Pm10, + "cnt_0_3": t.Count.Pm03, + "cnt_0_5": t.Count.Pm05, + "cnt_1": t.Count.Pm1, + "cnt_2_5": t.Count.Pm2_5, + "cnt_5": t.Count.Pm5, + "cnt_10": t.Count.Pm10, + }, + time.Unix(t.Time, 0)) +} + func isStdinPiped() bool { stat, err := os.Stdin.Stat() return err == nil && (stat.Mode()&os.ModeCharDevice) == 0 @@ -47,13 +92,28 @@ func saveTemperature(write influxdb2Api.WriteAPIBlocking, data []byte) error { } point := temp.toPoint() if err := write.WritePoint(context.Background(), point); err != nil { - log.Print("could not write point: ", err) + log.Print("could not temp write point: ", err) return err } log.Print("temp point written: ", temp) return nil } +func saveParticulates(write influxdb2Api.WriteAPIBlocking, data []byte) error { + var part particulates + if err := json.Unmarshal(data, &part); err != nil { + log.Print("could not parse particulates json: ", err) + return err + } + point := part.toPoint() + if err := write.WritePoint(context.Background(), point); err != nil { + log.Print("could not write part point: ", err) + return err + } + log.Print("part point written: ", part) + return nil +} + func main() { mqttBroker := flag.String("mqtt.broker", "tcp://localhost:1883", "the broker URI") mqttUser := flag.String("mqtt.user", "", "MQTT username") @@ -131,6 +191,13 @@ func main() { log.Fatal("could not subscribe to mqtt topic: ", mqttToken.Error()) } + mqttClient.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) { + go saveParticulates(write, msg.Payload()) + }) + if mqttToken.Wait() && mqttToken.Error() != nil { + log.Fatal("could not subscribe to mqtt topic: ", mqttToken.Error()) + } + log.Print("started, press Ctrl+C to terminate") select {}