-
Notifications
You must be signed in to change notification settings - Fork 0
/
RegistrySpec.groovy
179 lines (152 loc) · 4.81 KB
/
RegistrySpec.groovy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package dev.yavuztas.cap.capsource.registry
import dev.yavuztas.cap.capsource.config.properties.RegistryProperties
import dev.yavuztas.cap.capsource.feed.FeedConsumer
import dev.yavuztas.cap.capsource.feed.FeedData
import dev.yavuztas.cap.capsource.feed.FeedSupplier
import dev.yavuztas.cap.capsource.feed.RawFeedData
import io.vertx.core.Vertx
import io.vertx.core.VertxOptions
import io.vertx.core.buffer.Buffer
import io.vertx.core.net.NetSocket
import org.jetbrains.annotations.NotNull
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import spock.lang.Specification
import spock.util.concurrent.AsyncConditions
import java.nio.charset.StandardCharsets
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import java.util.function.BiConsumer
import java.util.function.Consumer
/**
 * Spock specification that drives a {@link Registry} through real TCP connections.
 *
 * Each feature starts a registry on localhost:7000 backed by a mock {@link FeedSupplier}
 * that is polled every 200 ms from a dedicated "write-thread", then connects one or more
 * Vert.x net clients and checks the messages they receive.
 */
class RegistrySpec extends Specification {

    static Logger log = LoggerFactory.getLogger(RegistrySpec)

    /** Vert.x instance handed to the registry under test (8 event-loop threads). */
    Vertx vertx
    /** Separate Vert.x instance used only by the test clients (1 event-loop thread). */
    Vertx clientVertx
    Registry registry
    /** Single-threaded scheduler that periodically triggers the registered feed consumer. */
    ScheduledExecutorService writeThread

    /**
     * Creates the executors, the mock feed supplier and the registry, then blocks until
     * {@code registry.init()} has completed successfully (default AsyncConditions timeout).
     */
    def setup() {
        writeThread = Executors.newScheduledThreadPool(1, r -> new Thread(r, "write-thread"))
        clientVertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(1))
        vertx = Vertx.vertx(new VertxOptions().setEventLoopPoolSize(8))
        // NOTE(review): the meaning of the third argument (4) is defined by RegistryProperties,
        // which is not visible here.
        def props = new RegistryProperties('localhost', 7000, 4)

        def mockSupplier = new FeedSupplier() {
            AtomicLong writeIndex = new AtomicLong(1L)

            @Override
            void addConsumer(@NotNull FeedConsumer consumer) {
                // consume periodic: hand this supplier to the consumer every 200 ms
                writeThread.scheduleAtFixedRate({ consumer.consume(this) }, 0, 200, TimeUnit.MILLISECONDS)
            }

            @Override
            long writeIndex() {
                return writeIndex.get()
            }

            @Override
            FeedData get(long readIndex) {
                return null
            }

            /**
             * Emits {@code to - readIndex} messages named after the current write index
             * (nothing when that difference is not positive), then advances the write
             * index by one and returns {@code to = min(writeIndex, amount)}.
             */
            @Override
            long forEach(long readIndex, long amount, @NotNull Consumer<FeedData> action) {
                long to = Math.min(writeIndex.get(), amount)
                readIndex = readIndex < 0 ? 0 : readIndex
                (to - readIndex).times {
                    action.accept(new RawFeedData("message-${writeIndex.get()}"))
                }
                writeIndex.incrementAndGet()
                return to
            }

            @Override
            long forEachRemaining(long readIndex, @NotNull Consumer<FeedData> action) {
                return 0
            }
        }

        registry = new Registry(props, vertx, [mockSupplier])

