-
Notifications
You must be signed in to change notification settings - Fork 44
/
Copy pathChanneledStream.hh
127 lines (117 loc) · 3.34 KB
/
ChanneledStream.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#pragma once
#include <unordered_map>
#include <elle/protocol/Channel.hh>
#include <elle/protocol/Stream.hh>
#include <elle/protocol/fwd.hh>
namespace elle
{
namespace protocol
{
/// A Stream in charge of multiplexing communications.
///
/// When a packet is written to a ChanneledStream, it is assigned to a
/// uniquely identified Channel. When the packet is written to the actual
/// socket, the Channel in charge of the packet identifies it.
/// On the other side of the socket, the peer ChanneledStream creates a
/// Channel with the same number, allowing the owners of each side of the
/// socket to communicate through the same socket. Multiplexing and
/// demultiplexing are transparent for the user.
///
/// @code{.cc}
///
/// // Consider two peers, connected by an arbitrary socket s.
///
/// // Bob creates two Channels.
/// auto serializer = elle::protocol::Serializer(s);
/// auto channel_stream = elle::protocol::ChanneledStream(serializer);
/// auto c1 = elle::protocol::Channel(channel_stream);
/// auto c2 = elle::protocol::Channel(channel_stream);
///
/// // Alice, on the other side, can get those two Channels via accept.
/// elle::protocol::Serializer serializer(s);
/// elle::protocol::ChanneledStream channel_stream(serializer);
/// auto channel = channel_stream.accept();
/// auto channel2 = channel_stream.accept();
///
/// @endcode
class ChanneledStream
: public Stream
{
/*------.
| Types |
`------*/
public:
/// This class.
using Self = ChanneledStream;
/// The base class.
using Super = Stream;
/// Channel pointers indexed by an int.
/// NOTE(review): the key is presumably the Channel id -- confirm against
/// the implementation.
using Channels = std::unordered_map<int, Channel*>;
/*-------------.
| Construction |
`-------------*/
public:
/// Construct a ChanneledStream on top of \a backend.
///
/// \param scheduler The reactor Scheduler to use.
///                  NOTE(review): presumably the Scheduler that runs the
///                  read thread -- confirm in the implementation.
/// \param backend   The underlying Stream. The `backend` attribute below
///                  is a `Stream&`, so the backend presumably has to
///                  outlive this object -- confirm.
ChanneledStream(elle::reactor::Scheduler& scheduler, Stream& backend);
/// Construct a ChanneledStream on top of \a backend, without specifying a
/// Scheduler.
///
/// NOTE(review): which Scheduler is used in this case is not visible from
/// this header -- see the implementation.
/// NOTE(review): this single-argument constructor is not `explicit`, so a
/// Stream converts implicitly to a ChanneledStream.
///
/// \param backend The underlying Stream.
ChanneledStream(Stream& backend);
/// Destroy the ChanneledStream.
virtual
~ChanneledStream();
private:
/// NOTE(review): presumably the body run by the `thread` attribute below,
/// reading packets from the backend -- confirm in the implementation.
void
_read_thread();
// ELLE_ATTRIBUTE is a project macro defined outside this file.
// NOTE(review): it presumably declares a private member named after its
// second argument with a leading underscore (the `_handshake` comment below
// refers to `_backend`) -- confirm against the macro definition.
/// The underlying Stream.
ELLE_ATTRIBUTE(Stream&, backend);
/// A reactor Thread handle (presumably running `_read_thread` -- confirm).
ELLE_ATTRIBUTE(reactor::Thread::unique_ptr, thread);
/// A stored exception.
/// NOTE(review): presumably an error caught by the read thread, kept so it
/// can be reported later -- confirm in the implementation.
ELLE_ATTRIBUTE(std::exception_ptr, exception);
/*--------.
| Version |
`--------*/
public:
/// The protocol version.
/// NOTE(review): ELLE_attribute_r presumably declares the attribute plus a
/// read accessor, here marked `override` -- confirm against the macro.
ELLE_attribute_r(elle::Version, version, override);
/*----.
| IDs |
`----*/
private:
/// Whether this side is the master (see `_handshake`).
ELLE_ATTRIBUTE(bool, master);
/// An int identifier counter.
/// NOTE(review): presumably the last id handed out by `_id_generate` --
/// confirm in the implementation.
ELLE_ATTRIBUTE(int, id_current);
private:
/// Generate an identifier.
/// NOTE(review): presumably the id of a newly created Channel; how
/// `master` influences the value is not visible from this header.
/// \return The generated id.
int
_id_generate();
/// Decide who is the master on our _backend.
/// \return Whether this side is the master.
bool
_handshake();
/*----------.
| Receiving |
`----------*/
public:
/// Wait for an incoming connection.
///
/// \return The Channel opened by the peer (see the class example, where
///         the accepting side obtains its Channels through accept).
Channel
accept();
protected:
/// Read a packet (overrides the Stream interface).
/// NOTE(review): presumably reads from the `default` Channel -- confirm in
/// the implementation.
/// \return The packet read.
elle::Buffer
_read() override;
/*--------.
| Sending |
`--------*/
protected:
/// Write \a packet (overrides the Stream interface).
/// NOTE(review): presumably written on the `default` Channel -- confirm in
/// the implementation.
/// \param packet The data to send.
void
_write(elle::Buffer const& packet) override;
private:
/// Write \a packet, tagged with \a id.
/// NOTE(review): \a id is presumably the id of the Channel in charge of
/// the packet, as described in the class documentation -- confirm.
/// \param packet The data to send.
/// \param id     The identifier attached to the packet.
void
_write(elle::Buffer const& packet, int id);
/*----------.
| Printable |
`----------*/
public:
/// Print a description of this object to \a stream.
/// \param stream The output stream to print to.
void
print(std::ostream& stream) const override;
/*--------.
| Details |
`--------*/
private:
// Channel is granted access to the private members of this class.
friend class Channel;
/// The Channels, indexed by int (see the Channels type above).
ELLE_ATTRIBUTE(Channels, channels);
/// A reactor channel carrying Channel objects.
/// NOTE(review): presumably the queue of peer-opened Channels that
/// `accept` pops from -- confirm in the implementation.
ELLE_ATTRIBUTE(reactor::Channel<Channel>, channels_new);
/// A reactor Signal.
/// NOTE(review): judging by its name, presumably signaled when a Channel
/// becomes available -- confirm in the implementation.
ELLE_ATTRIBUTE(elle::reactor::Signal, channel_available);
/// The default Channel.
/// NOTE(review): presumably the Channel used by the `_read()` and
/// `_write(packet)` overrides above -- confirm in the implementation.
ELLE_ATTRIBUTE(Channel, default);
};
}
}