diff --git a/core/src/main/scala/akka/persistence/cassandra/CachedPreparedStatement.scala b/core/src/main/scala/akka/persistence/cassandra/CachedPreparedStatement.scala index b4c69d579..dbdc5b5af 100644 --- a/core/src/main/scala/akka/persistence/cassandra/CachedPreparedStatement.scala +++ b/core/src/main/scala/akka/persistence/cassandra/CachedPreparedStatement.scala @@ -4,10 +4,10 @@ package akka.persistence.cassandra +import scala.concurrent.ExecutionContext import scala.concurrent.Future import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.util.OptionVal import com.datastax.oss.driver.api.core.cql.PreparedStatement @@ -26,7 +26,7 @@ import com.datastax.oss.driver.api.core.cql.PreparedStatement ps.foreach { p => // only cache successful futures, ok to overwrite preparedStatement = OptionVal.Some(Future.successful(p)) - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) ps } diff --git a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala index ee24ecb6d..f34069498 100644 --- a/core/src/main/scala/akka/persistence/cassandra/Extractors.scala +++ b/core/src/main/scala/akka/persistence/cassandra/Extractors.scala @@ -7,6 +7,7 @@ package akka.persistence.cassandra import com.datastax.oss.driver.api.core.cql.Row import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.jdk.CollectionConverters._ import akka.persistence.PersistentRepr import akka.persistence.cassandra.journal._ @@ -16,7 +17,6 @@ import java.{ util => ju } import akka.util.OptionVal import akka.serialization.Serialization -import akka.util.ccompat.JavaConverters._ import java.nio.ByteBuffer import com.datastax.oss.protocol.internal.util.Bytes diff --git a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala index 43b888cd5..a33d50496 100644 --- a/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/KeyspaceAndTableStatements.scala @@ -5,6 +5,7 @@ package akka.persistence.cassandra import scala.collection.immutable +import scala.jdk.CollectionConverters._ import akka.actor.ClassicActorSystemProvider @@ -48,10 +49,7 @@ class KeyspaceAndTableStatements( * This can be queried in for example a startup script without accessing the actual * Cassandra plugin actor. */ - def getCreateJournalTablesStatements: java.util.List[String] = { - import akka.util.ccompat.JavaConverters._ - createJournalTablesStatements.asJava - } + def getCreateJournalTablesStatements: java.util.List[String] = createJournalTablesStatements.asJava /** * The Cassandra Statement that can be used to create the configured keyspace. @@ -77,9 +75,6 @@ class KeyspaceAndTableStatements( * This can be queried in for example a startup script without accessing the actual * Cassandra plugin actor. */ - def getCreateSnapshotTablesStatements: java.util.List[String] = { - import akka.util.ccompat.JavaConverters._ - createSnapshotTablesStatements.asJava - } + def getCreateSnapshotTablesStatements: java.util.List[String] = createSnapshotTablesStatements.asJava } diff --git a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala index 95dc908d3..85ad0d2e1 100644 --- a/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala +++ b/core/src/main/scala/akka/persistence/cassandra/compaction/BaseCompactionStrategy.scala @@ -6,7 +6,7 @@ package akka.persistence.cassandra.compaction import com.typesafe.config.{ Config, ConfigFactory } -import akka.util.ccompat.JavaConverters._ +import scala.jdk.CollectionConverters._ /* * Based upon https://github.com/apache/cassandra/blob/cassandra-2.2/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraEventUpdate.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraEventUpdate.scala index 8f7f18c6d..e7366c800 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraEventUpdate.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraEventUpdate.scala @@ -9,8 +9,8 @@ import akka.event.LoggingAdapter import akka.persistence.cassandra.PluginSettings import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, TagPidSequenceNr } import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row, Statement } -import akka.util.ccompat.JavaConverters._ import scala.concurrent.{ ExecutionContext, Future } +import scala.jdk.CollectionConverters._ import java.lang.{ Long => JLong } import akka.annotation.InternalApi diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala index 2e4c0425a..48e78d349 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournal.scala @@ -25,7 +25,6 @@ import akka.persistence.cassandra.journal.TagWriter.TagProgress import akka.serialization.{ AsyncSerializer, Serialization, SerializationExtension } import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } import akka.stream.scaladsl.Sink -import akka.dispatch.ExecutionContexts import akka.util.{ OptionVal, Timeout } import com.datastax.oss.driver.api.core.cql._ import com.typesafe.config.Config @@ -33,13 +32,13 @@ import com.datastax.oss.driver.api.core.uuid.Uuids import com.datastax.oss.protocol.internal.util.Bytes import scala.annotation.tailrec -import akka.util.ccompat.JavaConverters._ import scala.collection.immutable import scala.collection.immutable.Seq import scala.concurrent._ +import scala.jdk.CollectionConverters._ +import scala.jdk.FutureConverters._ import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import scala.compat.java8.FutureConverters._ import akka.annotation.DoNotInherit import akka.annotation.InternalStableApi import akka.stream.scaladsl.Source @@ -217,11 +216,11 @@ import akka.stream.scaladsl.Source result .flatMap(_ => deleteDeletedToSeqNr(persistenceId)) .flatMap(_ => deleteFromAllPersistenceIds(persistenceId)) - else result.map(_ => Done)(ExecutionContexts.parasitic) + else result.map(_ => Done)(ExecutionContext.parasitic) result2.pipeTo(sender()) case HealthCheckQuery => - session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContexts.parasitic).pipeTo(sender()) + session.selectOne(healthCheckCql).map(_ => HealthCheckResponse)(ExecutionContext.parasitic).pipeTo(sender()) } override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] = { @@ -287,7 +286,7 @@ import akka.stream.scaladsl.Source tagWrites match { case Some(t) => implicit val timeout: Timeout = Timeout(settings.eventsByTagSettings.tagWriteTimeout) - t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContexts.parasitic) + t.ask(extractTagWrites(serialized)).map(_ => Nil)(ExecutionContext.parasitic) case None => Future.successful(Nil) } } @@ -547,7 +546,7 @@ import akka.stream.scaladsl.Source e.getClass.getName, e.getMessage) } - deleteResult.map(_ => Done)(ExecutionContexts.parasitic) + deleteResult.map(_ => Done)(ExecutionContext.parasitic) } } } @@ -587,7 +586,7 @@ import akka.stream.scaladsl.Source } }) }))) - deleteResult.map(_ => Done)(ExecutionContexts.parasitic) + deleteResult.map(_ => Done)(ExecutionContext.parasitic) } // Deletes the events by inserting into the metadata table deleted_to and physically deletes the rows. @@ -696,7 +695,7 @@ import akka.stream.scaladsl.Source var batch = new BatchStatementBuilder(BatchType.UNLOGGED).build().setExecutionProfileName(journalSettings.writeProfile) batch = body(batch) - session.underlying().flatMap(_.executeAsync(batch).toScala).map(_ => ()) + session.underlying().flatMap(_.executeAsync(batch).asScala).map(_ => ()) } private def selectOne[T <: Statement[T]](stmt: Statement[T]): Future[Option[Row]] = { @@ -920,7 +919,7 @@ import akka.stream.scaladsl.Source if (async) Future(deserializedEvent) else Future.successful(deserializedEvent) - }).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic) + }).map(event => DeserializedEvent(event, meta))(ExecutionContext.parasitic) } catch { case NonFatal(e) => Future.failed(e) diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala index 41c69aef9..946e6ac79 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/CassandraJournalStatements.scala @@ -4,9 +4,9 @@ package akka.persistence.cassandra.journal -import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.jdk.FutureConverters._ import akka.Done import akka.annotation.InternalApi @@ -335,15 +335,15 @@ import akka.persistence.cassandra.FutureDone def tagStatements: Future[Done] = if (eventsByTagSettings.eventsByTagEnabled) { for { - _ <- session.executeAsync(createTagsTable).toScala - _ <- session.executeAsync(createTagsProgressTable).toScala - _ <- session.executeAsync(createTagScanningTable).toScala + _ <- session.executeAsync(createTagsTable).asScala + _ <- session.executeAsync(createTagsProgressTable).asScala + _ <- session.executeAsync(createTagScanningTable).asScala } yield Done } else FutureDone def keyspace: Future[Done] = if (journalSettings.keyspaceAutoCreate) - session.executeAsync(createKeyspace).toScala.map(_ => Done) + session.executeAsync(createKeyspace).asScala.map(_ => Done) else FutureDone val done = if (journalSettings.tablesAutoCreate) { @@ -351,11 +351,11 @@ import akka.persistence.cassandra.FutureDone session.setSchemaMetadataEnabled(false) val result = for { _ <- keyspace - _ <- session.executeAsync(createTable).toScala - _ <- session.executeAsync(createMetadataTable).toScala + _ <- session.executeAsync(createTable).asScala + _ <- session.executeAsync(createMetadataTable).asScala _ <- { if (settings.journalSettings.supportAllPersistenceIds) - session.executeAsync(createAllPersistenceIdsTable).toScala + session.executeAsync(createAllPersistenceIdsTable).asScala else FutureDone } diff --git a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala index 7063f44df..081e0f5e8 100644 --- a/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala +++ b/core/src/main/scala/akka/persistence/cassandra/journal/TagWriters.scala @@ -22,7 +22,6 @@ import akka.actor.Props import akka.actor.SupervisorStrategy import akka.actor.Timers import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter import akka.persistence.cassandra.journal.CassandraJournal._ import akka.persistence.cassandra.journal.TagWriter._ @@ -222,7 +221,7 @@ import scala.util.Try case BulkTagWrite(tws, withoutTags) => val replyTo = sender() val forwards = tws.map(forwardTagWrite) - Future.sequence(forwards).map(_ => Done)(ExecutionContexts.parasitic).pipeTo(replyTo) + Future.sequence(forwards).map(_ => Done)(ExecutionContext.parasitic).pipeTo(replyTo) updatePendingScanning(withoutTags) case WriteTagScanningTick => writeTagScanning() @@ -351,7 +350,7 @@ import scala.util.Try Future.successful(Done) } else { updatePendingScanning(tw.serialised) - askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContexts.parasitic) + askTagActor(tw.tag, tw).map(_ => Done)(ExecutionContext.parasitic) } } diff --git a/core/src/main/scala/akka/persistence/cassandra/package.scala b/core/src/main/scala/akka/persistence/cassandra/package.scala index 78d97aaba..2da287381 100644 --- a/core/src/main/scala/akka/persistence/cassandra/package.scala +++ b/core/src/main/scala/akka/persistence/cassandra/package.scala @@ -15,8 +15,8 @@ import akka.persistence.cassandra.journal.CassandraJournal.{ Serialized, Seriali import akka.serialization.Serialization import scala.concurrent._ +import scala.jdk.CollectionConverters._ import scala.util.control.NonFatal -import akka.util.ccompat.JavaConverters._ import com.typesafe.config.{ Config, ConfigValueType } import akka.actor.ActorSystem import akka.actor.ExtendedActorSystem diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala index a97db1b5b..a8f1927b0 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByPersistenceIdStage.scala @@ -18,7 +18,8 @@ import scala.util.{ Failure, Success, Try } import com.datastax.oss.driver.api.core.CqlSession import scala.annotation.nowarn -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ + import akka.persistence.cassandra.PluginSettings /** @@ -70,7 +71,7 @@ import akka.persistence.cassandra.PluginSettings def selectSingleRow(persistenceId: String, pnr: Long)(implicit ec: ExecutionContext): Future[Option[Row]] = { val boundStatement = selectSingleRowQuery.bind(persistenceId, pnr: JLong).setExecutionProfileName(profile) - session.executeAsync(boundStatement).toScala.map(rs => Option(rs.one())) + session.executeAsync(boundStatement).asScala.map(rs => Option(rs.one())) } def highestDeletedSequenceNumber(persistenceId: String)(implicit ec: ExecutionContext): Future[Long] = @@ -78,7 +79,7 @@ import akka.persistence.cassandra.PluginSettings Option(r.one()).map(_.getLong("deleted_to")).getOrElse(0)) private def executeStatement(statement: Statement[_]): Future[AsyncResultSet] = - session.executeAsync(statement).toScala + session.executeAsync(statement).asScala } @@ -401,7 +402,7 @@ import akka.persistence.cassandra.PluginSettings } else if (rs.remaining() == 0) { log.debug("EventsByPersistenceId [{}] Fetch more from seqNr [{}]", persistenceId, expectedNextSeqNr) queryState = QueryInProgress(switchPartition, fetchMore = true, System.nanoTime()) - val rsFut = rs.fetchNextPage().toScala + val rsFut = rs.fetchNextPage().asScala rsFut.onComplete(newResultSetCb.invoke) } else { val row = rs.one() diff --git a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala index 294e03c37..328c4a353 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/EventsByTagStage.scala @@ -31,7 +31,7 @@ import com.datastax.oss.driver.api.core.cql.AsyncResultSet import com.datastax.oss.driver.api.core.cql.Row import com.datastax.oss.driver.api.core.uuid.Uuids -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ /** * INTERNAL API @@ -99,7 +99,7 @@ import scala.compat.java8.FutureConverters._ Retries.retry({ () => val bound = statements.byTagWithUpperLimit.bind(tag, bucket.key: JLong, from, to).setExecutionProfileName(readProfile) - session.executeAsync(bound).toScala + session.executeAsync(bound).asScala }, retries.retries, onFailure, retries.minDuration, retries.maxDuration, retries.randomFactor) } } @@ -953,7 +953,7 @@ import scala.compat.java8.FutureConverters._ private def fetchMore(rs: AsyncResultSet): Unit = { log.debug("[{}] No more results without paging. Requesting more.", stageUuid) - val moreResults = rs.fetchNextPage().toScala + val moreResults = rs.fetchNextPage().asScala updateQueryState(QueryInProgress(abortForMissingSearch = false)) moreResults.onComplete(newResultSetCb.invoke) } diff --git a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala index d2f56680a..4102df588 100644 --- a/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala +++ b/core/src/main/scala/akka/persistence/cassandra/query/javadsl/CassandraReadJournal.scala @@ -15,7 +15,7 @@ import akka.persistence.query.javadsl._ import akka.stream.alpakka.cassandra.javadsl.CassandraSession import akka.stream.javadsl.Source -import scala.compat.java8.FutureConverters +import scala.jdk.FutureConverters._ object CassandraReadJournal { @@ -67,8 +67,7 @@ class CassandraReadJournal(scaladslReadJournal: akka.persistence.cassandra.query * It is also not required to wait until this CompletionStage is complete to start * using the read journal. */ - def initialize(): CompletionStage[Done] = - FutureConverters.toJava(scaladslReadJournal.initialize()) + def initialize(): CompletionStage[Done] = scaladslReadJournal.initialize().asJava /** * Use this as the UUID offset in `eventsByTag` queries when you want all diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala index 32a7a18bc..4653a92ab 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStatements.scala @@ -4,9 +4,9 @@ package akka.persistence.cassandra.snapshot -import scala.compat.java8.FutureConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future +import scala.jdk.FutureConverters._ import akka.Done import akka.annotation.InternalApi @@ -112,7 +112,7 @@ import akka.persistence.cassandra.FutureDone implicit ec: ExecutionContext): Future[Done] = { def keyspace: Future[Done] = if (snapshotSettings.keyspaceAutoCreate) - session.executeAsync(createKeyspace).toScala.map(_ => Done) + session.executeAsync(createKeyspace).asScala.map(_ => Done) else FutureDone if (snapshotSettings.tablesAutoCreate) { @@ -120,7 +120,7 @@ import akka.persistence.cassandra.FutureDone session.setSchemaMetadataEnabled(false) val result = for { _ <- keyspace - _ <- session.executeAsync(createTable).toScala + _ <- session.executeAsync(createTable).asScala } yield { session.setSchemaMetadataEnabled(null) Done diff --git a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala index ed6ef61d2..77768efbf 100644 --- a/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala +++ b/core/src/main/scala/akka/persistence/cassandra/snapshot/CassandraSnapshotStore.scala @@ -6,15 +6,16 @@ package akka.persistence.cassandra.snapshot import java.lang.{ Long => JLong } import java.nio.ByteBuffer - import akka.NotUsed +import scala.annotation.unused import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.control.NonFatal + import akka.actor._ import akka.pattern.pipe import akka.persistence._ @@ -27,13 +28,12 @@ import akka.serialization.SerializationExtension import akka.serialization.Serializers import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import akka.util.{ unused, OptionVal } +import akka.util.OptionVal import com.datastax.oss.driver.api.core.cql._ import com.datastax.oss.protocol.internal.util.Bytes import com.typesafe.config.Config import akka.Done import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessionRegistry } @@ -251,11 +251,11 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi } def executeBatch(body: BatchStatementBuilder => Unit): Future[Unit] = { - import scala.compat.java8.FutureConverters._ + import scala.jdk.FutureConverters._ val batch = new BatchStatementBuilder(BatchType.UNLOGGED).setExecutionProfileName(snapshotSettings.writeProfile) body(batch) - session.underlying().flatMap(_.executeAsync(batch.build()).toScala).map(_ => ()) + session.underlying().flatMap(_.executeAsync(batch.build()).asScala).map(_ => ()) } private def metadata( @@ -405,7 +405,7 @@ import akka.stream.alpakka.cassandra.scaladsl.{ CassandraSession, CassandraSessi // Serialization.deserialize adds transport info serialization.deserialize(bytes, serId, manifest).get } - }).map(payload => DeserializedSnapshot(payload, meta))(ExecutionContexts.parasitic) + }).map(payload => DeserializedSnapshot(payload, meta))(ExecutionContext.parasitic) } catch { case NonFatal(e) => Future.failed(e) diff --git a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala index 34835d6d8..41e0cba27 100644 --- a/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/CassandraSpec.scala @@ -171,7 +171,7 @@ abstract class CassandraSpec( try { if (failed && dumpRowsOnFailure) { println("RowDump::") - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ if (system.settings.config.getBoolean("akka.persistence.cassandra.events-by-tag.enabled")) { println("tag_views") cluster diff --git a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala index b8a37d4bc..fbcaa0c29 100644 --- a/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/cleanup/CleanupSpec.scala @@ -556,7 +556,7 @@ class CleanupSpec extends CassandraSpec(CleanupSpec.config) with DirectWriting { } private def allSnapshots(pid: String): Seq[SnapshotMetadata] = { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ cluster .execute(s"select * from ${snapshotName}.snapshots where persistence_id = '${pid}' order by sequence_nr") .asScala diff --git a/core/src/test/scala/akka/persistence/cassandra/journal/TagScanningSpec.scala b/core/src/test/scala/akka/persistence/cassandra/journal/TagScanningSpec.scala index e7b60b9d7..7b74a99f3 100644 --- a/core/src/test/scala/akka/persistence/cassandra/journal/TagScanningSpec.scala +++ b/core/src/test/scala/akka/persistence/cassandra/journal/TagScanningSpec.scala @@ -28,7 +28,7 @@ class TagScanningSpec extends CassandraSpec(TagScanningSpec.config) { } awaitAssert { - import akka.util.ccompat.JavaConverters._ + import scala.jdk.CollectionConverters._ val expected = (0 until nrActors).map(n => (s"$n".toInt, 1L)).toList val scanning = cluster .execute(s"select * from ${journalName}.tag_scanning") diff --git a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java index 097ecba29..7172110cb 100644 --- a/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java +++ b/docs/src/test/java/jdoc/cleanup/CleanupDocExample.java @@ -5,7 +5,8 @@ import akka.persistence.cassandra.cleanup.Cleanup; import akka.persistence.cassandra.query.javadsl.CassandraReadJournal; import akka.persistence.query.PersistenceQuery; -import scala.compat.java8.FutureConverters; + +import scala.jdk.javaapi.FutureConverters; import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; @@ -26,7 +27,7 @@ public static void example() { // forall persistence ids, keep two snapshots and delete all events before the oldest kept snapshot - queries.currentPersistenceIds().mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2))).run(system); + queries.currentPersistenceIds().mapAsync(persistenceIdParallelism, pid -> FutureConverters.asJava(cleanup.cleanupBeforeSnapshot(pid, 2))).run(system); // forall persistence ids, keep everything after the provided unix timestamp, if there aren't enough snapshots after this time // go back before the timestamp to find snapshot to delete before @@ -34,7 +35,7 @@ public static void example() { ZonedDateTime keepAfter = ZonedDateTime.now().minus(1, ChronoUnit.MONTHS); queries .currentPersistenceIds() - .mapAsync(persistenceIdParallelism, pid -> FutureConverters.toJava(cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant().toEpochMilli()))) + .mapAsync(persistenceIdParallelism, pid -> FutureConverters.asJava(cleanup.cleanupBeforeSnapshot(pid, 2, keepAfter.toInstant().toEpochMilli()))) .run(system); //#cleanup diff --git a/dse-test/src/test/scala/your/pack/DseSessionProvider.scala b/dse-test/src/test/scala/your/pack/DseSessionProvider.scala index 2fe6713f8..ef92b76d9 100644 --- a/dse-test/src/test/scala/your/pack/DseSessionProvider.scala +++ b/dse-test/src/test/scala/your/pack/DseSessionProvider.scala @@ -4,7 +4,7 @@ import akka.stream.alpakka.cassandra.CqlSessionProvider import com.datastax.dse.driver.api.core.DseSession import com.datastax.oss.driver.api.core.CqlSession -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ import scala.concurrent.{ ExecutionContext, Future } //#dse-session-provider @@ -14,7 +14,7 @@ class DseSessionProvider extends CqlSessionProvider { .builder() // .withAuthProvider() can add any DSE specific authentication here .buildAsync() - .toScala + .asScala } } //#dse-session-provider diff --git a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala index 2ea8d58a2..3a046213a 100644 --- a/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala +++ b/example/src/main/scala/akka/persistence/cassandra/example/EventProcessorStream.scala @@ -10,7 +10,6 @@ import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry import akka.stream.scaladsl.{ RestartSource, Sink, Source } import com.datastax.oss.driver.api.core.cql.{ PreparedStatement, Row } import org.slf4j.{ Logger, LoggerFactory } -import akka.actor.typed.scaladsl.LoggerOps import org.HdrHistogram.Histogram import scala.concurrent.{ ExecutionContext, Future } diff --git a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala index 88098988c..f56a05bf1 100644 --- a/example/src/main/scala/akka/persistence/cassandra/example/Main.scala +++ b/example/src/main/scala/akka/persistence/cassandra/example/Main.scala @@ -7,7 +7,6 @@ import akka.cluster.typed.{ Cluster, SelfUp, Subscribe } import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement import akka.persistence.cassandra.example.LoadGenerator.Start -import akka.actor.typed.scaladsl.LoggerOps import akka.stream.alpakka.cassandra.scaladsl.CassandraSessionRegistry import scala.concurrent.Await diff --git a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala index 91ca0d8a9..0b1077de9 100644 --- a/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala +++ b/example/src/main/scala/akka/persistence/cassandra/example/ReadSide.scala @@ -8,7 +8,6 @@ import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess import akka.stream.{ KillSwitches, SharedKillSwitch } import com.typesafe.config.Config import org.HdrHistogram.Histogram -import akka.actor.typed.scaladsl.LoggerOps import scala.concurrent.duration._ object ReadSide { diff --git a/example/src/main/scala/akka/persistence/cassandra/example/Reporter.scala b/example/src/main/scala/akka/persistence/cassandra/example/Reporter.scala index 5b577ae3c..a8925f3bb 100644 --- a/example/src/main/scala/akka/persistence/cassandra/example/Reporter.scala +++ b/example/src/main/scala/akka/persistence/cassandra/example/Reporter.scala @@ -4,7 +4,6 @@ import akka.actor.typed.{ ActorRef, Behavior } import akka.actor.typed.pubsub.Topic import akka.actor.typed.scaladsl.Behaviors import akka.persistence.cassandra.example.ReadSideTopic.ReadSideMetrics -import akka.actor.typed.scaladsl.LoggerOps object Reporter { def apply(topic: ActorRef[Topic.Command[ReadSideTopic.ReadSideMetrics]]): Behavior[ReadSideMetrics] = diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 56f4d8586..1cc7e4769 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -12,19 +12,19 @@ object Dependencies { val Scala2Versions = Seq(Scala213) val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 - val AkkaVersion = System.getProperty("override.akka.version", "2.9.3") + val AkkaVersion = System.getProperty("override.akka.version", "2.10.0-M1") val AkkaVersionInDocs = VersionNumber(AkkaVersion).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } val CassandraVersionInDocs = "4.0" // Should be sync with the version of the driver in Alpakka Cassandra val CassandraDriverVersion = "4.17.0" val DriverVersionInDocs = "4.14" - val AlpakkaVersion = "8.0.0" + val AlpakkaVersion = "9.0.0-M1" val AlpakkaVersionInDocs = "8.0" // for example val AkkaManagementVersion = "1.5.0" - val Logback = "ch.qos.logback" % "logback-classic" % "1.2.13" + val Logback = "ch.qos.logback" % "logback-classic" % "1.5.7" val reconcilerDependencies = Seq( "com.typesafe.akka" %% "akka-actor-testkit-typed" % AkkaVersion % Test,