diff --git a/include/MessageOutput.h b/include/MessageOutput.h index 94f915a5d..fc06f385c 100644 --- a/include/MessageOutput.h +++ b/include/MessageOutput.h @@ -2,12 +2,13 @@ #pragma once #include -#include -#include #include +#include +#include #include - -#define BUFFER_SIZE 500 +#include +#include +#include class MessageOutputClass : public Print { public: @@ -21,13 +22,19 @@ class MessageOutputClass : public Print { Task _loopTask; + using message_t = std::vector; + + // we keep a buffer for every task and only write complete lines to the + // serial output and then move them to be pushed through the websocket. + // this way we prevent mangling of messages from different contexts. + std::unordered_map _task_messages; + std::queue _lines; + AsyncWebSocket* _ws = nullptr; - char _buffer[BUFFER_SIZE]; - uint16_t _buff_pos = 0; - uint32_t _lastSend = 0; - bool _forceSend = false; std::mutex _msgLock; + + void serialWrite(message_t const& m); }; extern MessageOutputClass MessageOutput; \ No newline at end of file diff --git a/src/MessageOutput.cpp b/src/MessageOutput.cpp index f602bee15..027ce20c8 100644 --- a/src/MessageOutput.cpp +++ b/src/MessageOutput.cpp @@ -2,10 +2,9 @@ /* * Copyright (C) 2022-2023 Thomas Basler and others */ +#include #include "MessageOutput.h" -#include - MessageOutputClass MessageOutput; void MessageOutputClass::init(Scheduler& scheduler) @@ -18,46 +17,97 @@ void MessageOutputClass::init(Scheduler& scheduler) void MessageOutputClass::register_ws_output(AsyncWebSocket* output) { + std::lock_guard lock(_msgLock); + _ws = output; } +void MessageOutputClass::serialWrite(MessageOutputClass::message_t const& m) +{ + // on ESP32-S3, Serial.flush() blocks until a serial console is attached. + // operator bool() of HWCDC returns false if the device is not attached to + // a USB host. in general it makes sense to skip writing entirely if the + // default serial port is not ready. + if (!Serial) { return; } + + size_t written = 0; + while (written < m.size()) { + written += Serial.write(m.data() + written, m.size() - written); + } + Serial.flush(); +} + size_t MessageOutputClass::write(uint8_t c) { - if (_buff_pos < BUFFER_SIZE) { - std::lock_guard lock(_msgLock); - _buffer[_buff_pos] = c; - _buff_pos++; - } else { - _forceSend = true; + std::lock_guard lock(_msgLock); + + auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t()); + auto iter = res.first; + auto& message = iter->second; + + message.push_back(c); + + if (c == '\n') { + serialWrite(message); + _lines.emplace(std::move(message)); + _task_messages.erase(iter); } - return Serial.write(c); + return 1; } -size_t MessageOutputClass::write(const uint8_t* buffer, size_t size) +size_t MessageOutputClass::write(const uint8_t *buffer, size_t size) { std::lock_guard lock(_msgLock); - if (_buff_pos + size < BUFFER_SIZE) { - memcpy(&_buffer[_buff_pos], buffer, size); - _buff_pos += size; + + auto res = _task_messages.emplace(xTaskGetCurrentTaskHandle(), message_t()); + auto iter = res.first; + auto& message = iter->second; + + message.reserve(message.size() + size); + + for (size_t idx = 0; idx < size; ++idx) { + uint8_t c = buffer[idx]; + + message.push_back(c); + + if (c == '\n') { + serialWrite(message); + _lines.emplace(std::move(message)); + message.clear(); + message.reserve(size - idx - 1); + } } - _forceSend = true; - return Serial.write(buffer, size); + if (message.empty()) { _task_messages.erase(iter); } + + return size; } void MessageOutputClass::loop() { - // Send data via websocket if either time is over or buffer is full - if (_forceSend || (millis() - _lastSend > 1000)) { - std::lock_guard lock(_msgLock); - if (_ws && _buff_pos > 0) { - _ws->textAll(_buffer, _buff_pos); - _buff_pos = 0; + std::lock_guard lock(_msgLock); + + // clean up (possibly filled) buffers of deleted tasks + auto map_iter = _task_messages.begin(); + while (map_iter != _task_messages.end()) { + if (eTaskGetState(map_iter->first) == eDeleted) { + map_iter = _task_messages.erase(map_iter); + continue; } - if (_forceSend) { - _buff_pos = 0; + + ++map_iter; + } + + if (!_ws) { + while (!_lines.empty()) { + _lines.pop(); // do not hog memory } - _forceSend = false; + return; + } + + while (!_lines.empty() && _ws->availableForWriteAll()) { + _ws->textAll(std::make_shared(std::move(_lines.front()))); + _lines.pop(); } } \ No newline at end of file