Emit messages from TransactionalFlow only after committing them #802
@@ -167,6 +176,13 @@ private final class TransactionalProducerStageLogic[K, V, P](stage: Transactiona
 }
 }

+override protected def emit(future: Future[Results[K, V, P]]): Unit = {
+implicit val ec: ExecutionContext = this.materializer.executionContext
+val committed: Future[Results[K, V, P]] = future.zip(batchOffsets.isCommitted()).map(_._1)
batchOffsets.isCommitted().flatMap(_ => future) might be a bit cleaner
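To make the comparison concrete, here is a minimal sketch of the two variants using only the Scala standard library. It is not the stage's actual code: `Future[Unit]` stands in for the `Future[Done]` returned by `batchOffsets.isCommitted()`, and the names `CommitGating`, `viaZip`, `viaFlatMap` and `demo` are hypothetical.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object CommitGating {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Variant from the diff: pair both futures, then keep only the left value.
  def viaZip[A](result: Future[A], committed: Future[Unit]): Future[A] =
    result.zip(committed).map(_._1)

  // Variant suggested in the review: wait for the commit, then hand over the result.
  def viaFlatMap[A](result: Future[A], committed: Future[Unit]): Future[A] =
    committed.flatMap(_ => result)

  // Returns (both still pending before the commit, value via zip, value via flatMap).
  def demo(): (Boolean, String, String) = {
    val commit = Promise[Unit]()               // assumption: completed when the transaction commits
    val result = Future.successful("record-1") // the send has already been acknowledged
    val a = viaZip(result, commit.future)
    val b = viaFlatMap(result, commit.future)
    val pendingBeforeCommit = !a.isCompleted && !b.isCompleted
    commit.success(())
    (pendingBeforeCommit, Await.result(a, 1.second), Await.result(b, 1.second))
  }
}
```

On the success path both variants yield the same value only after the commit. One difference worth keeping in mind: if `result` fails, the zip variant fails right away, while the flatMap variant surfaces that failure only once the commit future has completed.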
Interesting approach! It would be useful to play with the parallelism and see how it affects performance, as the smaller it is the quicker Transaction benchmarks will be usable for this after #804 gets in.
@szymonm Seems like this has been stale for a long while. Shall we close as obsolete?
this looks like the right approach
@@ -72,8 +77,10 @@ private object TransactionalProducerStage {
 case (gtp, offset) => new TopicPartition(gtp.topic, gtp.partition) -> new OffsetAndMetadata(offset + 1)
 }

-def internalCommit(): Future[Done] =
+def internalCommit(): Future[Done] = {
+committed.success(Done)
Are we sure this promise is not completed elsewhere? If not, use trySuccess.
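For reference, a small standard-library sketch of the difference between the two calls. The promise here is a stand-in typed as `Promise[Unit]` rather than the stage's `Promise[Done]`, and `PromiseCompletion` and `demo` are hypothetical names.

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseCompletion {
  // Returns (first trySuccess, second trySuccess, whether success threw).
  def demo(): (Boolean, Boolean, Boolean) = {
    val committed = Promise[Unit]()

    // trySuccess reports whether this call was the one that completed the promise.
    val first = committed.trySuccess(())
    val second = committed.trySuccess(())

    // success throws IllegalStateException when the promise is already completed.
    val threw = Try(committed.success(())).isFailure

    (first, second, threw)
  }
}
```

If `internalCommit()` could ever run twice for the same batch, or the promise could be completed from another path, `success` would throw inside the stage, whereas `trySuccess` would quietly return `false`.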
@@ -159,6 +156,9 @@ private class DefaultProducerStageLogic[K, V, P, IN <: Envelope[K, V, P], OUT <:
 }
 }

+protected def emit(future: Future[OUT]): Unit =
it's somewhat confusing to name this emit since that is a method in Akka's GraphStage. Can we come up with another name? pushOut?
We are close to merging the use of KIP-447 transactions which solves this.
Purpose
Resolves #791
Changes
Introduces waiting for the transaction commit before emitting an element from the TransactionalProducerStage.
This has the following effect on the logic:
After the TransactionalProducerStage we have mapAsync(txSettings.parallelism).
Before, in the mapAsync stage, we waited for the Kafka ack message (the producer.send callback).
Now we wait for the transaction to complete (producer.commitTransaction).
Thus we need to increase txSettings.parallelism to accommodate committing many messages in the same transaction.
What do you think about this change, @2m, @ennru?
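A toy model of why the parallelism matters once elements are released only on commit. This is not Akka Streams or the stage's code: it assumes a window of at most `parallelism` uncommitted elements (as with mapAsync) and, as a simplification, forces a commit whenever the window is full. In the real stage the commit trigger lives elsewhere, so a full window would instead mean waiting for the next commit. `ParallelismModel` and `commitsNeeded` are hypothetical names.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

object ParallelismModel {
  /** Number of commits needed to emit `messages` elements when at most
    * `parallelism` uncommitted elements may be in flight at once. */
  def commitsNeeded(messages: Int, parallelism: Int): Int = {
    require(parallelism > 0, "parallelism must be positive")
    val inFlight = mutable.Queue.empty[Promise[Unit]]
    var commits = 0

    // Committing completes every pending element, which frees the whole window.
    def commit(): Unit = {
      inFlight.foreach(_.success(()))
      inFlight.clear()
      commits += 1
    }

    (1 to messages).foreach { _ =>
      // Window full: no further element can be accepted until a commit happens.
      if (inFlight.size == parallelism) commit()
      inFlight.enqueue(Promise[Unit]())
    }
    if (inFlight.nonEmpty) commit() // flush the last partial batch
    commits
  }
}
```

Under these assumptions, `commitsNeeded(100, 10)` needs ten commits while `commitsNeeded(100, 100)` needs one, which is the sense in which a larger `txSettings.parallelism` lets more messages share a transaction.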