Skip to content

Commit

Permalink
refactor: use atomic, bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
nullptr authored Jan 26, 2024
1 parent 4e03926 commit 8f68e19
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions sscma/interface/transport/mqtt.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <cstring>
#include <string>
Expand Down Expand Up @@ -123,21 +124,22 @@ class MQTT final : public Supervisable, public Transport {
}

std::size_t read_bytes(char* buffer, std::size_t size) override {
auto head = _head;
auto tail = _tail;
auto remain = head < tail ? tail - head : _size - (head - tail);
std::size_t avail = _size - remain;
std::size_t const size_mask = _size - 1;
auto head = _head.load();
auto tail = _tail.load();
auto remain = head < tail ? tail - head : _size - (head - tail);
std::size_t avail = _size - remain;

size = std::min(avail, size);
std::size_t i = 0;
for (; i < avail; ++i) {
auto c = _buffer[tail = (tail + i) % _size];
auto c = _buffer[tail = (tail + i) & size_mask];
buffer[i] = c;
if (c == '\0') [[unlikely]]
break;
}

_tail = tail;
_tail.store(tail);

return i;
}
Expand All @@ -156,29 +158,28 @@ class MQTT final : public Supervisable, public Transport {
}

char get_char() override {
auto head = _head;
auto tail = _tail;
std::size_t const size_mask = _size - 1;
auto head = _head.load();
auto tail = _tail.load();

if (head == tail) return '\0';

auto c = _buffer[tail];
tail = (tail + 1) % _size;
_tail = tail;
tail = (tail + 1) & size_mask;
_tail.store(tail);

return c;
}

std::size_t get_line(char* buffer, std::size_t size, const char delim = 0x0d) override {
std::size_t len = 0;
std::size_t const len_max = size - 1;
auto head = _head;
auto tail = _tail;
auto prev = tail;
auto found = false;

if (head == tail) return 0;

for (; (!found) & (head != tail) & (len < len_max); tail = (tail + 1) % _size, ++len) {
std::size_t len = 0;
std::size_t const size_mask = _size - 1;
auto head = _head.load();
auto tail = _tail.load();
auto prev = tail;
auto found = false;

for (; (!found) & (head ^ tail) & (len < size); tail = (tail + 1) & size_mask, ++len) {
char c = _buffer[tail];
if (c == '\0')
break;
Expand All @@ -188,8 +189,8 @@ class MQTT final : public Supervisable, public Transport {

if (!found) return 0;

for (std::size_t i = 0; i < len; ++i) buffer[i] = _buffer[(prev + i) % _size];
_tail = tail;
for (std::size_t i = 0; i < len; ++i) buffer[i] = _buffer[(prev + i) & size_mask];
_tail.store(tail);

return len;
}
Expand Down Expand Up @@ -264,18 +265,19 @@ class MQTT final : public Supervisable, public Transport {
}

inline bool push_to_buffer(const char* bytes, std::size_t size) {
std::size_t len = 0;
auto head = _head;
auto tail = _tail;
auto remain = head < tail ? tail - head : _size - (head - tail);
std::size_t len = 0;
std::size_t const size_mask = _size - 1;
auto head = _head.load();
auto tail = _tail.load();
auto remain = head > tail ? head - tail : _size - (tail - head);

size = std::min(remain, size);
for (; len < size; ++len) {
_buffer[head] = bytes[len];
head = (head + 1) % _size;
head = (head + 1) & size_mask;
}

_head = head;
_head.store(head);

return len == size;
}
Expand Down Expand Up @@ -400,11 +402,11 @@ class MQTT final : public Supervisable, public Transport {
Network* _network;
Mutex _device_lock;

static MQTT* _mqtt_handler_ptr;
const std::size_t _size;
char* _buffer;
volatile std::size_t _head;
volatile std::size_t _tail;
static MQTT* _mqtt_handler_ptr;
const std::size_t _size;
char* _buffer;
std::atomic<std::size_t> _head;
std::atomic<std::size_t> _tail;

SynchronizableObject<std::pair<mqtt_sta_e, mqtt_server_config_t>> _mqtt_server_config;

Expand Down

0 comments on commit 8f68e19

Please sign in to comment.