Partition definition compatibility with librdkafka #1213

Closed
DeathKR opened this issue Nov 14, 2018 · 2 comments
Labels
maintenance question stale Issues and pull requests without any recent activity


DeathKR commented Nov 14, 2018

Versions

Sarama Version: All
Kafka Version: All
Go Version: All

Problem Description

We use Sarama for Go and Confluentinc/confluent-kafka-dotnet (bindings to librdkafka) for C#, and we see a difference in how the two assign partitions. librdkafka uses CRC32, so I wrote a test around Sarama's CustomHashPartitioner with a CRC32 hasher:

```go
package main

import (
	"fmt"
	"hash"
	"hash/crc32"

	"github.com/Shopify/sarama"
)

func main() {
	crc32q := crc32.MakeTable(crc32.IEEE)

	// Hasher factory handed to Sarama: CRC32 (IEEE polynomial).
	hasher := func() hash.Hash32 {
		return crc32.New(crc32q)
	}

	// Key -> partition observed from librdkafka for a 9-partition topic.
	dict := map[string]int{
		"593b1335-381e-4281-b339-6c85cf06266a": 5,
		"53aaab45-67b3-468a-a9a7-b74c46868bd1": 0,
		"f63d51ca-e9c9-4703-987b-8d83c1efdf20": 7,
		"d4279299-34c6-40a4-8796-ac6ece9385cb": 8,
		"66b91fdb-f1a0-458b-a8c6-cde77465c4f3": 4,
		"d4eeeb1b-8c45-4250-9c2c-131b38208ee8": 5,
	}

	partitioner := sarama.NewCustomHashPartitioner(hasher)("topic")
	for key, v := range dict {
		msg := sarama.ProducerMessage{}
		msg.Key = sarama.StringEncoder(key)

		// Reference value: unsigned checksum modulo the partition count.
		crc := crc32.Checksum([]byte(key), crc32q)
		resu := crc % 9

		choice, _ := partitioner.Partition(&msg, 9)
		fmt.Println("[key]:", key, "[librdkafka]:", v, "[sarama]: ", choice, "[checksum % num]:", resu, crc)
	}
}
```

Output:

[key]: 66b91fdb-f1a0-458b-a8c6-cde77465c4f3 [librdkafka]: 4 [sarama]: 4 [checksum % num]: 4 280646581
[key]: d4eeeb1b-8c45-4250-9c2c-131b38208ee8 [librdkafka]: 5 [sarama]: 8 [checksum % num]: 5 2542580951
[key]: 593b1335-381e-4281-b339-6c85cf06266a [librdkafka]: 5 [sarama]: 5 [checksum % num]: 5 482296334
[key]: 53aaab45-67b3-468a-a9a7-b74c46868bd1 [librdkafka]: 0 [sarama]: 0 [checksum % num]: 0 567591138
[key]: f63d51ca-e9c9-4703-987b-8d83c1efdf20 [librdkafka]: 7 [sarama]: 6 [checksum % num]: 7 2826634219
[key]: d4279299-34c6-40a4-8796-ac6ece9385cb [librdkafka]: 8 [sarama]: 5 [checksum % num]: 8 3979728134

As we can see, Sarama picks a different partition than librdkafka for some keys, while 'crc32.Checksum % num' matches librdkafka for every key. The problem is in partitioner.go, on this line:

partition = int32(p.hasher.Sum32()) % numPartitions

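Sarama converts the hash to a signed integer before taking the modulus, so any checksum of 2^31 or above becomes negative and yields the wrong partition. If we change the line as follows, the results match: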

partition = int32(uint32(p.hasher.Sum32()) % uint32(numPartitions))
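To make the difference concrete, here is a minimal sketch that applies both formulas to four of the checksums printed in the output above. The function names `signedPartition` and `unsignedPartition` are hypothetical, and the sign flip after the signed modulus is an assumption inferred from the non-negative [sarama] values in that output, not something quoted in this issue.

```go
package main

import "fmt"

// signedPartition mirrors the line quoted from partitioner.go: the hash is
// reinterpreted as int32 before the modulus. The sign flip afterwards is an
// assumption, inferred from the non-negative [sarama] values in the output.
func signedPartition(sum uint32, numPartitions int32) int32 {
	p := int32(sum) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

// unsignedPartition takes the modulus of the unsigned hash, as in the
// proposed fix.
func unsignedPartition(sum uint32, numPartitions int32) int32 {
	return int32(sum % uint32(numPartitions))
}

func main() {
	// CRC32 values copied from the output in this issue.
	sums := []uint32{280646581, 2542580951, 2826634219, 3979728134}
	for _, s := range sums {
		fmt.Printf("crc=%d signed=%d unsigned=%d\n",
			s, signedPartition(s, 9), unsignedPartition(s, 9))
	}
}
```

Only checksums below 2^31 (such as 280646581) give the same partition under both formulas; the other three diverge exactly as in the [sarama] and [checksum % num] columns.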

I know the community will not thank me for this, because the fix would seriously affect running systems (messages would go to different partitions than before). Maybe we should add a new partitioner function and leave the existing one unchanged for compatibility?


jgeiger commented Mar 26, 2019

The forced conversion of p.hasher.Sum32() to an int32 breaks compatibility with other Kafka libraries, including https://github.com/zendesk/ruby-kafka, which uses a crc32 key hasher.

gore> :import "hash/crc32"

// simple calculation
gore> z := crc32.ChecksumIEEE([]byte("01550EA4"))
(uint32)2636234337

// using the method in the library
gore> p := crc32.NewIEEE()
gore> n, _ := p.Write([]byte("01550EA4"))
gore> p.Sum32()
(uint32)2636234337

// comparing the results when modded, including the forced cast
gore> zz := crc32.ChecksumIEEE([]byte("01550EA4"))
(uint32)2636234337
gore> int32(zz) % 840
(int32)-79
gore> zz % 840
(uint32)177

The uint32 values match, but forcing the cast to int32 before the modulus breaks the result. As above, I'm not sure what the path forward is, because there are probably people relying on the broken implementation.


ghost commented Feb 21, 2020

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

This issue was closed.