-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender.cc
132 lines (117 loc) · 4.1 KB
/
sender.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#include "sender.h"
#include "control-protocol.h"
#include <iostream>
#include <algorithm>
#include <sstream>
#include <unistd.h>
void streamer() {
PacketReader pr(configuration()->psize, configuration()->session_id);
udp::Broadcaster sock(configuration()->data_address);
while (true) {
auto packet = pr.readPacket();
if (!packet) break;
packet_fifo()->push(*packet);
sock.send(packet->toBytes());
}
running().lock().get() = false;
}
std::unique_ptr<Configuration> &configuration() {
static std::unique_ptr<Configuration> _configuration;
return _configuration;
}
std::unique_ptr<PacketFifo> &packet_fifo() {
static std::unique_ptr<PacketFifo> _packet_fifo;
return _packet_fifo;
}
std::string reply_message() {
std::stringstream result;
result << ctrl::REPLY_HEADER << ' ';
result << configuration()->data_address.getIP() << ' ' << configuration()->data_address.getPort() << ' ';
result << configuration()->name;
return result.str();
}
void handle_lookup(const udp::Address &caller, udp::Socket &sock) {
sock.send(udp::Datagram(caller, reply_message()));
}
void handle_rexmit(const udp::Address &caller, const std::string &packets_list) {
std::string fbn_string;
std::stringstream ss(packets_list);
rexmit_orders_t new_orders;
while (std::getline(ss, fbn_string, ',')) {
uint64_t fbn;
try {
fbn = std::stoul(fbn_string);
if (fbn % configuration()->psize == 0) new_orders.emplace_back(fbn, caller.getIP());
} catch (...) {}
}
auto orders_lock = rexmit_orders().lock();
orders_lock->splice(orders_lock->end(), new_orders);
}
static const long CONTROLLER_SOCET_TIMEOUT = 200;
void controller() {
udp::Socket sock;
sock.bindToPort(configuration()->control_port);
while (true) {
if (!running().lock().get()) break;
auto datagram = sock.receive(CONTROLLER_SOCET_TIMEOUT);
if (datagram.data.empty()) continue;
if (!ctrl::controlPacketBytesValidation(datagram.data))
continue;
auto author = datagram.address;
auto message = std::string(datagram.data.begin(), datagram.data.end());
std::stringstream sstream(message);
std::string message_header;
sstream >> message_header;
if (message_header == ctrl::LOOKUP_HEADER) {
handle_lookup(author, sock);
} else if (message_header == ctrl::REXMIT_HEADER) {
std::string packets_list;
sstream >> packets_list;
handle_rexmit(author, packets_list);
} else if (message_header == "SHOW") {
auto orders = rexmit_orders().lock().get();
std::for_each(orders.begin(), orders.end(), [](const std::pair<uint64_t, std::string> &order) {
std::cout << "From " << order.second << " packet " << order.first << " |" << std::endl;
});
}
}
}
void resend_packets(udp::Socket &sock) {
rexmit_orders_t orders;
{
auto lock = rexmit_orders().lock();
orders = lock.get();
lock->clear();
}
for (auto &order : orders) {
auto packet = packet_fifo()->getPacket(order.first);
if (packet) {
udp::Datagram dgram(
udp::Address(order.second, configuration()->data_address.getPort()),
packet->toBytes()
);
try {
sock.send(dgram);
} catch (const IOException &exc) {
std::cout << "Error occurred while resending packet" << std::endl;
std::cerr << exc.what() << std::endl;
}
}
}
}
void resender() {
udp::Socket sock;
while (true) {
if (!running().lock().get()) break;
resend_packets(sock);
std::this_thread::sleep_for(std::chrono::microseconds(configuration()->rtime));
}
}
MutexValue<rexmit_orders_t> &rexmit_orders() {
static MutexValue<rexmit_orders_t> *orders = new MutexValue<rexmit_orders_t>(rexmit_orders_t());
return *orders;
}
MutexValue<bool> &running() {
static auto *run = new MutexValue<bool>(true);
return *run;
}