Skip to content

Commit

Permalink
Influxdb: fix sensor send frequency, use async client, send data in b…
Browse files Browse the repository at this point in the history
…atches (#2061)

* broker: drop REAL_TIME flag, use separate Broker entities

* influxdb: use async client, send data in batches
  • Loading branch information
mcspr authored Dec 16, 2019
1 parent 8e8c990 commit 0ae6e3b
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 90 deletions.
8 changes: 6 additions & 2 deletions code/espurna/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>
enum class TBrokerType {
SYSTEM,
STATUS,
SENSOR,
SENSOR_READ,
SENSOR_REPORT,
DATETIME,
CONFIG
};
Expand Down Expand Up @@ -49,7 +50,10 @@ TBrokerCallbacks<TArgs...> TBroker<type, TArgs...>::callbacks;
// --- Some known types. Bind them here to avoid .ino screwing with order ---

using StatusBroker = TBroker<TBrokerType::STATUS, const String&, unsigned char, unsigned int>;
using SensorBroker = TBroker<TBrokerType::SENSOR, const String&, unsigned char, double, const char*>;

using SensorReadBroker = TBroker<TBrokerType::SENSOR_READ, const String&, unsigned char, double, const char*>;
using SensorReportBroker = TBroker<TBrokerType::SENSOR_REPORT, const String&, unsigned char, double, const char*>;

using TimeBroker = TBroker<TBrokerType::DATETIME, const String&, time_t, const String&>;
using ConfigBroker = TBroker<TBrokerType::CONFIG, const String&, const String&>;

Expand Down
4 changes: 0 additions & 4 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -1129,10 +1129,6 @@
#define BROKER_SUPPORT 1 // The broker is a poor-man's pubsub manager
#endif

#ifndef BROKER_REAL_TIME
#define BROKER_REAL_TIME 1 // Report real time data
#endif

// -----------------------------------------------------------------------------
// SETTINGS
// -----------------------------------------------------------------------------
Expand Down
219 changes: 177 additions & 42 deletions code/espurna/influxdb.ino
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,104 @@ Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>

#if INFLUXDB_SUPPORT

#include "ESPAsyncTCP.h"
#include <ESPAsyncTCP.h>
#include <map>
#include <memory>

#include "broker.h"
#include "libs/SyncClientWrap.h"

const char INFLUXDB_REQUEST_TEMPLATE[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";

constexpr const unsigned long INFLUXDB_CLIENT_TIMEOUT = 5000;
constexpr const size_t INFLUXDB_DATA_BUFFER_SIZE = 256;

bool _idb_enabled = false;
SyncClientWrap * _idb_client;
String _idb_host;
uint16_t _idb_port = 0;

std::map<String, String> _idb_values;
String _idb_data;
bool _idb_flush = false;

std::unique_ptr<AsyncClient> _idb_client = nullptr;
bool _idb_connecting = false;
bool _idb_connected = false;

uint32_t _idb_client_ts = 0;

// -----------------------------------------------------------------------------

void _idbInitClient() {

_idb_client = std::make_unique<AsyncClient>();

_idb_client->onDisconnect([](void * s, AsyncClient * client) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n"));
_idb_flush = false;
_idb_data = "";
_idb_client_ts = 0;
_idb_connected = false;
_idb_connecting = false;
}, nullptr);

_idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time);
client->close(true);
}, nullptr);

_idb_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) {
// ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1
const char idb_success[] = "HTTP/1.1 204";
const bool result = (len > sizeof(idb_success) && (0 == strncmp((char*) response, idb_success, sizeof(idb_success))));
DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - _idb_client_ts);
_idb_client_ts = millis();
client->close();
}, nullptr);

_idb_client->onPoll([](void * arg, AsyncClient * client) {
unsigned long ts = millis() - _idb_client_ts;
if (ts > INFLUXDB_CLIENT_TIMEOUT) {
DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts);
client->close(true);
return;
}

if (_idb_data.length()) {
client->write(_idb_data.c_str(), _idb_data.length());
_idb_data = "";
}
});

_idb_client->onConnect([](void * arg, AsyncClient * client) {

_idb_client_ts = millis();
_idb_connected = true;
_idb_connecting = false;

DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"),
IPAddress(client->getRemoteAddress()).toString().c_str(),
client->getRemotePort()
);

constexpr const int BUFFER_SIZE = 256;
char headers[BUFFER_SIZE];
int len = snprintf_P(headers, sizeof(headers), INFLUXDB_REQUEST_TEMPLATE,
getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
getSetting("idbUsername", INFLUXDB_USERNAME).c_str(),
getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
_idb_host.c_str(), _idb_port, _idb_data.length()
);
if ((len < 0) || (len > BUFFER_SIZE - 1)) {
client->close(true);
return;
}

client->write(headers, len);

});

}


// -----------------------------------------------------------------------------

Expand All @@ -41,6 +132,7 @@ void _idbConfigure() {
_idb_enabled = false;
setSetting("idbEnabled", 0);
}
if (_idb_enabled && !_idb_client) _idbInitClient();
}

#if BROKER_SUPPORT
Expand All @@ -59,51 +151,79 @@ void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value)

bool idbSend(const char * topic, const char * payload) {

if (!_idb_enabled) return true;
if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return true;
if (!_idb_enabled) return false;
if (_idb_connected) return false;

String h = getSetting("idbHost", INFLUXDB_HOST);
#if MDNS_CLIENT_SUPPORT
h = mdnsResolve(h);
#endif
char * host = strdup(h.c_str());
unsigned int port = getSetting("idbPort", INFLUXDB_PORT).toInt();
DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host, port);
_idb_values[topic] = payload;
_idb_flush = true;

