Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Influxdb: fix sensor send frequency, use async client, send data in batches #2061

Merged
merged 2 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions code/espurna/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>
enum class TBrokerType {
SYSTEM,
STATUS,
SENSOR,
SENSOR_READ,
SENSOR_REPORT,
DATETIME,
CONFIG
};
Expand Down Expand Up @@ -49,7 +50,10 @@ TBrokerCallbacks<TArgs...> TBroker<type, TArgs...>::callbacks;
// --- Some known types. Bind them here to avoid .ino screwing with order ---

using StatusBroker = TBroker<TBrokerType::STATUS, const String&, unsigned char, unsigned int>;
using SensorBroker = TBroker<TBrokerType::SENSOR, const String&, unsigned char, double, const char*>;

using SensorReadBroker = TBroker<TBrokerType::SENSOR_READ, const String&, unsigned char, double, const char*>;
using SensorReportBroker = TBroker<TBrokerType::SENSOR_REPORT, const String&, unsigned char, double, const char*>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A note to the reader:
The very first arg is needed to distinguish between two otherwise identical templates, without it both ReadBroker and ReportBroker would be the exact same class.


using TimeBroker = TBroker<TBrokerType::DATETIME, const String&, time_t, const String&>;
using ConfigBroker = TBroker<TBrokerType::CONFIG, const String&, const String&>;

Expand Down
4 changes: 0 additions & 4 deletions code/espurna/config/general.h
Original file line number Diff line number Diff line change
Expand Up @@ -1129,10 +1129,6 @@
#define BROKER_SUPPORT 1 // The broker is a poor-man's pubsub manager
#endif

#ifndef BROKER_REAL_TIME
#define BROKER_REAL_TIME 1 // Report real time data
#endif

// -----------------------------------------------------------------------------
// SETTINGS
// -----------------------------------------------------------------------------
Expand Down
219 changes: 177 additions & 42 deletions code/espurna/influxdb.ino
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,104 @@ Copyright (C) 2017-2019 by Xose Pérez <xose dot perez at gmail dot com>

#if INFLUXDB_SUPPORT

#include "ESPAsyncTCP.h"
#include <ESPAsyncTCP.h>
#include <map>
#include <memory>

#include "broker.h"
#include "libs/SyncClientWrap.h"

const char INFLUXDB_REQUEST_TEMPLATE[] PROGMEM = "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n";

constexpr const unsigned long INFLUXDB_CLIENT_TIMEOUT = 5000;
constexpr const size_t INFLUXDB_DATA_BUFFER_SIZE = 256;

bool _idb_enabled = false;
SyncClientWrap * _idb_client;
String _idb_host;
uint16_t _idb_port = 0;

std::map<String, String> _idb_values;
String _idb_data;
bool _idb_flush = false;

std::unique_ptr<AsyncClient> _idb_client = nullptr;
bool _idb_connecting = false;
bool _idb_connected = false;

uint32_t _idb_client_ts = 0;

// -----------------------------------------------------------------------------

void _idbInitClient() {

_idb_client = std::make_unique<AsyncClient>();

_idb_client->onDisconnect([](void * s, AsyncClient * client) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Disconnected\n"));
_idb_flush = false;
_idb_data = "";
_idb_client_ts = 0;
_idb_connected = false;
_idb_connecting = false;
}, nullptr);

_idb_client->onTimeout([](void * s, AsyncClient * client, uint32_t time) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Network timeout after %ums\n"), time);
client->close(true);
}, nullptr);

_idb_client->onData([](void * arg, AsyncClient * client, void * response, size_t len) {
// ref: https://docs.influxdata.com/influxdb/v1.7/tools/api/#summary-table-1
const char idb_success[] = "HTTP/1.1 204";
const bool result = (len > sizeof(idb_success) && (0 == strncmp((char*) response, idb_success, sizeof(idb_success))));
DEBUG_MSG_P(PSTR("[INFLUXDB] %s response after %ums\n"), result ? "Success" : "Failure", millis() - _idb_client_ts);
_idb_client_ts = millis();
client->close();
}, nullptr);