        def ac = new AsyncConditions(1)
        def future = registry.init()
        future.onComplete(r -> {
            ac.evaluate { assert r.succeeded() }
        })
        ac.await()
    }

    /**
     * Tears down the registry, the feed scheduler and the client Vert.x instance.
     */
    def cleanup() {
        // setup() may have failed before the registry was created; avoid a secondary NPE
        registry?.destroy()
        writeThread.shutdown()
        clientVertx.close()
        // NOTE(review): `vertx` is not closed here; presumably Registry.destroy() takes
        // care of it - confirm, otherwise its event-loop threads leak between features.
    }

    /**
     * Connects a TCP client to localhost:7000 and installs a handler that decodes the
     * incoming byte stream into messages.
     *
     * The wire format read here is a 2-byte unsigned length followed by that many
     * US-ASCII bytes. Because TCP delivers a byte stream, a frame may be split across
     * several reads; undecoded bytes are kept until the rest of the frame arrives.
     *
     * @param onMessage callback invoked with the socket and each decoded message
     * @return the connected socket
     * @throws Throwable (via {@code AsyncConditions.await()}) if the connection fails
     *         or does not complete within the default timeout
     */
    NetSocket createClient(BiConsumer<NetSocket, String> onMessage) {
        def ac = new AsyncConditions(1)
        NetSocket socket = null
        def client = clientVertx.createNetClient()
        client.connect(7000, "localhost", r -> {
            if (r.succeeded()) {
                socket = r.result()
            } else {
                log.info("Failed to connect: ${r.cause().getMessage()}")
            }
            // report the outcome either way so a failed connect surfaces immediately
            // instead of as a generic await timeout
            ac.evaluate { assert r.succeeded() }
        })
        ac.await()

        // bytes received so far that do not yet form a complete frame
        def pending = Buffer.buffer()
        socket.handler(chunk -> {
            pending.appendBuffer(chunk)
            int pos = 0
            // need at least the 2-byte length header to continue
            while (pending.length() - pos >= 2) {
                int messageSize = pending.getUnsignedShort(pos)
                int end = pos + 2 + messageSize
                if (end > pending.length()) {
                    // payload incomplete: wait for the next read
                    break
                }
                def message = pending.getString(pos + 2, end, StandardCharsets.US_ASCII.name())
                pos = end
                // delegate to consumer
                onMessage.accept(socket, message)
            }
            // keep only the undecoded tail for the next read
            pending = pending.getBuffer(pos, pending.length())
        })
        return socket
    }

    /** Two clients connect; together they must receive 10 messages within 5 seconds. */
    def 'test connect clients'() {
        when:
        def ac = new AsyncConditions(10)
        def clients = []
        for (i in 0..<2) {
            clients.add(createClient((s, m) -> {
                log.info("client#${s.localAddress()} got message: ${m}")
                ac.evaluate { assert true }
            }))
        }

        then:
        ac.await(5)

        cleanup:
        clients.each { (it as NetSocket).close() }
    }

    /**
     * A single client must receive 10 messages within 5 seconds, and the numeric suffix
     * of consecutive messages must never jump forward by more than one.
     */
    def 'test client message consume order'() {
        given:
        def order = new AtomicInteger(0)

        when:
        def ac = new AsyncConditions(10)
        def client = createClient((s, m) -> {
            log.info("client#${s.localAddress()} got message: ${m}")
            // The check runs inside evaluate so that a violation (or a malformed message)
            // is recorded and rethrown by await(); an exception thrown directly from
            // this handler would stay on the event-loop thread and never fail the feature.
            ac.evaluate {
                def next = Integer.parseInt(m.split("-")[1])
                def previous = order.getAndSet(next)
                assert next - previous <= 1 : "Client received message is not in order: ${previous} -> ${next}"
            }
        })

        then:
        ac.await(5)
        noExceptionThrown()

        cleanup:
        // client is null when createClient itself failed
        client?.close()
    }

    /**
     * A client whose message callback sleeps for one second per message must still
     * receive 5 messages within 15 seconds.
     */
    def 'test slow client'() {
        when:
        def clientReadIndex = new AtomicLong(0)
        def ac = new AsyncConditions(5)
        def client = createClient((s, m) -> {
            // simulate a slow consumer
            Thread.sleep(1000)
            log.info("client#${s.localAddress()} readIndex: ${clientReadIndex.incrementAndGet()}, message: ${m}")
            ac.evaluate { assert true }
        })

        then:
        ac.await(15)

        cleanup:
        // client is null when createClient itself failed
        client?.close()
    }
}