forked from PlatformLab/grpc_homa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_listener.cc
116 lines (104 loc) · 3.67 KB
/
test_listener.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include "homa_listener.h"
#include "mock.h"
// This file contains unit tests for homa_listener.cc and homa_listener.h.
class TestListener : public ::testing::Test {
public:
grpc_core::Arena *arena;
HomaListener *lis;
std::vector<HomaStream *> streams;
grpc_stream_refcount refcount;
HomaIncoming msg;
uint32_t msgStreamId;
grpc_closure closure1;
static void closureFunc1(void* arg, grpc_error_handle error) {
int64_t value = reinterpret_cast<int64_t>(arg);
if (error != GRPC_ERROR_NONE) {
Mock::logPrintf("; ", "closure1 invoked with %ld, error %s",
value, grpc_error_string(error));
} else {
Mock::logPrintf("; ", "closure1 invoked with %ld", value);
}
}
TestListener()
: arena(grpc_core::Arena::Create(2000))
, lis(nullptr)
, streams()
, refcount()
, msg(2, true, 100, 0, 0, true, true)
{
gpr_once_init(&HomaListener::shared_once, HomaListener::InitShared);
Mock::setUp();
lis = new HomaListener(nullptr, 4000);
lis->accept_stream_cb = acceptStreamCallback;
lis->accept_stream_data = this;
GRPC_CLOSURE_INIT(&closure1, closureFunc1,
reinterpret_cast<void *>(123), dummy);
}
~TestListener()
{
for (HomaStream *stream: streams) {
delete stream;
}
delete lis;
arena->Destroy();
}
static void acceptStreamCallback(void* fixture, grpc_transport* transport,
const void* initInfo)
{
TestListener *test = reinterpret_cast<TestListener *>(fixture);
HomaListener::StreamInit *init =
static_cast<HomaListener::StreamInit *>(
const_cast<void*>(initInfo));
init->stream = new HomaStream(*init->streamId, test->lis->fd,
&test->refcount, test->arena);
test->streams.push_back(init->stream);
}
};
TEST_F(TestListener, getStream_basics) {
std::optional<grpc_core::MutexLock> lockGuard;
// Id 100: add new stream
msg.streamId.id = 100;
HomaStream *stream1 = lis->getStream(&msg, lockGuard);
EXPECT_EQ(1U, lis->activeRpcs.size());
EXPECT_EQ(100U, stream1->streamId.id);
lockGuard.reset();
// Id 200: add new stream
msg.streamId.id = 200;
HomaStream *stream2 = lis->getStream(&msg, lockGuard);
EXPECT_EQ(2U, lis->activeRpcs.size());
EXPECT_EQ(200U, stream2->streamId.id);
lockGuard.reset();
// Id 100 again
msg.streamId.id = 100;
HomaStream *stream3 = lis->getStream(&msg, lockGuard);
EXPECT_EQ(2U, lis->activeRpcs.size());
EXPECT_EQ(100U, stream3->streamId.id);
EXPECT_EQ(stream1, stream3);
}
TEST_F(TestListener, getStream_noCallback) {
std::optional<grpc_core::MutexLock> lockGuard;
lis->accept_stream_cb = nullptr;
HomaStream *stream1 = lis->getStream(&msg, lockGuard);
EXPECT_EQ(0U, lis->activeRpcs.size());
EXPECT_EQ(nullptr, stream1);
}
TEST_F(TestListener, destroy_stream) {
HomaStream *stream;
grpc_core::ExecCtx execCtx;
{
std::optional<grpc_core::MutexLock> lockGuard;
msg.streamId.id = 100;
stream = lis->getStream(&msg, lockGuard);
EXPECT_EQ(1U, lis->activeRpcs.size());
EXPECT_EQ(100U, stream->streamId.id);
ASSERT_EQ(1U, streams.size());
ASSERT_EQ(stream, streams[0]);
}
HomaListener::destroy_stream(&lis->transport,
reinterpret_cast <grpc_stream*>(stream), &closure1);
free(stream);
streams.clear();
execCtx.Flush();
EXPECT_EQ(0U, lis->activeRpcs.size());
EXPECT_STREQ("closure1 invoked with 123", Mock::log.c_str());
}