Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump to akka 2.10.0-M1, alpakka 9.0.0-M1 #1115

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

package akka.persistence.cassandra

import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.util.OptionVal
import com.datastax.oss.driver.api.core.cql.PreparedStatement

Expand All @@ -26,7 +26,7 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement
ps.foreach { p =>
// only cache successful futures, ok to overwrite
preparedStatement = OptionVal.Some(Future.successful(p))
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
ps
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package akka.persistence.cassandra
import com.datastax.oss.driver.api.core.cql.Row
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

import akka.persistence.PersistentRepr
import akka.persistence.cassandra.journal._
Expand All @@ -16,7 +17,6 @@ import java.{ util => ju }

import akka.util.OptionVal
import akka.serialization.Serialization
import akka.util.ccompat.JavaConverters._
import java.nio.ByteBuffer

import com.datastax.oss.protocol.internal.util.Bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package akka.persistence.cassandra

import scala.collection.immutable
import scala.jdk.CollectionConverters._

import akka.actor.ClassicActorSystemProvider

Expand Down Expand Up @@ -48,10 +49,7 @@ class KeyspaceAndTableStatements(
* This can be queried in for example a startup script without accessing the actual
* Cassandra plugin actor.
*/
def getCreateJournalTablesStatements: java.util.List[String] = {
import akka.util.ccompat.JavaConverters._
createJournalTablesStatements.asJava
}
def getCreateJournalTablesStatements: java.util.List[String] = createJournalTablesStatements.asJava

/**
* The Cassandra Statement that can be used to create the configured keyspace.
Expand All @@ -77,9 +75,6 @@ class KeyspaceAndTableStatements(
* This can be queried in for example a startup script without accessing the actual
* Cassandra plugin actor.
*/
def getCreateSnapshotTablesStatements: java.util.List[String] = {
import akka.util.ccompat.JavaConverters._
createSnapshotTablesStatements.asJava
}
def getCreateSnapshotTablesStatements: java.util.List[String] = createSnapshotTablesStatements.asJava

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.persistence.cassandra.compaction

import com.typesafe.config.{ Config, ConfigFactory }

import akka.util.ccompat.JavaConverters._
import scala.jdk.CollectionConverters._

/*
* Based upon https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import akka.event.LoggingAdapter
import akka.persistence.cassandra.PluginSettings
import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, TagPidSequenceNr }
import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row, Statement }
import akka.util.ccompat.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.CollectionConverters._
import java.lang.{ Long => JLong }

import akka.annotation.InternalApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,20 @@ import akka.persistence.cassandra.journal.TagWriter.TagProgress
import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtension }
import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry }
import akka.stream.scaladsl.Sink
import akka.dispatch.ExecutionContexts
import akka.util.{ OptionVal, Timeout }
import com.datastax.oss.driver.api.core.cql._
import com.typesafe.config.Config
import com.datastax.oss.driver.api.core.uuid.Uuids
import com.datastax.oss.protocol.internal.util.Bytes

import scala.annotation.tailrec
import akka.util.ccompat.JavaConverters._
import scala.collection.immutable
import scala.collection.immutable.Seq
import scala.concurrent._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import scala.compat.java8.FutureConverters._
import akka.annotation.DoNotInherit
import akka.annotation.InternalStableApi
import akka.stream.scaladsl.Source
Expand Down Expand Up @@ -217,11 +216,11 @@ import akka.stream.scaladsl.Source
result
.flatMap(_ => deleteDeletedToSeqNr(persistenceId))
.flatMap(_ => deleteFromAllPersistenceIds(persistenceId))
else result.map(_ => Done)(ExecutionContexts.parasitic)
else result.map(_ => Done)(ExecutionContext.parasitic)
result2.pipeTo(sender())

case HealthCheckQuery =>
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContexts.parasitic).pipeTo(sender())
session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContext.parasitic).pipeTo(sender())
}

override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = {
Expand Down Expand Up @@ -287,7 +286,7 @@ import akka.stream.scaladsl.Source
tagWrites match {
case Some(t) =>
implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic)
t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContext.parasitic)
case None => Future.successful(Nil)
}
}
Expand Down Expand Up @@ -547,7 +546,7 @@ import akka.stream.scaladsl.Source
e.getClass.getName,
e.getMessage)
}
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
deleteResult.map(_ => Done)(ExecutionContext.parasitic)
}
}
}
Expand Down Expand Up @@ -587,7 +586,7 @@ import akka.stream.scaladsl.Source
}
})
})))
deleteResult.map(_ => Done)(ExecutionContexts.parasitic)
deleteResult.map(_ => Done)(ExecutionContext.parasitic)
}

// Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows.
Expand Down Expand Up @@ -696,7 +695,7 @@ import akka.stream.scaladsl.Source
var batch =
new BatchStatementBuilder(BatchType.UNLOGGED).build().setExecutionProfileName(journalSettings.writeProfile)
batch = body(batch)
session.underlying().flatMap(_.executeAsync(batch).toScala).map(_ => ())
session.underlying().flatMap(_.executeAsync(batch).asScala).map(_ => ())
}

private def selectOne[T <: Statement[T]](stmt: Statement[T]): Future[Option[Row]] = {
Expand Down Expand Up @@ -920,7 +919,7 @@ import akka.stream.scaladsl.Source

if (async) Future(deserializedEvent)
else Future.successful(deserializedEvent)
}).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic)
}).map(event => DeserializedEvent(event, meta))(ExecutionContext.parasitic)

} catch {
case NonFatal(e) => Future.failed(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package akka.persistence.cassandra.journal

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.FutureConverters._

import akka.Done
import akka.annotation.InternalApi
Expand Down Expand Up @@ -335,27 +335,27 @@ import akka.persistence.cassandra.FutureDone
def tagStatements: Future[Done] =
if (eventsByTagSettings.eventsByTagEnabled) {
for {
_ <- session.executeAsync(createTagsTable).toScala
_ <- session.executeAsync(createTagsProgressTable).toScala
_ <- session.executeAsync(createTagScanningTable).toScala
_ <- session.executeAsync(createTagsTable).asScala
_ <- session.executeAsync(createTagsProgressTable).asScala
_ <- session.executeAsync(createTagScanningTable).asScala
} yield Done
} else FutureDone

def keyspace: Future[Done] =
if (journalSettings.keyspaceAutoCreate)
session.executeAsync(createKeyspace).toScala.map(_ => Done)
session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone

val done = if (journalSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
_ <- session.executeAsync(createMetadataTable).toScala
_ <- session.executeAsync(createTable).asScala
_ <- session.executeAsync(createMetadataTable).asScala
_ <- {
if (settings.journalSettings.supportAllPersistenceIds)
session.executeAsync(createAllPersistenceIdsTable).toScala
session.executeAsync(createAllPersistenceIdsTable).asScala
else
FutureDone
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.Timers
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.persistence.cassandra.journal.CassandraJournal._
import akka.persistence.cassandra.journal.TagWriter._
Expand Down Expand Up @@ -222,7 +221,7 @@ import scala.util.Try
case BulkTagWrite(tws, withoutTags) =>
val replyTo = sender()
val forwards = tws.map(forwardTagWrite)
Future.sequence(forwards).map(_ => Done)(ExecutionContexts.parasitic).pipeTo(replyTo)
Future.sequence(forwards).map(_ => Done)(ExecutionContext.parasitic).pipeTo(replyTo)
updatePendingScanning(withoutTags)
case WriteTagScanningTick =>
writeTagScanning()
Expand Down Expand Up @@ -351,7 +350,7 @@ import scala.util.Try
Future.successful(Done)
} else {
updatePendingScanning(tw.serialised)
askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContexts.parasitic)
askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContext.parasitic)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, Seriali
import akka.serialization.Serialization

import scala.concurrent._
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
import akka.util.ccompat.JavaConverters._
import com.typesafe.config.{ Config, ConfigValueType }
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ import scala.util.{ Failure, Success, Try }
import com.datastax.oss.driver.api.core.CqlSession

import scala.annotation.nowarn
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

import akka.persistence.cassandra.PluginSettings

/**
Expand Down Expand Up @@ -70,15 +71,15 @@ import akka.persistence.cassandra.PluginSettings

def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: ExecutionContext): Future[Option[Row]] = {
val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr: JLong).setExecutionProfileName(profile)
session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one()))
session.executeAsync(boundStatement).asScala.map(rs => Option(rs.one()))
}

def highestDeletedSequenceNumber(persistenceId: String)(implicit ec: ExecutionContext): Future[Long] =
executeStatement(selectDeletedToQuery.bind(persistenceId).setExecutionProfileName(profile)).map(r =>
Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0))

private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] =
session.executeAsync(statement).toScala
session.executeAsync(statement).asScala

}

Expand Down Expand Up @@ -401,7 +402,7 @@ import akka.persistence.cassandra.PluginSettings
} else if (rs.remaining() == 0) {
log.debug("EventsByPersistenceId [{}] Fetch more from seqNr [{}]", persistenceId, expectedNextSeqNr)
queryState = QueryInProgress(switchPartition, fetchMore = true, System.nanoTime())
val rsFut = rs.fetchNextPage().toScala
val rsFut = rs.fetchNextPage().asScala
rsFut.onComplete(newResultSetCb.invoke)
} else {
val row = rs.one()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.datastax.oss.driver.api.core.cql.AsyncResultSet
import com.datastax.oss.driver.api.core.cql.Row
import com.datastax.oss.driver.api.core.uuid.Uuids

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

/**
* INTERNAL API
Expand Down Expand Up @@ -99,7 +99,7 @@ import scala.compat.java8.FutureConverters._
Retries.retry({ () =>
val bound =
statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile)
session.executeAsync(bound).toScala
session.executeAsync(bound).asScala
}, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor)
}
}
Expand Down Expand Up @@ -953,7 +953,7 @@ import scala.compat.java8.FutureConverters._

private def fetchMore(rs: AsyncResultSet): Unit = {
log.debug("[{}] No more results without paging. Requesting more.", stageUuid)
val moreResults = rs.fetchNextPage().toScala
val moreResults = rs.fetchNextPage().asScala
updateQueryState(QueryInProgress(abortForMissingSearch = false))
moreResults.onComplete(newResultSetCb.invoke)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import akka.persistence.query.javadsl._
import akka.stream.alpakka.cassandra.javadsl.CassandraSession
import akka.stream.javadsl.Source

import scala.compat.java8.FutureConverters
import scala.jdk.FutureConverters._

object CassandraReadJournal {

Expand Down Expand Up @@ -67,8 +67,7 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query
* It is also not required to wait until this CompletionStage is complete to start
* using the read journal.
*/
def initialize(): CompletionStage[Done] =
FutureConverters.toJava(scaladslReadJournal.initialize())
def initialize(): CompletionStage[Done] = scaladslReadJournal.initialize().asJava

/**
* Use this as the UUID offset in `eventsByTag` queries when you want all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

package akka.persistence.cassandra.snapshot

import scala.compat.java8.FutureConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.FutureConverters._

import akka.Done
import akka.annotation.InternalApi
Expand Down Expand Up @@ -112,15 +112,15 @@ import akka.persistence.cassandra.FutureDone
implicit ec: ExecutionContext): Future[Done] = {
def keyspace: Future[Done] =
if (snapshotSettings.keyspaceAutoCreate)
session.executeAsync(createKeyspace).toScala.map(_ => Done)
session.executeAsync(createKeyspace).asScala.map(_ => Done)
else FutureDone

if (snapshotSettings.tablesAutoCreate) {
// reason for setSchemaMetadataEnabled is that it speed up tests by multiple factors
session.setSchemaMetadataEnabled(false)
val result = for {
_ <- keyspace
_ <- session.executeAsync(createTable).toScala
_ <- session.executeAsync(createTable).asScala
} yield {
session.setSchemaMetadataEnabled(null)
Done
Expand Down
Loading
Loading