Skip to content

Commit

Permalink
Add support for Plantower PM sensors
Browse files Browse the repository at this point in the history
  • Loading branch information
hg committed Nov 12, 2020
1 parent 329a095 commit c344800
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 15 deletions.
167 changes: 153 additions & 14 deletions firmware/main/app_main.cc
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "driver/gpio.h"
#include "driver/periph_ctrl.h"
#include "driver/uart.h"
#include "esp_event.h"
#include "esp_log.h"
#include "esp_sntp.h"
Expand Down Expand Up @@ -43,9 +44,24 @@ typedef struct {
union {
float temp;
struct {
float pm1;
float pm2;
float pm10;
struct {
uint16_t pm1_ug;
uint16_t pm2_ug;
uint16_t pm10_ug;
} std;
struct {
uint16_t pm1_ug;
uint16_t pm2_ug;
uint16_t pm10_ug;
} atm;
struct {
uint16_t pm03_cnt;
uint16_t pm05_cnt;
uint16_t pm1_cnt;
uint16_t pm2_cnt;
uint16_t pm5_cnt;
uint16_t pm10_cnt;
} cnt;
} pm;
};
} measurement;
Expand Down Expand Up @@ -83,7 +99,7 @@ static struct {
typedef struct {
esp_mqtt_client_handle_t handle;
psk_hint_key_t *psk_hint;
char msg[256];
char msg[512];
EventGroupHandle_t event;
char *cmd_topic;
const char *resp_topic;
Expand Down Expand Up @@ -594,9 +610,114 @@ static void start_temp_tasks(const QueueHandle_t ms_queue) {
}
}

struct pms5003_response {
uint8_t magic0;
uint8_t magic1;
uint16_t frame_len;
uint16_t pm1_ug;
uint16_t pm2_ug;
uint16_t pm10_ug;
uint16_t pm1_ug_atm;
uint16_t pm2_ug_atm;
uint16_t pm10_ug_atm;
uint16_t pm03_cnt;
uint16_t pm05_cnt;
uint16_t pm1_cnt;
uint16_t pm2_cnt;
uint16_t pm5_cnt;
uint16_t pm10_cnt;
uint16_t reserved;
uint16_t check;
} __attribute__((packed));

static constexpr size_t pms_frame_len =
sizeof(pms5003_response) - sizeof(pms5003_response::magic0) -
sizeof(pms5003_response::magic1) - sizeof(pms5003_response::frame_len);

static constexpr int pmUartPort = 1;

[[noreturn]] static void task_collect_pm(void *const arg) {
auto *const queue = reinterpret_cast<QueueHandle_t>(arg);

pms5003_response resp;
measurement ms{.type = MS_PARTICULATES, .sensor = CONFIG_DEV_NAME};

while (true) {
const int received = uart_read_bytes(
pmUartPort, &resp, sizeof(pms5003_response), portMAX_DELAY);

if (received != sizeof(pms5003_response)) {
continue;
}

if (resp.magic0 != 0x42 || resp.magic1 != 0x4d) {
ESP_LOGW(kTag, "invalid magic number 0x%x%x", resp.magic0, resp.magic1);
continue;
}

// flip bytes from big-endian to host architecture's little endian
for (uint16_t *p = &resp.frame_len; p <= &resp.check; ++p) {
*p = ntohs(*p);
}

if (resp.frame_len != pms_frame_len) {
ESP_LOGW(kTag, "invalid frame length %d", resp.frame_len);
continue;
}

uint16_t checksum = 0;
for (uint8_t *p = &resp.magic0;
p < reinterpret_cast<uint8_t *>(&resp.check); ++p) {
checksum += *p;
}
if (checksum != resp.check) {
ESP_LOGW(kTag, "invalid check 0x%x, expected 0x%x", resp.check, checksum);
continue;
}

ms.time = get_timestamp();

ms.pm.atm.pm1_ug = resp.pm1_ug_atm;
ms.pm.atm.pm2_ug = resp.pm2_ug_atm;
ms.pm.atm.pm10_ug = resp.pm10_ug_atm;

ms.pm.std.pm1_ug = resp.pm1_ug;
ms.pm.std.pm2_ug = resp.pm2_ug;
ms.pm.std.pm10_ug = resp.pm10_ug;

ms.pm.cnt.pm03_cnt = resp.pm03_cnt;
ms.pm.cnt.pm05_cnt = resp.pm05_cnt;
ms.pm.cnt.pm1_cnt = resp.pm1_cnt;
ms.pm.cnt.pm2_cnt = resp.pm2_cnt;
ms.pm.cnt.pm5_cnt = resp.pm5_cnt;
ms.pm.cnt.pm10_cnt = resp.pm10_cnt;

queue_send_retrying(queue, &ms, sizeof(measurement));

ESP_LOGI(kTag, "received PM: PM1 %dµg, PM2.5 %dµg, PM10 %dµg",
resp.pm1_ug_atm, resp.pm2_ug_atm, resp.pm10_ug_atm);
}
}

static void start_pm_task(const QueueHandle_t ms_queue) {
// TODO: implement pollution collector, currently no-op
// xTaskCreate(task_collect_pm, "ms_pm", KiB(2), ms_queue, 2, NULL);
const uart_config_t conf{
.baud_rate = 9600,
.data_bits = UART_DATA_8_BITS,
.parity = UART_PARITY_DISABLE,
.stop_bits = UART_STOP_BITS_1,
.flow_ctrl = UART_HW_FLOWCTRL_DISABLE,
.source_clk = UART_SCLK_APB,
};

const int rx_pin = 25;
const int tx_pin = 27;

ESP_ERROR_CHECK(uart_driver_install(pmUartPort, KiB(2), 0, 0, nullptr, 0));
ESP_ERROR_CHECK(uart_param_config(pmUartPort, &conf));
ESP_ERROR_CHECK(uart_set_pin(pmUartPort, tx_pin, rx_pin, UART_PIN_NO_CHANGE,
UART_PIN_NO_CHANGE));

xTaskCreate(task_collect_pm, "ms_pm", KiB(4), ms_queue, 4, nullptr);
}

static bool is_valid_timestamp(const time_t ts) {
Expand All @@ -616,19 +737,37 @@ static void format_temp_msg(char *const msg, const size_t len,
app_settings.dev_name, ms->time, ms->sensor, ms->temp);
}

static void format_part_msg(char *const msg, const size_t len,
const measurement *const ms) {
static void format_pm_msg(char *const msg, const size_t len,
const measurement *const ms) {
snprintf(msg, len,
"{"
"\"dev\":\"%s\","
"\"time\":%ld,"
"\"sens\":\"%s\","
"\"pm1\":%f,"
"\"pm2\":%f,"
"\"pm10\":%f,"
"\"std\":{"
"\"pm1\":%u,"
"\"pm2.5\":%u,"
"\"pm10\":%u"
"},"
"\"atm\":{"
"\"pm1\":%u,"
"\"pm2.5\":%u,"
"\"pm10\":%u"
"},"
"\"cnt\":{"
"\"pm0.3\":%u,"
"\"pm0.5\":%u,"
"\"pm1\":%u,"
"\"pm2.5\":%u,"
"\"pm5\":%u,"
"\"pm10\":%u"
"}"
"}",
app_settings.dev_name, ms->time, ms->sensor, ms->pm.pm1, ms->pm.pm2,
ms->pm.pm10);
app_settings.dev_name, ms->time, ms->sensor, ms->pm.std.pm1_ug,
ms->pm.std.pm2_ug, ms->pm.std.pm10_ug, ms->pm.atm.pm1_ug,
ms->pm.atm.pm2_ug, ms->pm.atm.pm10_ug, ms->pm.cnt.pm03_cnt,
ms->pm.cnt.pm05_cnt, ms->pm.cnt.pm1_cnt, ms->pm.cnt.pm2_cnt,
ms->pm.cnt.pm5_cnt, ms->pm.cnt.pm10_cnt);
}

static bool mqtt_send_measurement(mqtt_client *const mq,
Expand All @@ -642,7 +781,7 @@ static bool mqtt_send_measurement(mqtt_client *const mq,
break;

case MS_PARTICULATES:
format_part_msg(mq->msg, sizeof(mq->msg), ms);
format_pm_msg(mq->msg, sizeof(mq->msg), ms);
type = "meas/part";
break;

Expand Down
2 changes: 2 additions & 0 deletions gateway/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
airmon

69 changes: 68 additions & 1 deletion gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,51 @@ func (t *temperature) toPoint() *influxdb2Write.Point {
time.Unix(t.Time, 0))
}

type pmConcentration struct {
Pm1 uint16 `json:"pm1"`
Pm2_5 uint16 `json:"pm2.5"`
Pm10 uint16 `json:"pm10"`
}

type particulates struct {
Device string `json:"dev"`
Time int64 `json:"time"`
Sensor string `json:"sens"`
Standard pmConcentration `json:"std"`
Atmospheric pmConcentration `json:"atm"`
Count struct {
Pm03 uint16 `json:"pm0.3"`
Pm05 uint16 `json:"pm0.5"`
Pm1 uint16 `json:"pm1"`
Pm2_5 uint16 `json:"pm2.5"`
Pm5 uint16 `json:"pm5"`
Pm10 uint16 `json:"pm10"`
} `json:"cnt"`
}

func (t *particulates) toPoint() *influxdb2Write.Point {
return influxdb2.NewPoint("particulates",
map[string]string{
"device": t.Device,
"sensor": t.Sensor,
},
map[string]interface{}{
"ug_std_1": t.Standard.Pm1,
"ug_std_2_5": t.Standard.Pm2_5,
"ug_std_10": t.Standard.Pm10,
"ug_atm_1": t.Atmospheric.Pm1,
"ug_atm_2_5": t.Atmospheric.Pm2_5,
"ug_atm_10": t.Atmospheric.Pm10,
"cnt_0_3": t.Count.Pm03,
"cnt_0_5": t.Count.Pm05,
"cnt_1": t.Count.Pm1,
"cnt_2_5": t.Count.Pm2_5,
"cnt_5": t.Count.Pm5,
"cnt_10": t.Count.Pm10,
},
time.Unix(t.Time, 0))
}

func isStdinPiped() bool {
stat, err := os.Stdin.Stat()
return err == nil && (stat.Mode()&os.ModeCharDevice) == 0
Expand All @@ -47,13 +92,28 @@ func saveTemperature(write influxdb2Api.WriteAPIBlocking, data []byte) error {
}
point := temp.toPoint()
if err := write.WritePoint(context.Background(), point); err != nil {
log.Print("could not write point: ", err)
log.Print("could not temp write point: ", err)
return err
}
log.Print("temp point written: ", temp)
return nil
}

func saveParticulates(write influxdb2Api.WriteAPIBlocking, data []byte) error {
var part particulates
if err := json.Unmarshal(data, &part); err != nil {
log.Print("could not parse particulates json: ", err)
return err
}
point := part.toPoint()
if err := write.WritePoint(context.Background(), point); err != nil {
log.Print("could not write part point: ", err)
return err
}
log.Print("part point written: ", part)
return nil
}

func main() {
mqttBroker := flag.String("mqtt.broker", "tcp://localhost:1883", "the broker URI")
mqttUser := flag.String("mqtt.user", "", "MQTT username")
Expand Down Expand Up @@ -131,6 +191,13 @@ func main() {
log.Fatal("could not subscribe to mqtt topic: ", mqttToken.Error())
}

mqttClient.Subscribe("meas/part", 0, func(client mqtt.Client, msg mqtt.Message) {
go saveParticulates(write, msg.Payload())
})
if mqttToken.Wait() && mqttToken.Error() != nil {
log.Fatal("could not subscribe to mqtt topic: ", mqttToken.Error())
}

log.Print("started, press Ctrl+C to terminate")

select {}
Expand Down

0 comments on commit c344800

Please sign in to comment.