Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Transactional API for Restore #98

Open
1 task
mdedetrich opened this issue Feb 10, 2022 · 0 comments
Open
1 task

Support Transactional API for Restore #98

mdedetrich opened this issue Feb 10, 2022 · 0 comments

Comments

@mdedetrich
Copy link
Contributor

What is currently missing?

Currently the restore portion of Guardian only uses a standard Kafka producer using Alpakka's Producer. While this is fine for standard scenarios most people when doing a restore would ideally want exactly once semantics for making a restore. This seems possible to do using the standard Producer with

def baseProducerConfig
    : Some[ProducerSettings[Array[Byte], Array[Byte]] => ProducerSettings[Array[Byte], Array[Byte]]] =
  Some(
    _.withBootstrapServers(
      container.bootstrapServers
    ).withProperties(
      Map(
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG             -> true.toString,
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString,
        ProducerConfig.BATCH_SIZE_CONFIG                     -> 0.toString
      )
    ).withParallelism(1)
  )

However such a configuration has terrible throughput since you are essentially forcing the Kafka producer to send one message at a time with no batching

How could this be improved?

Support needs to be done in Alpakka Kafka so that its possible to create a Transactional.source from a non Kafka cluster source. The current Transactional.sink only works with a PartitionOffsetCommittedMarker which is currently only created with a Transactional.source. Trying to manually create a PartitionOffset from a different Source just results in an exception being thrown at https://github.com/akka/alpakka-kafka/blob/1a1dab1e5168a829b7e76053375a364739043850/core/src/main/scala/akka/kafka/internal/TransactionalProducerStage.scala#L220 and its not possible to manually create a PartitionOffsetCommittedMarker since its private/internal API

There is currently an ongoing issue at Alpakka Kafka at akka/alpakka-kafka#1075 for this

Is this a feature you would work on yourself?

  • I plan to open a pull request for this feature
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant