Skip to content

Commit

Permalink
Drones completing charging after time span in Java samples
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 8, 2023
1 parent fbb736a commit ae0f114
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
2 changes: 1 addition & 1 deletion samples/grpc/local-drone-control-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.9.1-M1</akka.version>
<akka-projection.version>1.5.1-M1-18-e191ed90-SNAPSHOT</akka-projection.version>
<akka-projection.version>1.5.1-M1-19-60026c44-SNAPSHOT</akka-projection.version>
<akka-persistence-r2dbc.version>1.2.0</akka-persistence-r2dbc.version>
<akka-management.version>1.5.0</akka-management.version>
<akka-diagnostics.version>2.1.0</akka-diagnostics.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.pattern.StatusReply;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import akka.projection.grpc.consumer.ConsumerFilter;
Expand Down Expand Up @@ -265,8 +266,10 @@ public CommandHandler<Command, Event, State> commandHandler() {
.onCommand(StartCharging.class, this::handleStartCharging)
.onCommand(
CompleteCharging.class,
completeCharging ->
Effect().persist(new ChargingCompleted(completeCharging.droneId)))
completeCharging -> {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
return Effect().persist(new ChargingCompleted(completeCharging.droneId));
})
.onCommand(
GetState.class,
(state, getState) -> Effect().reply(getState.replyTo, StatusReply.success(state)));
Expand Down Expand Up @@ -363,9 +366,38 @@ public EventHandler<State, Event> eventHandler() {
return noStateHandler.orElse(initializedStateHandler).build();
}

@Override
public SignalHandler<State> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
RecoveryCompleted.class, (state, recoveryCompleted) -> handleRecoveryCompleted(state))
.build();
}

@Override
public Set<String> tagsFor(State state, Event event) {
if (state == null) return Set.of();
else return Set.of("t:" + state.stationLocationId);
}

private void handleRecoveryCompleted(State state) {
// FIXME this scheme is not quite reliable, because station is not remembered/restarted
// until next new event/command if edge system is shut down/restarted
// Complete or set up timers for completion for drones charging,
// but only if the charging was initiated in this replica
var now = Instant.now();
if (state != null) {
state.dronesCharging.stream()
.filter(d -> d.replicaId.equals(getReplicationContext().replicaId().id()))
.forEach(
chargingDrone -> {
if (chargingDrone.chargingDone.isBefore(now))
context.getSelf().tell(new CompleteCharging(chargingDrone.droneId));
else
timers.startSingleTimer(
new CompleteCharging(chargingDrone.droneId),
durationUntil(chargingDrone.chargingDone));
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.9.1-M1</akka.version>
<akka-projection.version>1.5.1-M1-18-e191ed90-SNAPSHOT</akka-projection.version>
<akka-projection.version>1.5.1-M1-19-60026c44-SNAPSHOT</akka-projection.version>
<akka-persistence-r2dbc.version>1.2.0</akka-persistence-r2dbc.version>
<akka-management.version>1.5.0</akka-management.version>
<akka-diagnostics.version>2.1.0</akka-diagnostics.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.pattern.StatusReply;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import akka.projection.grpc.consumer.ConsumerFilter;
Expand All @@ -25,7 +26,7 @@

public class ChargingStation
extends ReplicatedEventSourcedBehavior<
ChargingStation.Command, ChargingStation.Event, ChargingStation.State> {
ChargingStation.Command, ChargingStation.Event, ChargingStation.State> {

// commands and replies
public interface Command extends CborSerializable {}
Expand Down Expand Up @@ -265,8 +266,10 @@ public CommandHandler<Command, Event, State> commandHandler() {
.onCommand(StartCharging.class, this::handleStartCharging)
.onCommand(
CompleteCharging.class,
completeCharging ->
Effect().persist(new ChargingCompleted(completeCharging.droneId)))
completeCharging -> {
context.getLog().info("Drone {} completed charging", completeCharging.droneId);
return Effect().persist(new ChargingCompleted(completeCharging.droneId));
})
.onCommand(
GetState.class,
(state, getState) -> Effect().reply(getState.replyTo, StatusReply.success(state)));
Expand Down Expand Up @@ -363,9 +366,38 @@ public EventHandler<State, Event> eventHandler() {
return noStateHandler.orElse(initializedStateHandler).build();
}

@Override
public SignalHandler<State> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(
RecoveryCompleted.class, (state, recoveryCompleted) -> handleRecoveryCompleted(state))
.build();
}

@Override
public Set<String> tagsFor(State state, Event event) {
if (state == null) return Set.of();
else return Set.of("t:" + state.stationLocationId);
}

private void handleRecoveryCompleted(State state) {
// FIXME this scheme is not quite reliable, because station is not remembered/restarted
// until next new event/command if edge system is shut down/restarted
// Complete or set up timers for completion for drones charging,
// but only if the charging was initiated in this replica
var now = Instant.now();
if (state != null) {
state.dronesCharging.stream()
.filter(d -> d.replicaId.equals(getReplicationContext().replicaId().id()))
.forEach(
chargingDrone -> {
if (chargingDrone.chargingDone.isBefore(now))
context.getSelf().tell(new CompleteCharging(chargingDrone.droneId));
else
timers.startSingleTimer(
new CompleteCharging(chargingDrone.droneId),
durationUntil(chargingDrone.chargingDone));
});
}
}
}

0 comments on commit ae0f114

Please sign in to comment.