_idb_client->onPoll([](void * arg, AsyncClient * client) {
unsigned long ts = millis() - _idb_client_ts;
if (ts > INFLUXDB_CLIENT_TIMEOUT) {
DEBUG_MSG_P(PSTR("[INFLUXDB] No response after %ums\n"), ts);
client->close(true);
return;
}

if (_idb_data.length()) {
client->write(_idb_data.c_str(), _idb_data.length());
_idb_data = "";
}
});

_idb_client->onConnect([](void * arg, AsyncClient * client) {

_idb_client_ts = millis();
_idb_connected = true;
_idb_connecting = false;

DEBUG_MSG_P(PSTR("[INFLUXDB] Connected to %s:%u\n"),
IPAddress(client->getRemoteAddress()).toString().c_str(),
client->getRemotePort()
);

constexpr const int BUFFER_SIZE = 256;
char headers[BUFFER_SIZE];
int len = snprintf_P(headers, sizeof(headers), INFLUXDB_REQUEST_TEMPLATE,
getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
getSetting("idbUsername", INFLUXDB_USERNAME).c_str(),
getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
_idb_host.c_str(), _idb_port, _idb_data.length()
);
if ((len < 0) || (len > BUFFER_SIZE - 1)) {
client->close(true);
return;
}

client->write(headers, len);

});

}


// -----------------------------------------------------------------------------

Expand All @@ -41,6 +132,7 @@ void _idbConfigure() {
_idb_enabled = false;
setSetting("idbEnabled", 0);
}
if (_idb_enabled && !_idb_client) _idbInitClient();
}

#if BROKER_SUPPORT
Expand All @@ -59,51 +151,79 @@ void _idbBrokerStatus(const String& topic, unsigned char id, unsigned int value)

bool idbSend(const char * topic, const char * payload) {

if (!_idb_enabled) return true;
if (!wifiConnected() || (WiFi.getMode() != WIFI_STA)) return true;
if (!_idb_enabled) return false;
if (_idb_connected) return false;

String h = getSetting("idbHost", INFLUXDB_HOST);
#if MDNS_CLIENT_SUPPORT
h = mdnsResolve(h);
#endif
char * host = strdup(h.c_str());
unsigned int port = getSetting("idbPort", INFLUXDB_PORT).toInt();
DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host, port);
_idb_values[topic] = payload;
_idb_flush = true;

bool success = false;
return true;
}

