Skip to content

Commit

Permalink
Add producer transaction API
Browse files Browse the repository at this point in the history
externalize transaction manager to its own file

add KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR to ensure transaction topic can be in ha mode
  • Loading branch information
ryarnyah committed Aug 20, 2022
1 parent 3083a9b commit 6c913d4
Show file tree
Hide file tree
Showing 35 changed files with 5,547 additions and 101 deletions.
224 changes: 167 additions & 57 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,65 +48,27 @@ type AsyncProducer interface {
// you can set Producer.Return.Errors in your config to false, which prevents
// errors to be returned.
Errors() <-chan *ProducerError
}

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
producerID int64
producerEpoch int16
sequenceNumbers map[string]int32
mutex sync.Mutex
}

const (
noProducerID = -1
noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
return sequence, t.producerEpoch
}
// IsTransactional returns true when the current producer is transactional.
IsTransactional() bool

func (t *transactionManager) bumpEpoch() {
t.mutex.Lock()
defer t.mutex.Unlock()
t.producerEpoch++
for k := range t.sequenceNumbers {
t.sequenceNumbers[k] = 0
}
}
// TxnStatus return current producer transaction status.
TxnStatus() ProducerTxnStatusFlag

func (t *transactionManager) getProducerID() (int64, int16) {
t.mutex.Lock()
defer t.mutex.Unlock()
return t.producerID, t.producerEpoch
}
// BeginTxn mark current transaction as ready.
BeginTxn() error

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
txnmgr := &transactionManager{
producerID: noProducerID,
producerEpoch: noProducerEpoch,
}
// CommitTxn commit current transaction.
CommitTxn() error

if conf.Producer.Idempotent {
initProducerIDResponse, err := client.InitProducerID()
if err != nil {
return nil, err
}
txnmgr.producerID = initProducerIDResponse.ProducerID
txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
txnmgr.sequenceNumbers = make(map[string]int32)
txnmgr.mutex = sync.Mutex{}
// AbortTxn abort current transaction.
AbortTxn() error

Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
}
// AddOffsetsToTxn add associated offsets to current transaction.
AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error

return txnmgr, nil
// AddMessageToTxn add message offsets to current transaction.
AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error
}

