Skip to content

Commit

Permalink
doc: reduce db connections with SliceRangeShardAllocationStrategy (#582)
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw authored Jul 1, 2024
1 parent cc4aa83 commit ae9f862
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 28 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.persistence.r2dbc.TestActors.DurableStatePersister
import akka.persistence.r2dbc.TestActors.Persister
import akka.persistence.r2dbc.internal.codec.PayloadCodec
import akka.persistence.r2dbc.internal.codec.PayloadCodec.RichRow
import akka.serialization.jackson.JsonSerializable

/**
* The purpose of this test is to verify JSONB payloads, but it can also be run with ordinary BYTEA payloads. To test
Expand Down
6 changes: 0 additions & 6 deletions core/src/test/scala/akka/persistence/r2dbc/TestConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,6 @@ object TestConfig {
refresh-interval = 1s
}
}
akka.actor {
serialization-bindings {
"akka.persistence.r2dbc.CborSerializable" = jackson-cbor
"akka.persistence.r2dbc.JsonSerializable" = jackson-json
}
}
akka.actor.testkit.typed.default-timeout = 10s
"""))
.withFallback(defaultConfig)
Expand Down
26 changes: 26 additions & 0 deletions docs/src/main/paradox/data-partition.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,32 @@ Each data partition corresponds to a table. You can copy the DDL statements for
data partition suffix. For example `event_journal_0`, `event_journal_0_slice_idx`, `event_journal_1`, `event_journal_1_slice_idx`.
Note that the index must also reference the parent table with same data partition suffix.

## Reducing number of database connections

When using the @extref:[default allocation strategy for Akka Cluster Sharding](akka:typed/cluster-sharding.html#shard-allocation)
the there is no correlation between the slice of the entity and to which node the entity will be allocated. That means
that there will be database connections from an Akka node to each of the databases. With a large Akka cluster each
database would have to handle many connections, maybe more than its connection limit. That would be an inefficient
use of resources on both the Akka side and the databases.

To reduce number of connections you can change the allocation strategy to @apidoc[SliceRangeShardAllocationStrategy].
It will collocate entities with the same slice and contiguous range of slices to the same Akka node. Thereby
the connections from one Akka node will go to one or a few databases since the database sharding is based on
slice ranges.

Java
: @@snip [ShardingDocExample](/docs/src/test/java/jdocs/home/sharding/ShardingDocExample.java) { #sharding-init }

Scala
: @@snip [ShardingDocExample](/docs/src/test/scala/docs/home/sharding/ShardingDocExample.scala) { #sharding-init }


Note that `SliceRangeShardAllocationStrategy` also requires change of the message extractor to
@apidoc[SliceRangeShardAllocationStrategy.ShardBySliceMessageExtractor].

Do not change shard allocation strategy in a rolling update. The cluster must be fully stopped and then started again
when changing to a different allocation strategy.

## Changing data partitions

The configuration of data partitions and databases **must not** be changed in a rolling update, since the data must
Expand Down
41 changes: 41 additions & 0 deletions docs/src/test/java/jdocs/home/sharding/ShardingDocExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.home.sharding;

//#sharding-init
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.cluster.sharding.typed.SliceRangeShardAllocationStrategy;
import akka.cluster.sharding.typed.javadsl.ClusterSharding;
import akka.cluster.sharding.typed.javadsl.Entity;
import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
import akka.persistence.Persistence;

//#sharding-init

public class ShardingDocExample {
public static class DeviceEntity {
public interface Command {
}

public static final EntityTypeKey<Command> ENTITY_TYPE_KEY =
EntityTypeKey.create(Command.class, "device");

public static Behavior<Command> create(String deviceId) {
return null;
}
}

public static void example() {
ActorSystem<?> system = null;

//#sharding-init
ClusterSharding.get(system).init(Entity.of(DeviceEntity.ENTITY_TYPE_KEY, entityContext ->
DeviceEntity.create(entityContext.getEntityId()))
.withMessageExtractor(new SliceRangeShardAllocationStrategy.ShardBySliceMessageExtractor<>(
DeviceEntity.ENTITY_TYPE_KEY.name(), Persistence.get(system)))
.withAllocationStrategy(new SliceRangeShardAllocationStrategy(10, 0.1)));
//#sharding-init
}
}
7 changes: 0 additions & 7 deletions docs/src/test/scala/docs/home/CborSerializable.scala

This file was deleted.

38 changes: 38 additions & 0 deletions docs/src/test/scala/docs/home/sharding/ShardingDocExample.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.home.sharding

//#sharding-init
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.SliceRangeShardAllocationStrategy
import akka.cluster.sharding.typed.SliceRangeShardAllocationStrategy.ShardBySliceMessageExtractor
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.Persistence

//#sharding-init

object ShardingDocExample {
object DeviceEntity {
sealed trait Command

val EntityKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("DeviceEntity")

def apply(deviceId: String): Behavior[Command] = ???
}

val system: ActorSystem[_] = ???

//#sharding-init
ClusterSharding(system).init(
Entity(DeviceEntity.EntityKey)(entityContext => DeviceEntity(entityContext.entityId))
.withMessageExtractor(
new ShardBySliceMessageExtractor[DeviceEntity.Command](DeviceEntity.EntityKey.name, Persistence(system)))
.withAllocationStrategy(new SliceRangeShardAllocationStrategy(10, 0.1)))
//#sharding-init

}
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,6 @@ object Dependencies {
// r2dbcPostgres is already a transitive dependency from core, but
// sometimes sbt doesn't understand that ¯\_(ツ)_/¯
r2dbcPostgres,
TestDeps.akkaPersistenceTyped)
TestDeps.akkaPersistenceTyped,
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion)
}

0 comments on commit ae9f862

Please sign in to comment.