Skip to content

Commit

Permalink
chore: bump to akka 2.10.0-M1
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastian-alfers committed Oct 4, 2024
1 parent 929524e commit eb1d348
Show file tree
Hide file tree
Showing 101 changed files with 531 additions and 553 deletions.
2 changes: 1 addition & 1 deletion .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ updates.ignore = [
{ groupId = "com.typesafe.slick" }

{groupId = "com.fasterxml.jackson.core" }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.2." }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.5." }
]

updatePullRequests = false
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import akka.stream.javadsl.Source;
import org.junit.*;
import org.scalatestplus.junit.JUnitSuite;
import scala.compat.java8.FutureConverters;
import scala.jdk.FutureConverters;
import scala.concurrent.Await;

import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.projection.cassandra

import java.time.Instant
import java.util.UUID
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ object CassandraProjection {
* before the system is started.
*/
def createTablesIfNotExists(system: ActorSystem[_]): CompletionStage[Done] = {
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
val offsetStore = new CassandraOffsetStore(system)
offsetStore.createKeyspaceAndTable().toJava
offsetStore.createKeyspaceAndTable().asJava
}

@deprecated("Renamed to createTablesIfNotExists", "1.2.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@ package akka.projection.internal
import java.nio.charset.StandardCharsets
import java.util.Base64
import java.util.UUID

import scala.collection.immutable

import akka.actor.ExtendedActorSystem
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.persistence.query
import akka.projection.MergeableOffset
import akka.projection.ProjectionId
import akka.serialization.SerializerWithStringManifest
import akka.util.unused
import org.scalatest.wordspec.AnyWordSpecLike

import scala.annotation.unused

object OffsetSerializationSpec {
class TestSerializer(@unused system: ExtendedActorSystem) extends SerializerWithStringManifest {
def identifier: Int = 9999
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package akka.projection

import scala.concurrent.duration.FiniteDuration
import scala.jdk.DurationConverters._

import akka.annotation.InternalApi
import akka.util.JavaDurationConverters._

/**
* Error handling strategy when processing an `Envelope` fails. The default is defined in configuration .
Expand Down Expand Up @@ -45,7 +45,7 @@ object HandlerRecoveryStrategy {
* and fail the stream if all attempts fail.
*/
def retryAndFail(retries: Int, delay: java.time.Duration): HandlerRecoveryStrategy =
retryAndFail(retries, delay.asScala)
retryAndFail(retries, delay.toScala)

/**
* Scala API: If the first attempt to invoke the handler fails it will retry invoking the handler with the
Expand All @@ -61,7 +61,7 @@ object HandlerRecoveryStrategy {
* discard the element and continue with next if all attempts fail.
*/
def retryAndSkip(retries: Int, delay: java.time.Duration): HandlerRecoveryStrategy =
retryAndSkip(retries, delay.asScala)
retryAndSkip(retries, delay.toScala)

/**
* INTERNAL API: placed here instead of the `internal` package because of sealed trait
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
*/

package akka.projection
import akka.actor.typed.scaladsl.LoggerOps
import scala.util.Failure
import scala.util.Success

Expand Down Expand Up @@ -78,7 +77,7 @@ object ProjectionBehavior {
projection.actorHandlerInit[Any].foreach { init =>
val ref = ctx.spawnAnonymous(Behaviors.supervise(init.behavior).onFailure(SupervisorStrategy.restart))
init.setActor(ref)
ctx.log.debug2("Started actor handler [{}] for projection [{}]", ref, projection.projectionId)
ctx.log.debug("Started actor handler [{}] for projection [{}]", ref, projection.projectionId)
}
val running = projection.run()(ctx.system)
if (running.isInstanceOf[RunningProjectionManagement[_]])
Expand Down Expand Up @@ -133,7 +132,7 @@ object ProjectionBehavior {
running match {
case mgmt: RunningProjectionManagement[Offset] @unchecked =>
if (setOffset.projectionId == projectionId) {
context.log.info2(
context.log.info(
"Offset will be changed to [{}] for projection [{}]. The Projection will be restarted.",
setOffset.offset,
projectionId)
Expand All @@ -146,7 +145,7 @@ object ProjectionBehavior {
}

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed with: {}", op, exc)
context.log.warn("Operation [{}] failed with: {}", op, exc)
Behaviors.same

case isPaused: IsPaused =>
Expand All @@ -170,7 +169,7 @@ object ProjectionBehavior {
running match {
case mgmt: RunningProjectionManagement[_] =>
if (setPaused.projectionId == projectionId) {
context.log.info2(
context.log.info(
"Running state will be changed to [{}] for projection [{}].",
if (setPaused.paused) "paused" else "resumed",
projectionId)
Expand Down Expand Up @@ -207,7 +206,7 @@ object ProjectionBehavior {
Behaviors.same

case SetOffsetResult(replyTo) =>
context.log.info2(
context.log.info(
"Starting projection [{}] after setting offset to [{}]",
projection.projectionId,
setOffset.offset)
Expand All @@ -216,7 +215,7 @@ object ProjectionBehavior {
stashBuffer.unstashAll(started(running))

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed.", op, exc)
context.log.warn("Operation [{}] failed.", op, exc)
// start anyway, but no reply
val running = projection.run()(context.system)
stashBuffer.unstashAll(started(running))
Expand All @@ -233,7 +232,7 @@ object ProjectionBehavior {
Behaviors.stopped

case other =>
context.log.debug2("Projection [{}] is being stopped. Discarding [{}].", projectionId, other)
context.log.debug("Projection [{}] is being stopped. Discarding [{}].", projectionId, other)
Behaviors.unhandled
}

Expand All @@ -255,7 +254,7 @@ object ProjectionBehavior {
Behaviors.same

case SetPausedResult(replyTo) =>
context.log.info2(
context.log.info(
"Starting projection [{}] in {} mode.",
projection.projectionId,
if (setPaused.paused) "paused" else "resumed")
Expand All @@ -264,7 +263,7 @@ object ProjectionBehavior {
stashBuffer.unstashAll(started(running))

case ManagementOperationException(op, exc) =>
context.log.warn2("Operation [{}] failed.", op, exc)
context.log.warn("Operation [{}] failed.", op, exc)
// start anyway, but no reply
val running = projection.run()(context.system)
stashBuffer.unstashAll(started(running))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.projection

import scala.collection.immutable

import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._

object ProjectionId {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
package akka.projection.internal

import scala.collection.immutable
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

import akka.Done
import akka.annotation.InternalApi
import akka.projection.javadsl
import akka.projection.scaladsl
import akka.util.ccompat.JavaConverters._

/**
* INTERNAL API
Expand All @@ -33,14 +33,14 @@ import akka.util.ccompat.JavaConverters._
extends scaladsl.Handler[Envelope] {

override def process(envelope: Envelope): Future[Done] = {
delegate.process(envelope).toScala
delegate.process(envelope).asScala
}

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}

Expand All @@ -52,14 +52,14 @@ import akka.util.ccompat.JavaConverters._
extends scaladsl.Handler[immutable.Seq[Envelope]] {

override def process(envelopes: immutable.Seq[Envelope]): Future[Done] = {
delegate.process(envelopes.asJava).toScala
delegate.process(envelopes.asJava).asScala
}

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}

Expand All @@ -76,14 +76,14 @@ private[projection] class HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
* is restarted after a failure.
*/
override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

/**
* Invoked when the projection has been stopped. Can be overridden to implement resource
* cleanup. It is also called when the `Projection` is restarted after a failure.
*/
override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala
}

/**
Expand All @@ -96,12 +96,12 @@ private[projection] class HandlerLifecycleAdapter(delegate: javadsl.HandlerLifec
override private[projection] def behavior = delegate.behavior

override final def process(envelope: Envelope): Future[Done] =
delegate.process(getActor(), envelope).toScala
delegate.process(getActor(), envelope).asScala

override def start(): Future[Done] =
delegate.start().toScala
delegate.start().asScala

override def stop(): Future[Done] =
delegate.stop().toScala
delegate.stop().asScala

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package akka.projection.internal

import akka.NotUsed
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.persistence.query.typed.EventEnvelope
import akka.persistence.query.typed.scaladsl.EventTimestampQuery
import akka.persistence.query.typed.scaladsl.LoadEventQuery
Expand All @@ -19,9 +18,9 @@ import java.time.Instant
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import scala.concurrent.Future
import scala.jdk.FutureConverters._
import scala.jdk.OptionConverters._
import scala.concurrent.{ ExecutionContext, Future }

@InternalApi private[projection] object JavaToScalaBySliceSourceProviderAdapter {
def apply[Offset, Envelope](
Expand All @@ -47,11 +46,11 @@ private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val ec = scala.concurrent.ExecutionContext.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.toJava)(ec).asJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
delegate.source(offsetAdapter).asScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)
Expand All @@ -72,11 +71,11 @@ private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
def source(offset: () => Future[Option[Offset]]): Future[Source[Envelope, NotUsed]] = {
// the parasitic context is used to convert the Optional to Option and a java streams Source to a scala Source,
// it _should_ not be used for the blocking operation of getting offsets themselves
val ec = akka.dispatch.ExecutionContexts.parasitic
val ec = scala.concurrent.ExecutionContext.parasitic
val offsetAdapter = new Supplier[CompletionStage[Optional[Offset]]] {
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.asJava)(ec).toJava
override def get(): CompletionStage[Optional[Offset]] = offset().map(_.toJava)(ec).asJava
}
delegate.source(offsetAdapter).toScala.map(_.asScala)(ec)
delegate.source(offsetAdapter).asScala.map(_.asScala)(ec)
}

def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)
Expand All @@ -92,7 +91,7 @@ private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
override def timestampOf(persistenceId: String, sequenceNr: Long): Future[Option[Instant]] =
delegate match {
case timestampQuery: akka.persistence.query.typed.javadsl.EventTimestampQuery =>
timestampQuery.timestampOf(persistenceId, sequenceNr).toScala.map(_.asScala)(ExecutionContexts.parasitic)
timestampQuery.timestampOf(persistenceId, sequenceNr).asScala.map(_.toScala)(ExecutionContext.parasitic)
case _ =>
Future.failed(
new IllegalArgumentException(
Expand All @@ -103,7 +102,7 @@ private[projection] class JavaToScalaSourceProviderAdapter[Offset, Envelope](
override def loadEnvelope[Event](persistenceId: String, sequenceNr: Long): Future[EventEnvelope[Event]] =
delegate match {
case timestampQuery: akka.persistence.query.typed.javadsl.LoadEventQuery =>
timestampQuery.loadEnvelope[Event](persistenceId, sequenceNr).toScala
timestampQuery.loadEnvelope[Event](persistenceId, sequenceNr).asScala
case _ =>
Future.failed(
new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ package akka.projection.internal

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.jdk.DurationConverters._

import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi
import akka.projection.HandlerRecoveryStrategy
import akka.projection.Projection
import akka.stream.RestartSettings
import akka.util.JavaDurationConverters._
import com.typesafe.config.Config

/**
Expand Down Expand Up @@ -102,24 +102,24 @@ private object RecoveryStrategyConfig {
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double): ProjectionImpl =
withRestartBackoffSettings(RestartSettings(minBackoff.asScala, maxBackoff.asScala, randomFactor))
withRestartBackoffSettings(RestartSettings(minBackoff.toScala, maxBackoff.toScala, randomFactor))

def withRestartBackoff(
minBackoff: java.time.Duration,
maxBackoff: java.time.Duration,
randomFactor: Double,
maxRestarts: Int): ProjectionImpl =
withRestartBackoffSettings(
RestartSettings(minBackoff.asScala, maxBackoff.asScala, randomFactor).withMaxRestarts(maxRestarts, minBackoff))
RestartSettings(minBackoff.toScala, maxBackoff.toScala, randomFactor).withMaxRestarts(maxRestarts, minBackoff))

def withSaveOffset(afterEnvelopes: Int, afterDuration: FiniteDuration): ProjectionImpl

def withSaveOffset(afterEnvelopes: Int, afterDuration: java.time.Duration): ProjectionImpl =
withSaveOffset(afterEnvelopes, afterDuration.asScala)
withSaveOffset(afterEnvelopes, afterDuration.toScala)

def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: FiniteDuration): ProjectionImpl

def withGroup(groupAfterEnvelopes: Int, groupAfterDuration: java.time.Duration): ProjectionImpl =
withGroup(groupAfterEnvelopes, groupAfterDuration.asScala)
withGroup(groupAfterEnvelopes, groupAfterDuration.toScala)

}
Loading

0 comments on commit eb1d348

Please sign in to comment.