Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug: implement unsigned modulus for partitioning with crc32 hashing #2560

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"hash"
"hash/crc32"
"hash/fnv"
"math/rand"
"time"
Expand Down Expand Up @@ -53,6 +54,15 @@ func WithAbsFirst() HashPartitionerOption {
}
}

// WithHashUnsigned means the partitioner treats the hashed value as unsigned when
// partitioning. This is intended to be combined with the crc32 hash algorithm to
// be compatible with librdkafka's implementation
func WithHashUnsigned() HashPartitionerOption {
return func(hp *hashPartitioner) {
hp.hashUnsigned = true
}
}

// WithCustomHashFunction lets you specify what hash function to use for the partitioning
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
return func(hp *hashPartitioner) {
Expand Down Expand Up @@ -126,6 +136,7 @@ type hashPartitioner struct {
random Partitioner
hasher hash.Hash32
referenceAbs bool
hashUnsigned bool
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
Expand All @@ -137,6 +148,7 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
p.random = NewRandomPartitioner(topic)
p.hasher = hasher()
p.referenceAbs = false
p.hashUnsigned = false
return p
}
}
Expand All @@ -148,6 +160,7 @@ func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstruct
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
p.hashUnsigned = false
for _, option := range options {
option(p)
}
Expand All @@ -164,6 +177,7 @@ func NewHashPartitioner(topic string) Partitioner {
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
p.hashUnsigned = false
return p
}

Expand All @@ -176,6 +190,19 @@ func NewReferenceHashPartitioner(topic string) Partitioner {
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = true
p.hashUnsigned = false
return p
}

// NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash
// of the encoded bytes of the message key modulus the number of partitions. This is compatible with
// librdkafka's `consistent_random` partitioner
func NewConsistentCRCHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = crc32.NewIEEE()
p.referenceAbs = false
p.hashUnsigned = true
return p
}

Expand All @@ -199,6 +226,10 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
// but not past Sarama versions
if p.referenceAbs {
partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
} else if p.hashUnsigned {
// librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
// with librdkafka's `consistent` partitioning but not past Sarama versions
partition = int32(p.hasher.Sum32() % uint32(numPartitions))
} else {
partition = int32(p.hasher.Sum32()) % numPartitions
if partition < 0 {
Expand Down
79 changes: 79 additions & 0 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"crypto/rand"
"hash/crc32"
"hash/fnv"
"log"
"testing"
Expand All @@ -26,6 +27,28 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message
}
}

type partitionerTestCase struct {
key string
expectedPartition int32
}

func partitionAndAssert(t *testing.T, partitioner Partitioner, numPartitions int32, testCase partitionerTestCase) {
t.Run("partitionAndAssert "+testCase.key, func(t *testing.T) {
msg := &ProducerMessage{
Key: StringEncoder(testCase.key),
}

partition, err := partitioner.Partition(msg, numPartitions)

if err != nil {
t.Error(partitioner, err)
}
if partition != testCase.expectedPartition {
t.Error(partitioner, "partitioning", testCase.key, "returned partition", partition, "but expected", testCase.expectedPartition, ".")
}
})
}

func TestRandomPartitioner(t *testing.T) {
partitioner := NewRandomPartitioner("mytopic")

Expand Down Expand Up @@ -185,6 +208,62 @@ func TestHashPartitionerMinInt32(t *testing.T) {
}
}

func TestConsistentCRCHashPartitioner(t *testing.T) {
numPartitions := int32(100)
partitioner := NewConsistentCRCHashPartitioner("mytopic")

testCases := []partitionerTestCase{
{
key: "abc123def456",
expectedPartition: 57,
},
{
// `SheetJS` has a crc32 hash value of 2647669026 (which is -1647298270 as a signed int32)
// Modding the signed value will give a partition of 70. Modding the unsigned value will give 26
key: "SheetJS",
expectedPartition: 26,
},
{
key: "9e8c7f4cf45857cfff7645d6",
expectedPartition: 24,
},
{
key: "3900446192ff85a5f67da10c",
expectedPartition: 75,
},
{
key: "0f4407b7a67d6d27de372198",
expectedPartition: 50,
},
}

for _, tc := range testCases {
partitionAndAssert(t, partitioner, numPartitions, tc)
}
}

func TestCustomPartitionerWithConsistentHashing(t *testing.T) {
// Setting both `hashUnsigned` and the hash function to `crc32.NewIEEE` is equivalent to using `NewConsistentCRCHashPartitioner`
partitioner := NewCustomPartitioner(
WithHashUnsigned(),
WithCustomHashFunction(crc32.NewIEEE),
)("mytopic")

// See above re: why `SheetJS`
msg := ProducerMessage{
Key: StringEncoder("SheetJS"),
}

choice, err := partitioner.Partition(&msg, 100)
if err != nil {
t.Error(partitioner, err)
}
expectedPartition := int32(26)
if choice != expectedPartition {
t.Error(partitioner, "returned partition", choice, "but expected", expectedPartition, ".")
}
}

func TestManualPartitioner(t *testing.T) {
partitioner := NewManualPartitioner("mytopic")

Expand Down
Loading