Consumer retrieving wrong offsets after transaction #525

Open
6 of 7 tasks
roignpar opened this issue Sep 8, 2020 · 3 comments


roignpar commented Sep 8, 2020

Description

confluent-kafka-go: 1.4.2
librdkafka: 17040127 1.4.2-dirty
broker: cp-kafka docker image, tried with 5.3.1 (Kafka 2.3.0) and 5.5.1 (Kafka 2.5.0)
OS: Linux 5.7.17-2-MANJARO

When consumer offsets are sent as part of a transaction and then retrieved with .Committed immediately after the transaction is committed, the returned offsets are not the ones sent in the transaction. If there is a small delay between committing and retrieving, the offsets are correct.

How to reproduce

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

var topic = "go_transaction_test"
var timeout = 5000

func main() {
	// consumer
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost",
		"group.id":           "go_transaction_example",
		"enable.auto.commit": "false",
		"isolation.level":    "read_committed",
		"debug":              "consumer",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
		panic(err)
	}

	// producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":  "localhost",
		"message.timeout.ms": "5000",
		"enable.idempotence": "true",
		"transactional.id":   "go_transaction_test_producer",
		"debug":              "eos",
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Delivery failed: %v\n", ev.TopicPartition.Error)
				}
			}
		}
	}()

	if err = p.InitTransactions(nil); err != nil {
		panic(err)
	}
	if err = p.BeginTransaction(); err != nil {
		panic(err)
	}

	// populate topic
	for i := 0; i < 30; i++ {
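		// Produce only enqueues the message; fail fast if enqueueing itself fails.
		if err = p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 1},
		}, nil); err != nil {
			panic(err)
		}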
	}

	if err = p.CommitTransaction(nil); err != nil {
		panic(err)
	}

	// commit initial consumer offsets
	_, err =
		c.CommitOffsets(kafka.TopicPartitions{{Topic: &topic, Partition: 1, Offset: 10}})
	if err != nil {
		panic(err)
	}

	printCommitted(c, "before transaction")

	if err = p.BeginTransaction(); err != nil {
		panic(err)
	}

	cgm, err := c.GetConsumerGroupMetadata()
	if err != nil {
		panic(err)
	}

	// send offsets in transaction
	if err = p.SendOffsetsToTransaction(nil,
		kafka.TopicPartitions{{Topic: &topic, Partition: 1, Offset: 20}},
		cgm); err != nil {
		panic(err)
	}

	if err = p.CommitTransaction(nil); err != nil {
		panic(err)
	}

	printCommitted(c, "after transaction")

	time.Sleep(100 * time.Millisecond)

	printCommitted(c, "after sleeping")
}

func printCommitted(c *kafka.Consumer, text string) {
	committed, err :=
		c.Committed(kafka.TopicPartitions{{Topic: &topic, Partition: 1}}, timeout)
	if err != nil {
		panic(err)
	}

	fmt.Printf("\nConsumer committed offsets %s: %v\n\n", text, committed)
}

Output:

%7|1599561516.018|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.4.2-dirty (0x10402ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
%7|1599561516.019|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": subscribe to new subscription of 1 topics (join state init)
%7|1599561516.019|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1599561516.019|INIT|rdkafka#producer-2| [thrd:app]: librdkafka v1.4.2-dirty (0x10402ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING CC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS STATIC_LIB_zlib ZLIB STATIC_LIB_libcrypto STATIC_LIB_libssl SSL STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x8000)
%7|1599561516.022|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: init_transactions
%7|1599561516.022|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Init -> WaitPID
%7|1599561516.022|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change Init -> RequestPID
%7|1599561516.022|TXN|rdkafka#producer-2| [thrd:main]: Starting PID FSM timer (fire immediately): Starting idempotent producer
%7|1599561516.024|TXNCOORD|rdkafka#producer-2| [thrd:main]: localhost:9092/0: FindCoordinator response: Transaction coordinator is broker 0 (localhost:9092)
%7|1599561516.024|TXNCOORD|rdkafka#producer-2| [thrd:main]: Transaction coordinator changed from (none) -> localhost:9092/0: FindCoordinator response
%7|1599561516.024|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change RequestPID -> WaitTransport
%7|1599561516.024|TXN|rdkafka#producer-2| [thrd:main]: Starting PID FSM timer: No broker available
%7|1599561516.025|COORD|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Transaction coordinator is now up
%7|1599561516.025|GETPID|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Acquiring ProducerId
%7|1599561516.025|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change WaitTransport -> WaitPID
%7|1599561516.027|GETPID|rdkafka#producer-2| [thrd:main]: Acquired PID{Id:0,Epoch:6}
%7|1599561516.027|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change WaitPID -> Assigned
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change WaitPID -> ReadyNotAcked
%7|1599561516.027|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: init_transactions
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change ReadyNotAcked -> Ready
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Ready -> InTransaction
%7|1599561516.027|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction (begin)
%7|1599561516.027|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change InTransaction -> BeginCommit
%7|1599561516.027|TXNCOMMIT|rdkafka#producer-2| [thrd:app]: Flushing 30 outstanding message(s) prior to commit
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [0] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.028|DRAIN|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [2] beginning partition drain: wait for outstanding requests to finish before producing to new leader
%7|1599561516.029|ADDPARTS|rdkafka#producer-2| [thrd:main]: TxnCoordinator/0: Adding partitions to transaction
%7|1599561516.031|ADDPARTS|rdkafka#producer-2| [thrd:main]: go_transaction_test [1] registered with transaction
%7|1599561516.031|NEWPID|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] changed PID{Invalid} -> PID{Id:0,Epoch:6} with base MsgId 1
%7|1599561516.031|RESETSEQ|rdkafka#producer-2| [thrd:localhost:9092/bootstrap]: go_transaction_test [1] resetting epoch base seq from 0 to 1
%7|1599561516.032|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction
%7|1599561516.033|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change BeginCommit -> CommittingTransaction
%7|1599561516.034|TXNCOMPLETE|rdkafka#producer-2| [thrd:main]: Transaction successfully committed
%7|1599561516.034|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change CommittingTransaction -> Ready
%7|1599561516.035|COMMIT|rdkafka#consumer-1| [thrd:main]: Deferring "manual" offset commit for 1 partition(s) in state query-coord: no coordinator available
%7|1599561517.026|JOIN|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": postponing join until up-to-date metadata is available
%7|1599561517.029|REJOIN|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example": subscription updated from metadata change: rejoining group
%7|1599561517.029|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state up (join-state init) without assignment: group rejoin
%7|1599561517.029|COMMIT|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Committing offsets for 1 partition(s): manual
%7|1599561517.031|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets before transaction: [go_transaction_test[1]@10]

%7|1599561517.032|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change Ready -> InTransaction
%7|1599561517.032|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: send_offsets_to_transaction
%7|1599561517.037|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction (begin)
%7|1599561517.037|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change InTransaction -> BeginCommit
%7|1599561517.037|TXNCOMMIT|rdkafka#producer-2| [thrd:app]: Flushing 0 outstanding message(s) prior to commit
%7|1599561517.037|TXNAPI|rdkafka#producer-2| [thrd:app]: Transactional API called: commit_transaction
%7|1599561517.037|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change BeginCommit -> CommittingTransaction
%7|1599561517.038|TXNCOMPLETE|rdkafka#producer-2| [thrd:main]: Transaction successfully committed
%7|1599561517.038|TXNSTATE|rdkafka#producer-2| [thrd:main]: Transaction state change CommittingTransaction -> Ready
%7|1599561517.039|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets after transaction: [go_transaction_test[1]@10]

%7|1599561517.140|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)

Consumer committed offsets after sleeping: [go_transaction_test[1]@20]

%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1599561517.235|IDEMPSTATE|rdkafka#producer-2| [thrd:main]: Idempotent producer state change Assigned -> Terminate
%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:main]: Destroy internal
%7|1599561517.235|DESTROY|rdkafka#producer-2| [thrd:main]: Removing all topics
%7|1599561517.235|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1599561517.235|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%7|1599561517.235|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "go_transaction_example" is rebalancing in state up (join-state init) without assignment: unsubscribe
%7|1599561517.235|LEAVE|rdkafka#consumer-1| [thrd:main]: localhost:9092/0: Leaving group
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:app]: Terminating instance (destroy flags none (0x0))
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Closing consumer
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Waiting for close events
%3|1599561517.239|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: 1/1 brokers are down: Local: All broker connections are down
%7|1599561517.239|CLOSE|rdkafka#consumer-1| [thrd:app]: Consumer closed
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:main]: Destroy internal
%7|1599561517.239|DESTROY|rdkafka#consumer-1| [thrd:main]: Removing all topics
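The timing can be read straight from the client log timestamps above. The sketch below does so with a hypothetical helper, `logMillis`, that assumes the `%7|<seconds>.<milliseconds>|TAG|...` layout seen in this output. It compares the TXNCOMPLETE line of the second transaction with the two OFFSET fetch lines that follow it. These are client-side log times, so they only approximate when the requests reached the broker.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// logMillis extracts the timestamp of a librdkafka debug line
// ("%7|<sec>.<msec>|TAG|...") as milliseconds since the Unix epoch.
func logMillis(line string) (int64, error) {
	fields := strings.SplitN(line, "|", 3)
	if len(fields) < 3 {
		return 0, fmt.Errorf("unexpected log line: %q", line)
	}
	parts := strings.SplitN(fields[1], ".", 2)
	if len(parts) != 2 || len(parts[1]) != 3 {
		return 0, fmt.Errorf("unexpected timestamp: %q", fields[1])
	}
	sec, err := strconv.ParseInt(parts[0], 10, 64)
	if err != nil {
		return 0, err
	}
	msec, err := strconv.ParseInt(parts[1], 10, 64)
	if err != nil {
		return 0, err
	}
	return sec*1000 + msec, nil
}

func main() {
	// Lines copied from the output above.
	lines := []string{
		`%7|1599561517.038|TXNCOMPLETE|rdkafka#producer-2| [thrd:main]: Transaction successfully committed`,
		`%7|1599561517.039|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)`,
		`%7|1599561517.140|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/0: Fetch committed offsets for 1/1 partition(s)`,
	}

	var ts []int64
	for _, l := range lines {
		ms, err := logMillis(l)
		if err != nil {
			panic(err)
		}
		ts = append(ts, ms)
	}

	fmt.Printf("first fetch:  %d ms after commit\n", ts[1]-ts[0])
	fmt.Printf("second fetch: %d ms after commit\n", ts[2]-ts[0])
}
```

By these timestamps the fetch that returned offset 10 was logged 1 ms after the transaction was reported as committed, and the fetch that returned offset 20 was logged 102 ms after it.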

The broker logs are quite verbose; the broker seems to ignore the LOG4J env config vars.

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version (LibraryVersion()):
  • Apache Kafka broker version:
  • Client configuration: ConfigMap{...}
  • Operating system:
  • Provide client logs (with "debug": ".." as necessary)
  • Provide broker log excerpts
  • Critical issue
@dtheodor
Contributor

@edenhill can you comment here? This issue is in librdkafka; it has nothing to do with the Go wrapper.


sneko commented Mar 19, 2021

@roignpar not sure that's your issue, but from what I remember, the transaction adds some "markers" as messages to the topic, and .Committed() won't reflect the truth.

Try to use .Position() with the right TopicPartition{} instead :)

Contributor

jliunyu commented Mar 18, 2022

Since this is an old version, do you still see this error with the latest version?


5 participants