Skip to content

Commit

Permalink
WS refactoring (#2261)
Browse files Browse the repository at this point in the history
* ws: clean-up wsPost implementation

- explain ourselves
- re-do consts, fix locality
- fix shadowing in ctors
- more consistent naming
- timeout for messages

* save 16 bytes temporary

* reference reference
  • Loading branch information
mcspr authored May 26, 2020
1 parent b8fc8cd commit ff89504
Show file tree
Hide file tree
Showing 4 changed files with 285 additions and 169 deletions.
4 changes: 3 additions & 1 deletion code/espurna/mqtt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1148,7 +1148,9 @@ void mqttSetup() {
.onKeyCheck(_mqttWebSocketOnKeyCheck);

mqttRegister([](unsigned int type, const char*, const char*) {
if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) wsPost(_mqttWebSocketOnData);
if ((type == MQTT_CONNECT_EVENT) || (type == MQTT_DISCONNECT_EVENT)) {
wsPost(_mqttWebSocketOnData);
}
});
#endif

Expand Down
203 changes: 120 additions & 83 deletions code/espurna/ws.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,64 @@ uint32_t _ws_last_update = 0;
// WS callbacks
// -----------------------------------------------------------------------------

std::queue<WsPostponedCallbacks> _ws_queue;
ws_callbacks_t _ws_callbacks;

void wsPost(uint32_t client_id, ws_on_send_callback_f&& cb) {
_ws_queue.emplace(client_id, std::move(cb));
}

void wsPost(ws_on_send_callback_f&& cb) {
wsPost(0, std::move(cb));
}

void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) {
_ws_queue.emplace(client_id, cb);
}

void wsPost(const ws_on_send_callback_f& cb) {
wsPost(0, cb);
}

template <typename T>
void _wsPostCallbacks(uint32_t client_id, T&& cbs, WsPostponedCallbacks::Mode mode) {
_ws_queue.emplace(client_id, std::forward<T>(cbs), mode);
}

void wsPostAll(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::All);
}

void wsPostAll(ws_on_send_callback_list_t&& cbs) {
wsPostAll(0, std::move(cbs));
}

void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::All);
}

void wsPostAll(const ws_on_send_callback_list_t& cbs) {
wsPostAll(0, cbs);
}

void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_wsPostCallbacks(client_id, std::move(cbs), WsPostponedCallbacks::Mode::Sequence);
}

void wsPostSequence(ws_on_send_callback_list_t&& cbs) {
wsPostSequence(0, std::move(cbs));
}

void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_wsPostCallbacks(client_id, cbs, WsPostponedCallbacks::Mode::Sequence);
}

void wsPostSequence(const ws_on_send_callback_list_t& cbs) {
wsPostSequence(0, cbs);
}

// -----------------------------------------------------------------------------

