diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java index 7a4126fedec64..70437d07dbee0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSet.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.utils; -import java.util.Iterator; import java.util.Map; import java.util.NavigableMap; import java.util.NavigableSet; @@ -29,8 +28,13 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.pulsar.common.util.collections.LongPairSet; +import org.roaringbitmap.PeekableIntIterator; import org.roaringbitmap.RoaringBitmap; +/** + * A concurrent set of pairs of longs. + * The right side of the value supports unsigned values up to 2^32. + */ public class ConcurrentBitmapSortedLongPairSet { private final NavigableMap map = new TreeMap<>(); @@ -139,10 +143,12 @@ public > void processItems(LongPairSet.LongPairFunction< lock.readLock().lock(); try { for (Map.Entry entry : map.entrySet()) { - Iterator iterator = entry.getValue().stream().iterator(); + PeekableIntIterator intIterator = entry.getValue().getIntIterator(); boolean continueProcessing = true; - while (continueProcessing && iterator.hasNext()) { - T item = longPairConverter.apply(entry.getKey(), iterator.next()); + while (continueProcessing && intIterator.hasNext()) { + // RoaringBitmap encodes values as unsigned 32-bit integers internally, it's necessary to use + // Integer.toUnsignedLong to convert them to unsigned long values + T item = longPairConverter.apply(entry.getKey(), Integer.toUnsignedLong(intIterator.next())); continueProcessing = itemProcessor.process(item); } if (!continueProcessing) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java index 5f8f13288cfe8..34f971e8841ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/utils/ConcurrentBitmapSortedLongPairSetTest.java @@ -18,18 +18,19 @@ */ package org.apache.pulsar.utils; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; -import lombok.Cleanup; -import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; -import org.testng.annotations.Test; import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import lombok.Cleanup; +import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet; +import org.testng.annotations.Test; @Test(groups = "utils") public class ConcurrentBitmapSortedLongPairSetTest { @@ -204,4 +205,27 @@ public void concurrentInsertions() throws Throwable { assertEquals(set.size(), N * nThreads); } + + @Test + public void testValueLargerThanIntegerMAX_VALUE() { + ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet(); + long baseValue = Integer.MAX_VALUE; + List addedValues = new ArrayList<>(); + int items = 10; + for (int i = 0; i < items; i++) { + long value = baseValue + i; + set.add(1, value); + addedValues.add(value); + } + assertEquals(set.size(), items); + List values = new ArrayList<>(); + set.processItems((item1, item2) -> { + assertEquals(item1, 1); + return item2; + }, (value) -> { + values.add(value); + return true; + }); + assertThat(values).containsExactlyElementsOf(addedValues); + } }