type asyncProducer struct {
Expand All @@ -122,6 +84,7 @@ type asyncProducer struct {
brokerLock sync.Mutex

txnmgr *transactionManager
txLock sync.Mutex
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
Expand Down Expand Up @@ -175,9 +138,12 @@ func newAsyncProducer(client Client) (AsyncProducer, error) {
type flagSet int8

const (
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
fin // final message from partitionProducer to brokerProducer and back
shutdown // start the shutdown process
syn flagSet = 1 << iota // first message from partitionProducer to brokerProducer
fin // final message from partitionProducer to brokerProducer and back
shutdown // start the shutdown process
endtxn // end the current transaction; sent together with committxn or aborttxn
committxn // the transaction being ended should be committed
aborttxn // the transaction being ended should be aborted
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
Expand Down Expand Up @@ -283,6 +249,97 @@ func (pe ProducerErrors) Error() string {
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

// IsTransactional reports whether this producer is transactional, as
// answered by its transaction manager.
func (p *asyncProducer) IsTransactional() bool {
return p.txnmgr.isTransactional()
}

// AddMessageToTxn adds the position of a consumed message to the current
// transaction. The offset recorded for msg's topic and partition is
// msg.Offset + 1, and metadata is attached to it as given. The call is
// forwarded to AddOffsetsToTxn, whose error is returned unchanged.
func (p *asyncProducer) AddMessageToTxn(msg *ConsumerMessage, groupId string, metadata *string) error {
	next := &PartitionOffsetMetadata{
		Partition: msg.Partition,
		Offset:    msg.Offset + 1,
		Metadata:  metadata,
	}
	byTopic := map[string][]*PartitionOffsetMetadata{
		msg.Topic: {next},
	}
	return p.AddOffsetsToTxn(byTopic, groupId)
}

// AddOffsetsToTxn hands the given offsets (keyed by topic) and consumer
// group ID to the transaction manager. It holds txLock for the duration of
// the call and returns ErrNonTransactedProducer when the producer is not
// transactional.
func (p *asyncProducer) AddOffsetsToTxn(offsets map[string][]*PartitionOffsetMetadata, groupId string) error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if transactional := p.IsTransactional(); transactional {
		DebugLogger.Printf("producer/txnmgr [%s] add offsets to transaction\n", p.txnmgr.transactionalID)
		return p.txnmgr.addOffsetsToTxn(offsets, groupId)
	}

	DebugLogger.Printf("producer/txnmgr [%s] attempt to call AddOffsetsToTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
	return ErrNonTransactedProducer
}

// TxnStatus returns the transaction status flags currently reported by the
// producer's transaction manager.
func (p *asyncProducer) TxnStatus() ProducerTxnStatusFlag {
return p.txnmgr.currentTxnStatus()
}

// BeginTxn asks the transaction manager to transition to
// ProducerTxnFlagInTransaction and returns the result of that transition.
// It holds txLock while doing so and returns ErrNonTransactedProducer when
// the producer is not transactional.
func (p *asyncProducer) BeginTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if p.IsTransactional() {
		return p.txnmgr.transitionTo(ProducerTxnFlagInTransaction, nil)
	}

	DebugLogger.Println("producer/txnmgr attempt to call BeginTxn on a non-transactional producer")
	return ErrNonTransactedProducer
}

// CommitTxn commits the current transaction by running finishTransaction
// with commit set to true. It holds txLock for the whole call. It returns
// ErrNonTransactedProducer when the producer is not transactional, or the
// error reported by finishTransaction.
func (p *asyncProducer) CommitTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if transactional := p.IsTransactional(); !transactional {
		DebugLogger.Printf("producer/txnmgr [%s] attempt to call CommitTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
		return ErrNonTransactedProducer
	}

	DebugLogger.Printf("producer/txnmgr [%s] committing transaction\n", p.txnmgr.transactionalID)
	if err := p.finishTransaction(true); err != nil {
		return err
	}

	DebugLogger.Printf("producer/txnmgr [%s] transaction committed\n", p.txnmgr.transactionalID)
	return nil
}

// AbortTxn aborts the current transaction by running finishTransaction with
// commit set to false. It holds txLock for the whole call. It returns
// ErrNonTransactedProducer when the producer is not transactional, or the
// error reported by finishTransaction.
func (p *asyncProducer) AbortTxn() error {
	p.txLock.Lock()
	defer p.txLock.Unlock()

	if transactional := p.IsTransactional(); !transactional {
		DebugLogger.Printf("producer/txnmgr [%s] attempt to call AbortTxn on a non-transactional producer\n", p.txnmgr.transactionalID)
		return ErrNonTransactedProducer
	}

	DebugLogger.Printf("producer/txnmgr [%s] aborting transaction\n", p.txnmgr.transactionalID)
	if err := p.finishTransaction(false); err != nil {
		return err
	}

	DebugLogger.Printf("producer/txnmgr [%s] transaction aborted\n", p.txnmgr.transactionalID)
	return nil
}

// finishTransaction pushes an end-of-transaction marker message through the
// producer's input channel, waits on the inFlight wait group, and then asks
// the transaction manager to finish the transaction.
//
// commit selects the marker flags (endtxn|committxn when true,
// endtxn|aborttxn when false) and is passed on to the transaction manager.
func (p *asyncProducer) finishTransaction(commit bool) error {
	marker := endtxn | aborttxn
	if commit {
		marker = endtxn | committxn
	}

	// Account for the marker itself before sending it, then block on the
	// wait group before finishing the transaction.
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: marker}
	p.inFlight.Wait()

	return p.txnmgr.finishTransaction(commit)
}

// Errors returns the producer's errors channel as a receive-only channel of
// *ProducerError.
func (p *asyncProducer) Errors() <-chan *ProducerError {
return p.errors
}
Expand Down Expand Up @@ -336,11 +393,27 @@ func (p *asyncProducer) dispatcher() {
continue
}

if msg.flags&endtxn != 0 {
var err error
if msg.flags&committxn != 0 {
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagCommittingTransaction, nil)
} else {
err = p.txnmgr.transitionTo(ProducerTxnFlagEndTransaction|ProducerTxnFlagAbortingTransaction, nil)
}
if err != nil {
Logger.Printf("producer/txnmgr unable to end transaction %s", err)
}
p.inFlight.Done()
continue
}

if msg.flags&shutdown != 0 {
shuttingDown = true
p.inFlight.Done()
continue
} else if msg.retries == 0 {
}

if msg.retries == 0 {
if shuttingDown {
// we can't just call returnError here because that decrements the wait group,
// which hasn't been incremented yet for this message, and shouldn't be
Expand All @@ -353,6 +426,13 @@ func (p *asyncProducer) dispatcher() {
continue
}
p.inFlight.Add(1)
// Retried messages are ignored here: they are already part of the transaction.
// Can't produce new record when transaction is not started.
if p.IsTransactional() && p.txnmgr.currentTxnStatus()&ProducerTxnFlagInTransaction == 0 {
Logger.Printf("attempt to send message when transaction is not started or is in ending state, got %d, expect %d\n", p.txnmgr.currentTxnStatus(), ProducerTxnFlagInTransaction)
p.returnError(msg, ErrTransactionNotReady)
continue
}
}

for _, interceptor := range p.conf.Producer.Interceptors {
Expand Down Expand Up @@ -605,6 +685,10 @@ func (pp *partitionProducer) dispatch() {
msg.hasSequence = true
}

if pp.parent.IsTransactional() {
pp.parent.txnmgr.maybeAddPartitionToCurrentTxn(pp.topic, pp.partition)
}

pp.brokerProducer.input <- msg
}
}
Expand Down Expand Up @@ -715,6 +799,16 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
}
}(set)

