Skip to content

Commit

Permalink
Fix failing checkpoint at shutdown tests (#1021)
Browse files Browse the repository at this point in the history
* Simplify initialization and finalization related to checkpointing at shutdown

* ZIO snapshot version

* Upgrade to latest snapshot with ZStream scope fix

* Increase timeout

* No disconnect

* Alternative zombie test formulation

* Prevent an interrupt due to aggregateAsyncWithin

* Revert zio version

* Revert change

* Revert timeout
  • Loading branch information
svroonland committed Aug 11, 2024
1 parent 1c3310c commit a200340
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ inThisBuild(
)
)

val zioVersion = "2.1.6"
val zioVersion = "2.1.7"
val zioAwsVersion = "7.21.15.13"

lazy val root = project
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package nl.vroste.zio.kinesis.client.zionative.leasecoordinator

import nl.vroste.zio.kinesis.client.Util.ZStreamExtensions
import nl.vroste.zio.kinesis.client.zionative.Consumer.InitialPosition
import nl.vroste.zio.kinesis.client.zionative.LeaseCoordinator.AcquiredLease
import nl.vroste.zio.kinesis.client.zionative.LeaseRepository.{
Expand Down Expand Up @@ -53,7 +52,8 @@ private class DefaultLeaseCoordinator(
strategy: ShardAssignmentStrategy,
initialPosition: InitialPosition,
initialShards: Task[Map[ShardId, Shard.ReadOnly]],
currentShards: Task[Map[ShardId, Shard.ReadOnly]]
currentShards: Task[Map[ShardId, Shard.ReadOnly]],
scope: Scope
) extends LeaseCoordinator {

import DefaultLeaseCoordinator._
Expand Down Expand Up @@ -354,23 +354,10 @@ private class DefaultLeaseCoordinator(
}
} yield ()

override def acquiredLeases: ZStream[Any, Throwable, AcquiredLease] = ZStream.unwrapScoped {
for {
// We need a forked scope with independent finalizer order, because `initialize` will call `forkScoped` after being forked,
// which leads to a wrong finalizer order
scope <- ZIO.scope
childScope <- scope.fork
runloopFiber <- initialize
.provideEnvironment(ZEnvironment(childScope))
.forkScoped
_ <- ZIO.addFinalizer(
releaseLeases *> ZIO.logDebug("releaseLeases done")
) // We need the runloop to be alive for this operation
} yield ZStream
override def acquiredLeases: ZStream[Any, Throwable, AcquiredLease] =
ZStream
.fromQueue(acquiredLeasesQueue)
.map { case (lease, complete) => AcquiredLease(lease.key, complete) }
.terminateOnFiberFailure(runloopFiber)
}

override def getCheckpointForShard(shardId: String): UIO[Option[Either[SpecialCheckpoint, ExtendedSequenceNumber]]] =
for {
Expand All @@ -390,13 +377,18 @@ private class DefaultLeaseCoordinator(
)
)
permit <- Semaphore.make(1)
} yield new DefaultCheckpointer(
shardId,
state,
permit,
(checkpoint, release) => serialExecutionByShard(shardId)(updateCheckpoint(shardId, checkpoint, release)),
serialExecutionByShard(shardId)(releaseLease(shardId))
)
checkpointer = new DefaultCheckpointer(
shardId,
state,
permit,
(checkpoint, release) =>
serialExecutionByShard(shardId)(updateCheckpoint(shardId, checkpoint, release)),
serialExecutionByShard(shardId)(releaseLease(shardId))
)
_ <- scope.addFinalizer(
checkpointer.checkpointAndRelease.ignore
) // As a fallback when our shard streams do their finalization after the main stream
} yield checkpointer

private def updateCheckpoint(
shard: String,
Expand Down Expand Up @@ -468,6 +460,8 @@ private[zionative] object DefaultLeaseCoordinator {
table <- ZIO.service[LeaseRepository]
state <- Ref.make(State.empty)
serialExecution <- ZIO.acquireRelease(SerialExecution.keyed[String])(_ => ZIO.logDebug("Shutting down runloop"))
scope <- ZIO.scope
childScope <- scope.fork
c = new DefaultLeaseCoordinator(
table,
applicationName,
Expand All @@ -480,8 +474,13 @@ private[zionative] object DefaultLeaseCoordinator {
strategy,
initialPosition,
initialShards,
currentShards
currentShards,
scope
)
_ <- c.initialize.provideEnvironment(ZEnvironment(childScope))
_ <- ZIO.addFinalizer(
c.releaseLeases *> ZIO.logDebug("releaseLeases done")
) // We need the runloop to be alive for this operation
} yield c)
.tapErrorCause(c => ZIO.logSpan("Error creating DefaultLeaseCoordinator")(ZIO.logErrorCause(c)))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ object NativeConsumerTest extends ZIOSpecDefault {
emitDiagnostic = emitDiagnostic,
shardAssignmentStrategy = ShardAssignmentStrategy.balanced(renewInterval + 1.second)
)
.flatMapPar(Int.MaxValue) { case (shard @ _, shardStream, checkpointer) =>
.flatMapPar(nrShards) { case (shard @ _, shardStream, checkpointer) =>
shardStream
.tap(_ => onStarted)
.tap(checkpointer.stage)
Expand All @@ -484,6 +484,7 @@ object NativeConsumerTest extends ZIOSpecDefault {
}
}

// Test is complete when worker 2 and 3 have all leases
def testIsComplete(events: List[(String, Instant, DiagnosticEvent)]) = {
def getCurrentHeldLeasesByWorker =
events.foldLeft(Map.empty[String, Set[String]]) { case (acc, (worker, _, event)) =>
Expand Down Expand Up @@ -518,38 +519,33 @@ object NativeConsumerTest extends ZIOSpecDefault {

consumer1Started <- Promise.make[Throwable, Unit]

_ <- (
ZStream
.mergeAll(3)(
// The zombie consumer: not updating checkpoints and not renewing leases
consumer(
streamName,
applicationName,
"worker1",
handleEvent("worker1"),
consumer1Started.succeed(()),
checkpointInterval = 5.minutes,
renewInterval = 5.minutes // Such that it appears to be dead
),
ZStream.fromZIO(consumer1Started.await) *> // Give worker1 the first lease
consumer(streamName, applicationName, "worker2", handleEvent("worker2"))