Skip to content

Commit

Permalink
[greyhound] use no-interruption-retry toggle in blocking retries as w…
Browse files Browse the repository at this point in the history
…ell (#37095)

GitOrigin-RevId: f9b12e8030fb67924cdaaa27a57462ed256dd238
  • Loading branch information
berman7 authored and wix-oss committed Sep 23, 2023
1 parent e0dc975 commit 6b43379
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ object RecordConsumer {
config.initialSubscription,
blockingState,
nonBlockingRetryHelper,
awaitShutdown
awaitShutdown,
config.produceWithoutShutdown
)
)
case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ private[retry] object BlockingRetryRecordHandler {
retryConfig: RetryConfig,
blockingState: Ref[Map[BlockingTarget, BlockingState]],
nonBlockingHandler: NonBlockingRetryRecordHandler[V, K, R],
awaitShutdown: TopicPartition => UIO[AwaitShutdown]
awaitShutdown: TopicPartition => UIO[AwaitShutdown],
interruptOnShutdown: Boolean
): BlockingRetryRecordHandler[V, K, R] = new BlockingRetryRecordHandler[V, K, R] {
val blockingStateResolver = BlockingStateResolver(blockingState)
case class PollResult(pollAgain: Boolean, blockHandling: Boolean) // TODO: switch to state enum
Expand Down Expand Up @@ -51,11 +52,15 @@ private[retry] object BlockingRetryRecordHandler {
start <- currentTime(TimeUnit.MILLISECONDS)
continueBlocking <-
if (interval.toMillis > 100L) {
awaitShutdown(record.topicPartition).flatMap(
_.interruptOnShutdown(
pollBlockingStateWithSuspensions(record, interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling)
).reporting(r => DoneBlockingBeforeRetry(record.topic, record.partition, record.offset, r.duration, r.failed))
)
(if (interruptOnShutdown) {
awaitShutdown(record.topicPartition).flatMap(
_.interruptOnShutdown(
pollBlockingStateWithSuspensions(record, interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling)
)
)
} else {
pollBlockingStateWithSuspensions(record, interval, start).repeatWhile(result => result.pollAgain).map(_.blockHandling)
}).reporting(r => DoneBlockingBeforeRetry(record.topic, record.partition, record.offset, r.duration, r.failed))
} else {
for {
shouldBlock <- blockingStateResolver.resolve(record)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ object RetryRecordHandler {
subscription: ConsumerSubscription,
blockingState: Ref[Map[BlockingTarget, BlockingState]],
nonBlockingRetryHelper: NonBlockingRetryHelper,
awaitShutdown: TopicPartition => UIO[AwaitShutdown] = _ => ZIO.succeed(AwaitShutdown.never)(zio.Trace.empty)
awaitShutdown: TopicPartition => UIO[AwaitShutdown] = _ => ZIO.succeed(AwaitShutdown.never)(zio.Trace.empty),
produceWithoutShutdown: Boolean = false
)(
implicit evK: K <:< Chunk[Byte],
evV: V <:< Chunk[Byte]
Expand All @@ -41,9 +42,18 @@ object RetryRecordHandler {
nonBlockingRetryHelper,
groupId,
awaitShutdown,
produceWithoutShutdown = false
produceWithoutShutdown = produceWithoutShutdown
)
val blockingHandler =
BlockingRetryRecordHandler(
groupId,
handler,
retryConfig,
blockingState,
nonBlockingHandler,
awaitShutdown,
interruptOnShutdown = !produceWithoutShutdown
)
val blockingHandler = BlockingRetryRecordHandler(groupId, handler, retryConfig, blockingState, nonBlockingHandler, awaitShutdown)
val blockingAndNonBlockingHandler = BlockingAndNonBlockingRetryRecordHandler(groupId, blockingHandler, nonBlockingHandler)

new RecordHandler[R with R2 with GreyhoundMetrics, Nothing, K, V] {
Expand Down

0 comments on commit 6b43379

Please sign in to comment.