diff --git a/partitioner.go b/partitioner.go index 57377760a7..50a345a3eb 100644 --- a/partitioner.go +++ b/partitioner.go @@ -2,6 +2,7 @@ package sarama import ( "hash" + "hash/crc32" "hash/fnv" "math/rand" "time" @@ -53,6 +54,15 @@ func WithAbsFirst() HashPartitionerOption { } } +// WithHashUnsigned means the partitioner treats the hashed value as unsigned when +// partitioning. This is intended to be combined with the crc32 hash algorithm to +// be compatible with librdkafka's implementation +func WithHashUnsigned() HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.hashUnsigned = true + } +} + // WithCustomHashFunction lets you specify what hash function to use for the partitioning func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { return func(hp *hashPartitioner) { @@ -126,6 +136,7 @@ type hashPartitioner struct { random Partitioner hasher hash.Hash32 referenceAbs bool + hashUnsigned bool } // NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher. @@ -137,6 +148,7 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor p.random = NewRandomPartitioner(topic) p.hasher = hasher() p.referenceAbs = false + p.hashUnsigned = false return p } } @@ -148,6 +160,7 @@ func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstruct p.random = NewRandomPartitioner(topic) p.hasher = fnv.New32a() p.referenceAbs = false + p.hashUnsigned = false for _, option := range options { option(p) } @@ -164,6 +177,7 @@ func NewHashPartitioner(topic string) Partitioner { p.random = NewRandomPartitioner(topic) p.hasher = fnv.New32a() p.referenceAbs = false + p.hashUnsigned = false return p } @@ -176,6 +190,19 @@ func NewReferenceHashPartitioner(topic string) Partitioner { p.random = NewRandomPartitioner(topic) p.hasher = fnv.New32a() p.referenceAbs = true + p.hashUnsigned = false + return p +} + +// NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash +// of the encoded bytes of the message key modulus the number of partitions. This is compatible with +// librdkafka's `consistent_random` partitioner +func NewConsistentCRCHashPartitioner(topic string) Partitioner { + p := new(hashPartitioner) + p.random = NewRandomPartitioner(topic) + p.hasher = crc32.NewIEEE() + p.referenceAbs = false + p.hashUnsigned = true return p } @@ -199,6 +226,10 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3 // but not past Sarama versions if p.referenceAbs { partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions + } else if p.hashUnsigned { + // librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible + // with librdkafka's `consistent` partitioning but not past Sarama versions + partition = int32(p.hasher.Sum32() % uint32(numPartitions)) } else { partition = int32(p.hasher.Sum32()) % numPartitions if partition < 0 { diff --git a/partitioner_test.go b/partitioner_test.go index 55d954765b..7d25db1d34 100644 --- a/partitioner_test.go +++ b/partitioner_test.go @@ -2,6 +2,7 @@ package sarama import ( "crypto/rand" + "hash/crc32" "hash/fnv" "log" "testing" @@ -26,6 +27,28 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message } } +type partitionerTestCase struct { + key string + expectedPartition int32 +} + +func partitionAndAssert(t *testing.T, partitioner Partitioner, numPartitions int32, testCase partitionerTestCase) { + t.Run("partitionAndAssert "+testCase.key, func(t *testing.T) { + msg := &ProducerMessage{ + Key: StringEncoder(testCase.key), + } + + partition, err := partitioner.Partition(msg, numPartitions) + + if err != nil { + t.Error(partitioner, err) + } + if partition != testCase.expectedPartition { + t.Error(partitioner, "partitioning", testCase.key, "returned partition", partition, "but expected", testCase.expectedPartition, ".") + } + }) +} + func TestRandomPartitioner(t *testing.T) { partitioner := NewRandomPartitioner("mytopic") @@ -185,6 +208,62 @@ func TestHashPartitionerMinInt32(t *testing.T) { } } +func TestConsistentCRCHashPartitioner(t *testing.T) { + numPartitions := int32(100) + partitioner := NewConsistentCRCHashPartitioner("mytopic") + + testCases := []partitionerTestCase{ + { + key: "abc123def456", + expectedPartition: 57, + }, + { + // `SheetJS` has a crc32 hash value of 2647669026 (which is -1647298270 as a signed int32) + // Modding the signed value will give a partition of 70. Modding the unsigned value will give 26 + key: "SheetJS", + expectedPartition: 26, + }, + { + key: "9e8c7f4cf45857cfff7645d6", + expectedPartition: 24, + }, + { + key: "3900446192ff85a5f67da10c", + expectedPartition: 75, + }, + { + key: "0f4407b7a67d6d27de372198", + expectedPartition: 50, + }, + } + + for _, tc := range testCases { + partitionAndAssert(t, partitioner, numPartitions, tc) + } +} + +func TestCustomPartitionerWithConsistentHashing(t *testing.T) { + // Setting both `hashUnsigned` and the hash function to `crc32.NewIEEE` is equivalent to using `NewConsistentCRCHashPartitioner` + partitioner := NewCustomPartitioner( + WithHashUnsigned(), + WithCustomHashFunction(crc32.NewIEEE), + )("mytopic") + + // See above re: why `SheetJS` + msg := ProducerMessage{ + Key: StringEncoder("SheetJS"), + } + + choice, err := partitioner.Partition(&msg, 100) + if err != nil { + t.Error(partitioner, err) + } + expectedPartition := int32(26) + if choice != expectedPartition { + t.Error(partitioner, "returned partition", choice, "but expected", expectedPartition, ".") + } +} + func TestManualPartitioner(t *testing.T) { partitioner := NewManualPartitioner("mytopic")