-
Notifications
You must be signed in to change notification settings - Fork 4
/
Bank.scala
144 lines (129 loc) · 4.47 KB
/
Bank.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package reactive5
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.pattern.StatusReply
import akka.Done
import com.typesafe.config.ConfigFactory
import reactive5.Client.Init
import scala.util.Failure
import scala.util.Success
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.receptionist.Receptionist
import akka.cluster.typed.Cluster
import akka.actor.typed.pubsub.Topic.Subscribe
object BankAccount {
// potentially you can register multiple actors under one ServiceKey
val BankAccountServiceKey = ServiceKey[Command]("accountService")
trait Command {
def replyTo: ActorRef[Response]
}
case class Deposit(amount: BigInt, replyTo: ActorRef[Response])
extends Command {
require(amount > 0)
}
case class Withdraw(amount: BigInt, replyTo: ActorRef[Response])
extends Command {
require(amount > 0)
}
trait Response
case object Done extends Response
case object Failed extends Response
def apply(balance: BigInt): Behavior[Command] = Behaviors.setup { context =>
context.system.receptionist ! Receptionist.register(
BankAccountServiceKey,
context.self
)
Behaviors.receiveMessage {
case Deposit(amount, replyTo) =>
replyTo ! Done
apply(balance + amount)
case Withdraw(amount, replyTo) if amount <= balance =>
replyTo ! Done
apply(balance - amount)
case c: Command =>
c.replyTo ! Failed
Behaviors.same
}
}
}
object Client {
trait Command
case class Init(replyTo: ActorRef[StatusReply[Done]]) extends Command
private case class BankAccountResponse(response: BankAccount.Response)
extends Command
private case class ListingResponse(listing: Receptionist.Listing)
extends Command
def apply(): Behavior[Command] = Behaviors.setup { context =>
context.log.info(s"Starting up ${context.self}")
val listingResponseAdapter =
context.messageAdapter[Receptionist.Listing](ListingResponse)
Behaviors.receiveMessage { case Init(replyTo) =>
// find registered BankAccount actor
context.system.receptionist ! Receptionist.Find(
BankAccount.BankAccountServiceKey,
listingResponseAdapter
)
context.log.info("Received init")
findingBankActor(replyTo)
}
}
def findingBankActor(
replyTo: ActorRef[StatusReply[Done]]
): Behavior[Command] = Behaviors.setup { context =>
val bankAccountResponseAdapter =
context.messageAdapter[BankAccount.Response](BankAccountResponse)
Behaviors.receiveMessage {
case ListingResponse(
BankAccount.BankAccountServiceKey.Listing(listings)
) =>
listings.foreach( // should be only one such actor
_ ! BankAccount.Deposit(200, bankAccountResponseAdapter)
)
context.log.info(s"BankActor was found $listings")
awaitForBankAccountResponse(replyTo)
}
}
def awaitForBankAccountResponse(
replyTo: ActorRef[StatusReply[Done]]
): Behavior[Command] = Behaviors.setup { context =>
Behaviors.receiveMessage {
case BankAccountResponse(BankAccount.Done) =>
context.log.info(s"Received the success from BankAccount")
replyTo ! StatusReply.Ack
apply()
case BankAccountResponse(BankAccount.Failed) =>
context.log.info(s"Received the failure from BankAccount")
replyTo ! StatusReply.error("Bank operation has failed")
apply()
}
}
}
/** Small demonstration of Akka recepcionist patter based on Akka Cluster
* see: https://doc.akka.io/docs/akka/current/typed/actor-discovery.html
*/
object BankApp extends App {
val config = ConfigFactory.load()
val remoteBankAccount =
ActorSystem(
BankAccount(0),
"Reactive5",
config.getConfig("serverapp").withFallback(config)
)
// can be spawned on separate node
val remoteClient = ActorSystem(
Client(),
"Reactive5",
config.getConfig("clientapp").withFallback(config)
)
import akka.actor.typed.scaladsl.AskPattern._
Thread.sleep(10000) // wait for the cluster to form itself
remoteClient
.askWithStatus[Done](Init)(10.seconds, remoteClient.scheduler)
.onComplete {
case Failure(exception) => exception.printStackTrace(); println("Failure")
case Success(value) => println("Success")
}(scala.concurrent.ExecutionContext.global)
Await.result(remoteBankAccount.whenTerminated, Duration.Inf)
}