Skip to content

Commit

Permalink
Messages: thread-safety and dynamic memory (#418)
Browse files Browse the repository at this point in the history
* thread-safety and dynamic memory for MessageOutput

* use dynamic memory to allow handling of arbitrary message lenghts.
* keep a message buffer for every task so no task ever mangles the
  message of another task.
* every complete line is written to the serial console and moved to
  a line buffer for sending them through the websocket.
* the websocket is always fed complete lines.
* make sure to feed only as many lines as possible to the websocket
  handler, so that no lines are dropped.
* lock all MessageOutput state against concurrent access.

* MessageOutput: respect HardwareSerial buffer size

the MessageOutput class buffers whole lines of output printed by any
task in order to avoid mangling of text. that means we hand over full
lines to the HardwareSerial instance, which might be too much in one
call to write(buffer, size). we now check the return value of
write(buffer, size) and call the function again with the part of the
message that could not yet be written by HardwareSerial.
  • Loading branch information
schlimmchen authored Sep 4, 2023
1 parent ba303da commit 68783b4
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 33 deletions.
25 changes: 16 additions & 9 deletions include/MessageOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@
#pragma once

#include <AsyncWebSocket.h>
#include <HardwareSerial.h>
#include <Stream.h>
#include <Print.h>
#include <freertos/task.h>
#include <mutex>

#define BUFFER_SIZE 500
#include <vector>
#include <unordered_map>
#include <queue>

class MessageOutputClass : public Print {
public:
Expand All @@ -16,13 +17,19 @@ class MessageOutputClass : public Print {
void register_ws_output(AsyncWebSocket* output);

private:
AsyncWebSocket* _ws = NULL;
char _buffer[BUFFER_SIZE];
uint16_t _buff_pos = 0;
uint32_t _lastSend = 0;
bool _forceSend = false;
using message_t = std::vector<uint8_t>;

// we keep a buffer for every task and only write complete lines to the
// serial output and then move them to be pushed through the websocket.
// this way we prevent mangling of messages from different contexts.
std::unordered_map<TaskHandle_t, message_t> _task_messages;
std::queue<message_t> _lines;

AsyncWebSocket* _ws = nullptr;

std::mutex _msgLock;

void serialWrite(message_t const& m);
};

extern MessageOutputClass MessageOutput;
98 changes: 74 additions & 24 deletions src/MessageOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,54 +2,104 @@
/*
* Copyright (C) 2022 Thomas Basler and others
*/
#include <HardwareSerial.h>
#include "MessageOutput.h"

#include <Arduino.h>

MessageOutputClass MessageOutput;

void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
{
std::lock_guard<std::mutex> lock(_msgLock);

_ws = output;
}

void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m)
{
// on ESP32-S3, Serial.flush() blocks until a serial console is attached.
// operator bool() of HWCDC returns false if the device is not attached to
// a USB host. in general it makes sense to skip writing entirely if the
// default serial port is not ready.
if (!Serial) { return; }

size_t written = 0;
while (written < m.size()) {
written += Serial.write(m.data() + written, m.size() - written);
}
Serial.flush();
}

size_t MessageOutputClass::write(uint8_t c)
{
if (_buff_pos < BUFFER_SIZE) {
std::lock_guard<std::mutex> lock(_msgLock);
_buffer[_buff_pos] = c;
_buff_pos++;
} else {
_forceSend = true;
std::lock_guard<std::mutex> lock(_msgLock);

auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
auto iter = res.first;
auto& message = iter->second;

message.push_back(c);

if (c == '\n') {
serialWrite(message);
_lines.emplace(std::move(message));
_task_messages.erase(iter);
}

return Serial.write(c);
return 1;
}

size_t MessageOutputClass::write(const uint8_t* buffer, size_t size)
size_t MessageOutputClass::write(const uint8_t *buffer, size_t size)
{
std::lock_guard<std::mutex> lock(_msgLock);
if (_buff_pos + size < BUFFER_SIZE) {
memcpy(&_buffer[_buff_pos], buffer, size);
_buff_pos += size;

auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
auto iter = res.first;
auto& message = iter->second;

message.reserve(message.size() + size);

for (size_t idx = 0; idx < size; ++idx) {
uint8_t c = buffer[idx];

message.push_back(c);

if (c == '\n') {
serialWrite(message);
_lines.emplace(std::move(message));
message.clear();
message.reserve(size - idx - 1);
}
}
_forceSend = true;

return Serial.write(buffer, size);
if (message.empty()) { _task_messages.erase(iter); }

return size;
}

void MessageOutputClass::loop()
{
// Send data via websocket if either time is over or buffer is full
if (_forceSend || (millis() - _lastSend > 1000)) {
std::lock_guard<std::mutex> lock(_msgLock);
if (_ws && _buff_pos > 0) {
_ws->textAll(_buffer, _buff_pos);
_buff_pos = 0;
std::lock_guard<std::mutex> lock(_msgLock);

// clean up (possibly filled) buffers of deleted tasks
auto map_iter = _task_messages.begin();
while (map_iter != _task_messages.end()) {
if (eTaskGetState(map_iter->first) == eDeleted) {
map_iter = _task_messages.erase(map_iter);
continue;
}
if (_forceSend) {
_buff_pos = 0;

++map_iter;
}

if (!_ws) {
while (!_lines.empty()) {
_lines.pop(); // do not hog memory
}
_forceSend = false;
return;
}

while (!_lines.empty() && _ws->availableForWriteAll()) {
_ws->textAll(std::make_shared<message_t>(std::move(_lines.front())));
_lines.pop();
}
}

0 comments on commit 68783b4

Please sign in to comment.