Skip to content

Commit

Permalink
[#43] Example App: implement demo case for server streaming / client …
Browse files Browse the repository at this point in the history
…unary calls: extend chat.proto, add clientSS, fix README
  • Loading branch information
igor-kovalchuk committed Dec 18, 2019
1 parent b2d41a2 commit 7ef3335
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 58 deletions.
12 changes: 9 additions & 3 deletions grpc-kotlin-example-chatserver/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
# grpc-kotlin-example-chatserver

A simple command line chat server written using bidirectional gRPC.
A simple command line chat server written using both bidirectional & server streaming gRPC.

Build the parent project. From the repo root, run

```sh
mvn package
./mvnw clean package
```

Start the server
Expand All @@ -14,12 +14,18 @@ Start the server
java -jar grpc-kotlin-example-chatserver/target/grpc-kotlin-example-chatserver.jar server
```

From another shell, start a client
From another shell, start a bidirectional streaming client

```sh
java -jar grpc-kotlin-example-chatserver/target/grpc-kotlin-example-chatserver.jar client
```

From the third shell, start a server streaming client

```sh
java -jar grpc-kotlin-example-chatserver/target/grpc-kotlin-example-chatserver.jar clientSS
```

---

Big thanks to [Björn Hegerfors](https://github.com/Bj0rnen) and [Emilio Del Tessandoro](https://github.com/emmmile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* -\-\-
* simple-kotlin-standalone-example
* --
* Copyright (C) 2016 - 2018 rouz.io
* Copyright (C) 2016 - 2019 rouz.io
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,86 +24,120 @@ import com.google.protobuf.Empty
import com.google.protobuf.Timestamp
import io.grpc.Status
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.launch
import java.time.Instant
import java.util.concurrent.ConcurrentSkipListMap

@UseExperimental(ExperimentalCoroutinesApi::class)
class ChatService : ChatServiceImplBase() {

data class Client(val name: String, val channel: SendChannel<ChatMessageFromService>)

private val clientChannels = LinkedHashSet<Client>()
private val clientChannels = ConcurrentSkipListMap<String, SendChannel<ChatMessageFromService>>()

override suspend fun getNames(request: Empty): ChatRoom {
return ChatRoom.newBuilder()
.addAllNames(clientChannels.map(Client::name))
.build()
.addAllNames(clientChannels.keys)
.build()
}

override fun chat(requests: ReceiveChannel<ChatMessage>): ReceiveChannel<ChatMessageFromService> {
val channel = Channel<ChatMessageFromService>(Channel.UNLIMITED)
channel.invokeOnClose {
private fun createChannel() = Channel<ChatMessageFromService>(100).apply {
invokeOnClose {
it?.printStackTrace()
}
println("New client connection: $channel")

launch {
// wait for first message
val hello = requests.receive()
val name = hello.from
val client = Client(name, channel)
clientChannels.add(client)

try {
for (chatMessage in requests) {
println("Got request from $requests:")
println(chatMessage)
val message = createMessage(chatMessage)
clientChannels
.filter { it.name != chatMessage.from }
.forEach { other ->
println("Sending to $other")
other.channel.send(message)
}
}
} catch (t: Throwable) {
println("Threw $t")
if (Status.fromThrowable(t).code != Status.Code.CANCELLED) {
println("An actual error occurred")
t.printStackTrace()
}

private fun subscribe(name: String, ch: SendChannel<ChatMessageFromService>) {
println("New client connected: $name")
clientChannels.put(name, ch)
?.apply {
println("Close duplicate channel of user: $name")
close()
}
} finally {
println("$name hung up. Removing client channel")
clientChannels.remove(client)
if (!channel.isClosedForSend) {
channel.close()
}

private suspend fun broadcast(message: ChatMessage) = createMessage(message)
.let { broadcastMessage ->
println("Broadcast ${message.from}: ${message.message}")

clientChannels.asSequence()
.filterNot { (name, _) -> name == message.from }
.forEach { (other, ch) ->
launch {
try {
println("Sending to $other")
ch.send(broadcastMessage)
} catch (e: Throwable) {
println("$other hung up: ${e.message}. Removing client channel")
clientChannels.remove(other)?.close(e)
}
}
}
}


override fun chat(requests: ReceiveChannel<ChatMessage>): ReceiveChannel<ChatMessageFromService> =
createChannel().also {
GlobalScope.launch {
doChat(requests, it)
}
}

private suspend fun doChat(req: ReceiveChannel<ChatMessage>, resp: SendChannel<ChatMessageFromService>) {
val hello = req.receive()
subscribe(hello.from, resp)
broadcast(hello)

try {
for (chatMessage in req) {
println("Got request from $req:")
println(chatMessage)
broadcast(chatMessage)
}
} catch (t: Throwable) {
println("Threw $t")
if (Status.fromThrowable(t).code != Status.Code.CANCELLED) {
println("An actual error occurred")
t.printStackTrace()
}
} finally {
println("${hello.from} hung up. Removing client channel")
clientChannels.remove(hello.from)
if (!resp.isClosedForSend) {
resp.close()
}
}
}

return channel
override suspend fun say(request: ChatMessage): Empty = Empty.getDefaultInstance().also {
broadcast(request)
}

override fun listen(request: WhoAmI): ReceiveChannel<ChatMessageFromService> = createChannel().also {
subscribe(request.name, it)
}

fun shutdown() {
println("Shutting down Chat service")
clientChannels.stream().forEach { client ->
clientChannels.forEach { (client, channel) ->
println("Closing client channel $client")
client.channel.close()
channel.close()
}
clientChannels.clear()
}

private fun createMessage(request: ChatMessage): ChatMessageFromService {
return ChatMessageFromService.newBuilder()
.setTimestamp(
Timestamp.newBuilder()
.setSeconds(System.nanoTime() / 1000000000)
.setNanos((System.nanoTime() % 1000000000).toInt())
.build()
)
.setMessage(request)
.build()
}
private fun createMessage(request: ChatMessage) = ChatMessageFromService.newBuilder()
.run {
timestamp = Instant.now().run {
Timestamp.newBuilder().run {
seconds = epochSecond
nanos = nano
build()
}
}
message = request
build()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ fun main(args: Array<String>) {
when(args[0]) {
"server" -> grpcServer()
"client" -> chatClient()
"clientSS" -> serverStreamingChatClient()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*-
* -\-\-
* simple-kotlin-standalone-example
* --
* Copyright (C) 2016 - 2019 rouz.io
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package io.rouz.grpc.examples.chat

import io.grpc.ManagedChannelBuilder
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.TimeUnit

fun main() = serverStreamingChatClient()

fun serverStreamingChatClient() {
val channel = ManagedChannelBuilder.forAddress("localhost", 15001)
.usePlaintext()
.build()

val chatService = ChatServiceGrpc.newStub(channel)

println("type :q to quit")
print("Enter your name: ")
val from = readLine()

val listen = chatService.listen(WhoAmI.newBuilder().setName(from).build())

runBlocking(Dispatchers.IO) {
launch {
startPrintLoop(listen)
}

try {
while (true) {
print("Message: ")
val message = readLine()
if (message == null || message == ":q") {
break
}
chatService.say(
ChatMessage.newBuilder()
.setFrom(from)
.setMessage(message)
.build()
)
}
} finally {
println("closing")
channel.shutdownNow().awaitTermination(1, TimeUnit.SECONDS)
println("closed")
}
}
}

private suspend fun startPrintLoop(chat: ReceiveChannel<ChatMessageFromService>) =
try {
for (responseMessage in chat) {
val message = responseMessage.message
println("${message.from}: ${message.message}")
}
println("Server disconnected")
} catch (e: Throwable) {
println("Server disconnected badly: $e")
}
7 changes: 7 additions & 0 deletions grpc-kotlin-example-chatserver/src/main/proto/chat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@ option java_package = "io.rouz.grpc.examples.chat";
service ChatService {
rpc Chat (stream ChatMessage) returns (stream ChatMessageFromService);
rpc GetNames (google.protobuf.Empty) returns (ChatRoom);

rpc Say(ChatMessage) returns (google.protobuf.Empty);
rpc Listen(WhoAmI) returns (stream ChatMessageFromService);
}

message ChatRoom {
repeated string names = 1;
}

message WhoAmI {
string name = 1;
}

message ChatMessage {
string from = 1;
string message = 2;
Expand Down

0 comments on commit 7ef3335

Please sign in to comment.