if p.IsTransactional() {
// Add partition to tx before sending current batch
err := p.txnmgr.publishTxnPartitions()
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
}

// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
Expand Down Expand Up @@ -1152,10 +1246,26 @@ func (p *asyncProducer) bumpIdempotentProducerEpoch() {
}
}

// maybeTransitionToErrorState moves the transaction manager into an error
// state chosen from err and returns the result of that transition.
//
// Cluster authorization failures, producer fencing, unsupported versions and
// transactional ID authorization failures transition to a fatal error. Any
// other error transitions to an abortable error. Before that transition,
// epochBumpRequired is set when the coordinator supports bumping the epoch
// and the current status does not include ProducerTxnFlagEndTransaction.
func (p *asyncProducer) maybeTransitionToErrorState(err error) error {
	switch {
	case errors.Is(err, ErrClusterAuthorizationFailed),
		errors.Is(err, ErrProducerFenced),
		errors.Is(err, ErrUnsupportedVersion),
		errors.Is(err, ErrTransactionalIDAuthorizationFailed):
		return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagFatalError, err)
	}

	if p.txnmgr.coordinatorSupportsBumpingEpoch {
		notEnding := p.txnmgr.currentTxnStatus()&ProducerTxnFlagEndTransaction == 0
		if notEnding {
			p.txnmgr.epochBumpRequired = true
		}
	}

	return p.txnmgr.transitionTo(ProducerTxnFlagInError|ProducerTxnFlagAbortableError, err)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
if p.IsTransactional() {
_ = p.maybeTransitionToErrorState(err)
}
// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
// will never see a message with this number, so we can never continue the sequence.
if msg.hasSequence {
if !p.IsTransactional() && msg.hasSequence {
Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
p.bumpIdempotentProducerEpoch()
}
Expand Down
Loading

0 comments on commit 6c913d4

Please sign in to comment.