-
Notifications
You must be signed in to change notification settings - Fork 0
/
audio-transmission.cc
132 lines (113 loc) · 3.91 KB
/
audio-transmission.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include <unistd.h>
#include <cstring>
#include <netinet/in.h>
#include "audio-transmission.h"
// Converts a 64-bit value between host and network (big-endian) byte order.
//
// On a host where htonl() is the identity the value is returned unchanged.
// Otherwise each 32-bit half is byte-swapped with htonl() and the halves
// trade places, which reverses all eight bytes. The transformation is its
// own inverse, so the same function also converts network order to host order.
static uint64_t htonl64(uint64_t uint64) {
    const bool host_is_network_order = (htonl(1) == 1);
    if (host_is_network_order) {
        return uint64;
    }

    // Swap the bytes inside each half...
    const uint64_t swapped_low = htonl(static_cast<uint32_t>(uint64));
    const uint64_t swapped_high = htonl(static_cast<uint32_t>(uint64 >> 32u));

    // ...then exchange the halves: the old low word becomes the new high word.
    return (swapped_low << 32u) | swapped_high;
}
// Stores `ap` in the slot at the write cursor, overwriting whatever was there.
//
// The storage is accessed through the guard returned by data.lock(). After
// the write, last_fbn records the first byte number of the newest packet and
// the cursor moves to the next slot, wrapping around at `size`.
void PacketFifo::push(const AudioPacket &ap) {
    auto guard = data.lock();
    guard.get()[cursor] = ap;
    last_fbn = ap.first_byte_num;
    cursor = (cursor + 1) % size;
}
// Returns a copy of the stored packet whose first byte number is `fbn`, or an
// empty optional when hasPacket(fbn) reports that it is not available.
//
// The slot is located by stepping back from the most recently written slot
// (cursor - 1) by the number of packets separating `fbn` from last_fbn;
// `size` is added before subtracting so the index wraps instead of going
// below zero.
std::optional<AudioPacket> PacketFifo::getPacket(uint64_t fbn) {
    auto guard = data.lock();
    if (hasPacket(fbn)) {
        const auto packets_behind = (last_fbn - fbn) / psize;
        unsigned long slot = (size + cursor - 1 - packets_behind) % size;
        return std::make_optional(guard.get()[slot]);
    }
    return {};
}
// Reports whether a packet starting at byte number `fbn` is considered to be
// held by the FIFO. All three conditions must hold, checked in this order:
//   1. `fbn` is not newer than the last pushed packet,
//   2. `fbn` lies within the last `size` packets (size * psize bytes),
//   3. `fbn` is a multiple of the packet size.
bool PacketFifo::hasPacket(uint64_t fbn) {
    if (fbn > last_fbn) {
        return false;
    }
    const auto bytes_behind = last_fbn - fbn;
    if (bytes_behind >= size * psize) {
        return false;
    }
    return fbn % psize == 0;
}
// Reads exactly `psize` bytes from standard input and wraps them in an
// AudioPacket tagged with the reader's session id and current byte number.
//
// Returns:
//   - the packet once `psize` bytes have been collected; `fbn` then advances
//     by `psize` and the buffer cursor is reset for the next packet;
//   - an empty optional when read() reports end of input (returns 0); bytes
//     of a partially filled packet are not returned.
// Throws:
//   - std::runtime_error when read() fails (returns a negative value).
std::optional<AudioPacket> PacketReader::readPacket() {
    // read() may deliver fewer bytes than requested, so keep reading until the
    // buffer is completely full. The bound must be `psize` itself: stopping at
    // `psize - 1` would emit a packet whose last byte was never read whenever
    // a short read leaves exactly one byte outstanding.
    while (cursor < psize) {
        ssize_t count = read(STDIN_FILENO, buffer + cursor, psize - cursor);
        if (count > 0) {
            cursor += count;
        } else if (count == 0) {
            return {};
        } else {
            throw std::runtime_error("Input error.");
        }
    }
    auto result = AudioPacket(session_id, fbn, std::vector<uint8_t>(buffer, buffer + psize));
    fbn += psize;
    cursor = 0;
    return std::make_optional(result);
}
// Releases the read buffer with delete[].
// NOTE(review): this assumes `buffer` was allocated with new[] by the
// constructor, which is not visible here — confirm.
PacketReader::~PacketReader() {
    delete[] this->buffer;
}
std::vector<uint8_t> AudioPacket::toBytes() {
std::vector<uint8_t> result(data.size() + 16);
auto session_n = htonl64(session_id);
std::memcpy(result.data(), &session_n, 8);
auto fbn_n = htonl64(first_byte_num);
std::memcpy(result.data() + 8, &fbn_n, 8);
std::memcpy(result.data() + 16, data.data(), data.size());
return result;
}
// Parses a wire-format buffer into an AudioPacket.
//
// The buffer must start with a 16-byte header: two 64-bit fields (session id,
// then first byte number), each converted with htonl64. Everything after the
// header becomes the payload, which may be empty. Returns an empty optional
// when `bytes` is too short to contain the header.
std::optional<AudioPacket> AudioPacket::fromBytes(const std::vector<uint8_t> &bytes) {
    constexpr std::size_t kFieldSize = sizeof(uint64_t);
    constexpr std::size_t kHeaderSize = 2 * kFieldSize;

    if (bytes.size() < kHeaderSize) {
        return {};
    }

    const uint8_t *raw = bytes.data();

    uint64_t session;
    std::memcpy(&session, raw, kFieldSize);
    session = htonl64(session);

    uint64_t first_byte;
    std::memcpy(&first_byte, raw + kFieldSize, kFieldSize);
    first_byte = htonl64(first_byte);

    const std::size_t payload_size = bytes.size() - kHeaderSize;
    std::vector<uint8_t> payload(payload_size);
    std::memcpy(payload.data(), raw + kHeaderSize, payload_size);

    return std::make_optional<AudioPacket>(session, first_byte, payload);
}
void AudioBuffer::push(const AudioPacket &ap) {
if (missing_map.find(ap.first_byte_num) != missing_map.end()) {
putMissing(ap);
}
if (ap.first_byte_num <= packets.back()->first_byte_num)
return;
auto _lock = missing_packets.lock();
for (uint64_t fbn = packets.back()->first_byte_num + psize; fbn < ap.first_byte_num; fbn += psize) {
insertPacket({}, _lock);
_lock->push_back(fbn);
missing_map[fbn] = packets.end();
}
insertPacket(ap, _lock);
if (!ready && packets.size() >= 3 * size / 4) ready = true;
}
// Appends `ap` (a packet, or an empty optional standing in for a missing one)
// to the back of the buffer. When the buffer already holds `size` entries,
// the front entry is dropped first to make room. `_lock` is the held guard on
// the missing-packet list, forwarded to dropPacket().
void AudioBuffer::insertPacket(const std::optional<AudioPacket> &ap, LockedValue<std::list<uint64_t>> &_lock) {
    const bool at_capacity = (packets.size() == size);
    if (at_capacity) {
        dropPacket(_lock);
    }
    packets.push_back(ap);
}
// Removes the front entry of the buffer and forgets any "missing" bookkeeping
// for its byte number (both in missing_map and in the list behind `_lock`),
// then advances lowest_fbn by one packet size.
// The buffer must not be empty when this is called.
void AudioBuffer::dropPacket(LockedValue<std::list<uint64_t>> &_lock) {
    const uint64_t dropped_fbn = lowest_fbn;
    lowest_fbn += psize;

    packets.pop_front();
    missing_map.erase(dropped_fbn);
    _lock->remove(dropped_fbn);
}
// Fills the placeholder registered for `ap`'s first byte number.
//
// The byte number is removed from the missing-packet list (the lock is held
// only for that removal), then `ap` is written through the iterator stored in
// missing_map. The caller must ensure missing_map contains the byte number;
// the lookup result is not checked here.
void AudioBuffer::putMissing(const AudioPacket &ap) {
    {
        auto guard = missing_packets.lock();
        guard->remove(ap.first_byte_num);
    }
    auto entry = missing_map.find(ap.first_byte_num);
    *(entry->second) = ap;
}
std::optional<AudioPacket> AudioBuffer::pop() {
if (packets.empty()) return {};
auto packet = packets.front();
auto _lock = missing_packets.lock();
dropPacket(_lock);
return packet;
}
bool AudioBuffer::isReady() {
return ready;
}