ws_callbacks_t& ws_callbacks_t::onVisible(ws_on_send_callback_f cb) {
on_visible.push_back(cb);
return *this;
Expand All @@ -53,14 +111,11 @@ ws_callbacks_t& ws_callbacks_t::onKeyCheck(ws_on_keycheck_callback_f cb) {
return *this;
}

static ws_callbacks_t _ws_callbacks;
static std::queue<ws_data_t> _ws_client_data;

// -----------------------------------------------------------------------------
// WS authentication
// -----------------------------------------------------------------------------

ws_ticket_t _ws_tickets[WS_BUFFER_SIZE];
WsTicket _ws_tickets[WS_BUFFER_SIZE];

void _onAuth(AsyncWebServerRequest *request) {

Expand Down Expand Up @@ -109,28 +164,30 @@ bool _wsAuth(AsyncWebSocketClient * client) {

#if DEBUG_WEB_SUPPORT

ws_debug_t _ws_debug(WS_DEBUG_MSG_BUFFER);
constexpr size_t WsDebugMessagesMax = 8;

WsDebug _ws_debug(WsDebugMessagesMax);

void ws_debug_t::send(const bool connected) {
if (!connected && flush) {
void WsDebug::send(bool connected) {
if (!connected && _flush) {
clear();
return;
}

if (!flush) return;
if (!_flush) return;
// ref: http://arduinojson.org/v5/assistant/
// {"weblog": {"msg":[...],"pre":[...]}}
DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2));
DynamicJsonBuffer jsonBuffer(2*JSON_ARRAY_SIZE(_messages.size()) + JSON_OBJECT_SIZE(1) + JSON_OBJECT_SIZE(2));

JsonObject& root = jsonBuffer.createObject();
JsonObject& weblog = root.createNestedObject("weblog");

JsonArray& msg = weblog.createNestedArray("msg");
JsonArray& pre = weblog.createNestedArray("pre");
JsonArray& msg_array = weblog.createNestedArray("msg");
JsonArray& pre_array = weblog.createNestedArray("pre");

for (auto& message : messages) {
pre.add(message.first.c_str());
msg.add(message.second.c_str());
for (auto& msg : _messages) {
pre_array.add(msg.first.c_str());
msg_array.add(msg.second.c_str());
}

wsSend(root);
Expand Down Expand Up @@ -421,31 +478,6 @@ void _wsOnConnected(JsonObject& root) {
root["hbInterval"] = getSetting("hbInterval", HEARTBEAT_INTERVAL);
}

void wsSend(JsonObject& root) {
// TODO: avoid serializing twice?
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);

if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
_ws.textAll(buffer);
}
}

void wsSend(uint32_t client_id, JsonObject& root) {
AsyncWebSocketClient* client = _ws.client(client_id);
if (client == nullptr) return;

// TODO: avoid serializing twice?
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);

if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
client->text(buffer);
}
}

void _wsConnected(uint32_t client_id) {

const bool changePassword = (USE_PASSWORD && WEB_FORCE_PASS_CHANGE)
Expand Down Expand Up @@ -515,22 +547,30 @@ void _wsEvent(AsyncWebSocket * server, AsyncWebSocketClient * client, AwsEventTy
// TODO: make this generic loop method to queue important ws messages?
// or, if something uses ticker / async ctx to send messages,
// it needs a retry mechanism built into the callback object
void _wsHandleClientData(const bool connected) {
void _wsHandlePostponedCallbacks(bool connected) {

if (!connected && !_ws_client_data.empty()) {
_ws_client_data.pop();
if (!connected && !_ws_queue.empty()) {
_ws_queue.pop();
return;
}

if (_ws_client_data.empty()) return;
auto& data = _ws_client_data.front();
if (_ws_queue.empty()) return;
auto& callbacks = _ws_queue.front();

// avoid stalling forever when can't send anything
constexpr decltype(ESP.getCycleCount()) WsQueueTimeoutClockCycles = microsecondsToClockCycles(10 * 1000 * 1000); // 10s
if (ESP.getCycleCount() - callbacks.timestamp > WsQueueTimeoutClockCycles) {
_ws_queue.pop();
return;
}

// client_id == 0 means we need to send the message to every client
if (data.client_id) {
AsyncWebSocketClient* ws_client = _ws.client(data.client_id);
if (callbacks.client_id) {
AsyncWebSocketClient* ws_client = _ws.client(callbacks.client_id);

// ...but, we need to check if client is still connected
if (!ws_client) {
_ws_client_data.pop();
_ws_queue.pop();
return;
}

Expand All @@ -544,27 +584,27 @@ void _wsHandleClientData(const bool connected) {
// XXX: block allocation will try to create *2 next time,
// likely failing and causing wsSend to reference empty objects
// XXX: arduinojson6 will not do this, but we may need to use per-callback buffers
constexpr const size_t BUFFER_SIZE = 3192;
DynamicJsonBuffer jsonBuffer(BUFFER_SIZE);
constexpr size_t WsQueueJsonBufferSize = 3192;
DynamicJsonBuffer jsonBuffer(WsQueueJsonBufferSize);
JsonObject& root = jsonBuffer.createObject();

data.send(root);
if (data.client_id) {
wsSend(data.client_id, root);
callbacks.send(root);
if (callbacks.client_id) {
wsSend(callbacks.client_id, root);
} else {
wsSend(root);
}
yield();

if (data.done()) {
_ws_client_data.pop();
if (callbacks.done()) {
_ws_queue.pop();
}
}

void _wsLoop() {
const bool connected = wsConnected();
_wsDoUpdate(connected);
_wsHandleClientData(connected);
_wsHandlePostponedCallbacks(connected);
#if DEBUG_WEB_SUPPORT
_ws_debug.send(connected);
#endif
Expand All @@ -586,6 +626,31 @@ ws_callbacks_t& wsRegister() {
return _ws_callbacks;
}

void wsSend(JsonObject& root) {
// Note: 'measurement' tries to serialize json contents byte-by-byte,
// which is somewhat costly, but likely unavoidable for us.
size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);

if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
_ws.textAll(buffer);
}
}

void wsSend(uint32_t client_id, JsonObject& root) {
AsyncWebSocketClient* client = _ws.client(client_id);
if (client == nullptr) return;

size_t len = root.measureLength();
AsyncWebSocketMessageBuffer* buffer = _ws.makeBuffer(len);

if (buffer) {
root.printTo(reinterpret_cast<char*>(buffer->get()), len + 1);
client->text(buffer);
}
}

void wsSend(ws_on_send_callback_f callback) {
if (_ws.count() > 0) {
DynamicJsonBuffer jsonBuffer(512);
Expand Down Expand Up @@ -630,34 +695,6 @@ void wsSend_P(uint32_t client_id, PGM_P payload) {
_ws.text(client_id, buffer);
}

void wsPost(const ws_on_send_callback_f& cb) {
_ws_client_data.emplace(cb);
}

void wsPost(uint32_t client_id, const ws_on_send_callback_f& cb) {
_ws_client_data.emplace(client_id, cb);
}

void wsPostAll(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(client_id, cbs, ws_data_t::ALL);
}

void wsPostAll(const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(0, cbs, ws_data_t::ALL);
}

void wsPostSequence(uint32_t client_id, const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(client_id, cbs, ws_data_t::SEQUENCE);
}

void wsPostSequence(uint32_t client_id, ws_on_send_callback_list_t&& cbs) {
_ws_client_data.emplace(client_id, std::forward<ws_on_send_callback_list_t>(cbs), ws_data_t::SEQUENCE);
}

void wsPostSequence(const ws_on_send_callback_list_t& cbs) {
_ws_client_data.emplace(0, cbs, ws_data_t::SEQUENCE);
}

void wsSetup() {

_ws.onEvent(_wsEvent);
Expand Down
Loading

0 comments on commit ff89504

Please sign in to comment.