bool success = false;
return true;
}

_idb_client->setTimeout(2);
if (_idb_client->connect((const char *) host, (unsigned int) port)) {
void _idbSend(const String& host, const uint16_t port) {
if (_idb_connected || _idb_connecting) return;

char data[128];
snprintf(data, sizeof(data), "%s,device=%s value=%s", topic, getSetting("hostname").c_str(), String(payload).c_str());
DEBUG_MSG_P(PSTR("[INFLUXDB] Data: %s\n"), data);
DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port);

char request[256];
snprintf(request, sizeof(request), "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n%s",
getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
host, port, strlen(data), data);

if (_idb_client->printf(request) > 0) {
while (_idb_client->connected() && _idb_client->available() == 0) delay(1);
while (_idb_client->available()) _idb_client->read();
if (_idb_client->connected()) _idb_client->stop();
success = true;
} else {
DEBUG_MSG_P(PSTR("[INFLUXDB] Sent failed\n"));
}
// TODO: cache `Host: <host>:<port>` instead of storing things separately?
_idb_host = host;
_idb_port = port;

_idb_client->stop();
while (_idb_client->connected()) yield();
_idb_client_ts = millis();
_idb_connecting = _idb_client->connect(host.c_str(), port);

} else {
DEBUG_MSG_P(PSTR("[INFLUXDB] Connection failed\n"));
if (!_idb_connecting) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port);
_idb_client->close(true);
}
}

void _idbFlush() {
// Clean-up client object when not in use
if (_idb_client && !_idb_enabled && !_idb_connected && !_idb_connecting) {
_idb_client = nullptr;
}

// Wait until current connection is finished
if (!_idb_flush) return;
if (_idb_connected || _idb_connecting) return;

// Wait until connected
if (!wifiConnected()) return;

// TODO: MDNS_CLIENT_SUPPORT is deprecated
String host = getSetting("idbHost", INFLUXDB_HOST);
#if MDNS_CLIENT_SUPPORT
host = mdnsResolve(host);
#endif

const uint16_t port = getSetting("idbPort", INFLUXDB_PORT).toInt();

// TODO: should we always store specific pairs like tspk keeps relay / sensor readings?
// note that we also send heartbeat data, persistent values should be flagged
const String device = getSetting("hostname");

_idb_data = "";
for (auto& pair : _idb_values) {
if (!isNumber(pair.second.c_str())) {
String quoted;
quoted.reserve(pair.second.length() + 2);
quoted += '"';
quoted += pair.second;
quoted += '"';
pair.second = quoted;
}

free(host);
return success;
char buffer[128] = {0};
snprintf_P(buffer, sizeof(buffer),
PSTR("%s,device=%s value=%s\n"),
pair.first.c_str(), device.c_str(), pair.second.c_str()
);
_idb_data += buffer;
}
_idb_values.clear();

_idbSend(host, port);
}

bool idbSend(const char * topic, unsigned char id, const char * payload) {
Expand All @@ -118,8 +238,6 @@ bool idbEnabled() {

void idbSetup() {

_idb_client = new SyncClientWrap();

_idbConfigure();

#if WEB_SUPPORT
Expand All @@ -131,11 +249,28 @@ void idbSetup() {

#if BROKER_SUPPORT
StatusBroker::Register(_idbBrokerStatus);
SensorBroker::Register(_idbBrokerSensor);
SensorReportBroker::Register(_idbBrokerSensor);
#endif

// Main callbacks
espurnaRegisterReload(_idbConfigure);
espurnaRegisterLoop(_idbFlush);

_idb_data.reserve(INFLUXDB_DATA_BUFFER_SIZE);

#if TERMINAL_SUPPORT
terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) {
if (e->argc != 4) {
terminalError(F("idb.send <topic> <id> <value>"));
return;
}

const String topic = e->argv[1];
const auto id = atoi(e->argv[2]);
const String value = e->argv[3];

idbSend(topic.c_str(), id, value.c_str());
});
#endif

}

Expand Down
37 changes: 0 additions & 37 deletions code/espurna/libs/SyncClientWrap.h

This file was deleted.

2 changes: 1 addition & 1 deletion code/espurna/rpnrules.ino
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void rpnSetup() {
#endif

StatusBroker::Register(_rpnBrokerStatus);
SensorBroker::Register(_rpnBrokerCallback);
SensorReadBroker::Register(_rpnBrokerCallback);

espurnaRegisterReload(_rpnConfigure);
espurnaRegisterLoop(_rpnLoop);
Expand Down
8 changes: 4 additions & 4 deletions code/espurna/sensor.ino
Original file line number Diff line number Diff line change
Expand Up @@ -1543,8 +1543,8 @@ void _sensorReport(unsigned char index, double value) {
char buffer[64];
dtostrf(value, 1, decimals, buffer);

#if BROKER_SUPPORT && (not BROKER_REAL_TIME)
SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer);
#if BROKER_SUPPORT
SensorReportBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer);
#endif

#if MQTT_SUPPORT
Expand Down Expand Up @@ -1801,12 +1801,12 @@ void sensorLoop() {
// -------------------------------------------------------------

value_show = _magnitudeProcess(magnitude.type, magnitude.decimals, value_raw);
#if BROKER_SUPPORT && BROKER_REAL_TIME
#if BROKER_SUPPORT
{
char buffer[64];
dtostrf(value_show, 1-sizeof(buffer), magnitude.decimals, buffer);

SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer);
SensorReadBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer);
}
#endif

Expand Down

0 comments on commit 0ae6e3b

Please sign in to comment.