_idb_client->setTimeout(2);
if (_idb_client->connect((const char *) host, (unsigned int) port)) {
void _idbSend(const String& host, const uint16_t port) {
if (_idb_connected || _idb_connecting) return;

char data[128];
snprintf(data, sizeof(data), "%s,device=%s value=%s", topic, getSetting("hostname").c_str(), String(payload).c_str());
DEBUG_MSG_P(PSTR("[INFLUXDB] Data: %s\n"), data);
DEBUG_MSG_P(PSTR("[INFLUXDB] Sending to %s:%u\n"), host.c_str(), port);

char request[256];
snprintf(request, sizeof(request), "POST /write?db=%s&u=%s&p=%s HTTP/1.1\r\nHost: %s:%u\r\nContent-Length: %d\r\n\r\n%s",
getSetting("idbDatabase", INFLUXDB_DATABASE).c_str(),
getSetting("idbUsername", INFLUXDB_USERNAME).c_str(), getSetting("idbPassword", INFLUXDB_PASSWORD).c_str(),
host, port, strlen(data), data);

if (_idb_client->printf(request) > 0) {
while (_idb_client->connected() && _idb_client->available() == 0) delay(1);
while (_idb_client->available()) _idb_client->read();
if (_idb_client->connected()) _idb_client->stop();
success = true;
} else {
DEBUG_MSG_P(PSTR("[INFLUXDB] Sent failed\n"));
}
// TODO: cache `Host: <host>:<port>` instead of storing things separately?
_idb_host = host;
_idb_port = port;

_idb_client->stop();
while (_idb_client->connected()) yield();
_idb_client_ts = millis();
_idb_connecting = _idb_client->connect(host.c_str(), port);

} else {
DEBUG_MSG_P(PSTR("[INFLUXDB] Connection failed\n"));
if (!_idb_connecting) {
DEBUG_MSG_P(PSTR("[INFLUXDB] Connection to %s:%u failed\n"), host.c_str(), port);
_idb_client->close(true);
}
}

void _idbFlush() {
// Clean-up client object when not in use
if (_idb_client && !_idb_enabled && !_idb_connected && !_idb_connecting) {
_idb_client = nullptr;
}

// Wait until current connection is finished
if (!_idb_flush) return;
if (_idb_connected || _idb_connecting) return;

// Wait until connected
if (!wifiConnected()) return;

// TODO: MDNS_CLIENT_SUPPORT is deprecated
String host = getSetting("idbHost", INFLUXDB_HOST);
#if MDNS_CLIENT_SUPPORT
host = mdnsResolve(host);
#endif

const uint16_t port = getSetting("idbPort", INFLUXDB_PORT).toInt();

// TODO: should we always store specific pairs like tspk keeps relay / sensor readings?
// note that we also send heartbeat data, persistent values should be flagged
const String device = getSetting("hostname");

_idb_data = "";
for (auto& pair : _idb_values) {
if (!isNumber(pair.second.c_str())) {
String quoted;
quoted.reserve(pair.second.length() + 2);
quoted += '"';
quoted += pair.second;
quoted += '"';
pair.second = quoted;
}

free(host);
return success;
char buffer[128] = {0};
snprintf_P(buffer, sizeof(buffer),
PSTR("%s,device=%s value=%s\n"),
pair.first.c_str(), device.c_str(), pair.second.c_str()
);
_idb_data += buffer;
}
_idb_values.clear();

_idbSend(host, port);
}

bool idbSend(const char * topic, unsigned char id, const char * payload) {
Expand All @@ -118,8 +238,6 @@ bool idbEnabled() {

void idbSetup() {

_idb_client = new SyncClientWrap();

_idbConfigure();

#if WEB_SUPPORT
Expand All @@ -131,11 +249,28 @@ void idbSetup() {

#if BROKER_SUPPORT
StatusBroker::Register(_idbBrokerStatus);
SensorBroker::Register(_idbBrokerSensor);
SensorReportBroker::Register(_idbBrokerSensor);
#endif

// Main callbacks
espurnaRegisterReload(_idbConfigure);
espurnaRegisterLoop(_idbFlush);

_idb_data.reserve(INFLUXDB_DATA_BUFFER_SIZE);

#if TERMINAL_SUPPORT
terminalRegisterCommand(F("IDB.SEND"), [](Embedis* e) {
if (e->argc != 4) {
terminalError(F("idb.send <topic> <id> <value>"));
return;
}

const String topic = e->argv[1];
const auto id = atoi(e->argv[2]);
const String value = e->argv[3];

idbSend(topic.c_str(), id, value.c_str());
});
#endif

}

Expand Down
37 changes: 0 additions & 37 deletions code/espurna/libs/SyncClientWrap.h

This file was deleted.

2 changes: 1 addition & 1 deletion code/espurna/rpnrules.ino
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void rpnSetup() {
#endif

StatusBroker::Register(_rpnBrokerStatus);
SensorBroker::Register(_rpnBrokerCallback);
SensorReadBroker::Register(_rpnBrokerCallback);

espurnaRegisterReload(_rpnConfigure);
espurnaRegisterLoop(_rpnLoop);
Expand Down
8 changes: 4 additions & 4 deletions code/espurna/sensor.ino
Original file line number Diff line number Diff line change
Expand Up @@ -1543,8 +1543,8 @@ void _sensorReport(unsigned char index, double value) {
char buffer[64];
dtostrf(value, 1, decimals, buffer);

#if BROKER_SUPPORT && (not BROKER_REAL_TIME)
SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer);
#if BROKER_SUPPORT
SensorReportBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value, buffer);
#endif

#if MQTT_SUPPORT
Expand Down Expand Up @@ -1801,12 +1801,12 @@ void sensorLoop() {
// -------------------------------------------------------------

value_show = _magnitudeProcess(magnitude.type, magnitude.decimals, value_raw);
#if BROKER_SUPPORT && BROKER_REAL_TIME
#if BROKER_SUPPORT
{
char buffer[64];
dtostrf(value_show, 1-sizeof(buffer), magnitude.decimals, buffer);

SensorBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer);
SensorReadBroker::Publish(magnitudeTopic(magnitude.type), magnitude.global, value_show, buffer);
}
#endif

Expand Down