Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: thread-safety and dynamic memory for MessageOutput #567

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions include/MessageOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
#pragma once

#include <AsyncWebSocket.h>
#include <HardwareSerial.h>
#include <Stream.h>
#include <TaskSchedulerDeclarations.h>
#include <Print.h>
#include <freertos/task.h>
#include <mutex>

#define BUFFER_SIZE 500
#include <vector>
#include <unordered_map>
#include <queue>

class MessageOutputClass : public Print {
public:
Expand All @@ -21,13 +22,19 @@ class MessageOutputClass : public Print {

Task _loopTask;

using message_t = std::vector<uint8_t>;

// we keep a buffer for every task and only write complete lines to the
// serial output and then move them to be pushed through the websocket.
// this way we prevent mangling of messages from different contexts.
std::unordered_map<TaskHandle_t, message_t> _task_messages;
std::queue<message_t> _lines;

AsyncWebSocket* _ws = nullptr;
char _buffer[BUFFER_SIZE];
uint16_t _buff_pos = 0;
uint32_t _lastSend = 0;
bool _forceSend = false;

std::mutex _msgLock;

void serialWrite(message_t const& m);
};

extern MessageOutputClass MessageOutput;
98 changes: 74 additions & 24 deletions src/MessageOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
/*
* Copyright (C) 2022-2023 Thomas Basler and others
*/
#include <HardwareSerial.h>
#include "MessageOutput.h"

#include <Arduino.h>

MessageOutputClass MessageOutput;

void MessageOutputClass::init(Scheduler& scheduler)
Expand All @@ -18,46 +17,97 @@ void MessageOutputClass::init(Scheduler& scheduler)

void MessageOutputClass::register_ws_output(AsyncWebSocket* output)
{
std::lock_guard<std::mutex> lock(_msgLock);

_ws = output;
}

void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m)
{
// on ESP32-S3, Serial.flush() blocks until a serial console is attached.
// operator bool() of HWCDC returns false if the device is not attached to
// a USB host. in general it makes sense to skip writing entirely if the
// default serial port is not ready.
if (!Serial) { return; }

size_t written = 0;
while (written < m.size()) {
written += Serial.write(m.data() + written, m.size() - written);
}
Serial.flush();
}

size_t MessageOutputClass::write(uint8_t c)
{
if (_buff_pos < BUFFER_SIZE) {
std::lock_guard<std::mutex> lock(_msgLock);
_buffer[_buff_pos] = c;
_buff_pos++;
} else {
_forceSend = true;
std::lock_guard<std::mutex> lock(_msgLock);

auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
auto iter = res.first;
auto& message = iter->second;

message.push_back(c);

if (c == '\n') {
serialWrite(message);
_lines.emplace(std::move(message));
_task_messages.erase(iter);
}

return Serial.write(c);
return 1;
}

size_t MessageOutputClass::write(const uint8_t* buffer, size_t size)
size_t MessageOutputClass::write(const uint8_t *buffer, size_t size)
{
std::lock_guard<std::mutex> lock(_msgLock);
if (_buff_pos + size < BUFFER_SIZE) {
memcpy(&_buffer[_buff_pos], buffer, size);
_buff_pos += size;

auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t());
auto iter = res.first;
auto& message = iter->second;

message.reserve(message.size() + size);

for (size_t idx = 0; idx < size; ++idx) {
uint8_t c = buffer[idx];

message.push_back(c);

if (c == '\n') {
serialWrite(message);
_lines.emplace(std::move(message));
message.clear();
message.reserve(size - idx - 1);
}
}
_forceSend = true;

return Serial.write(buffer, size);
if (message.empty()) { _task_messages.erase(iter); }

return size;
}

void MessageOutputClass::loop()
{
// Send data via websocket if either time is over or buffer is full
if (_forceSend || (millis() - _lastSend > 1000)) {
std::lock_guard<std::mutex> lock(_msgLock);
if (_ws && _buff_pos > 0) {
_ws->textAll(_buffer, _buff_pos);
_buff_pos = 0;
std::lock_guard<std::mutex> lock(_msgLock);

// clean up (possibly filled) buffers of deleted tasks
auto map_iter = _task_messages.begin();
while (map_iter != _task_messages.end()) {
if (eTaskGetState(map_iter->first) == eDeleted) {
map_iter = _task_messages.erase(map_iter);
continue;
}
if (_forceSend) {
_buff_pos = 0;

++map_iter;
}

if (!_ws) {
while (!_lines.empty()) {
_lines.pop(); // do not hog memory
}
_forceSend = false;
return;
}

while (!_lines.empty() && _ws->availableForWriteAll()) {
_ws->textAll(std::make_shared<message_t>(std::move(_lines.front())));
_lines.pop();
}
}