Skip to content

Commit

Permalink
[improve][broker] Support values up to 2^32 in ConcurrentBitmapSorted…
Browse files Browse the repository at this point in the history
…LongPairSet (apache#23878)
  • Loading branch information
lhotari authored Jan 23, 2025
1 parent 6d8d73d commit 0f9f661
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.utils;

import java.util.Iterator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
Expand All @@ -29,8 +28,13 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.common.util.collections.LongPairSet;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.RoaringBitmap;

/**
* A concurrent set of pairs of longs.
* The right side of the value supports unsigned values up to 2^32.
*/
public class ConcurrentBitmapSortedLongPairSet {

private final NavigableMap<Long, RoaringBitmap> map = new TreeMap<>();
Expand Down Expand Up @@ -139,10 +143,12 @@ public <T extends Comparable<T>> void processItems(LongPairSet.LongPairFunction<
lock.readLock().lock();
try {
for (Map.Entry<Long, RoaringBitmap> entry : map.entrySet()) {
Iterator<Integer> iterator = entry.getValue().stream().iterator();
PeekableIntIterator intIterator = entry.getValue().getIntIterator();
boolean continueProcessing = true;
while (continueProcessing && iterator.hasNext()) {
T item = longPairConverter.apply(entry.getKey(), iterator.next());
while (continueProcessing && intIterator.hasNext()) {
// RoaringBitmap encodes values as unsigned 32-bit integers internally, it's necessary to use
// Integer.toUnsignedLong to convert them to unsigned long values
T item = longPairConverter.apply(entry.getKey(), Integer.toUnsignedLong(intIterator.next()));
continueProcessing = itemProcessor.process(item);
}
if (!continueProcessing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
*/
package org.apache.pulsar.utils;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import lombok.Cleanup;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import lombok.Cleanup;
import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
import org.testng.annotations.Test;

@Test(groups = "utils")
public class ConcurrentBitmapSortedLongPairSetTest {
Expand Down Expand Up @@ -204,4 +205,27 @@ public void concurrentInsertions() throws Throwable {

assertEquals(set.size(), N * nThreads);
}

@Test
public void testValueLargerThanIntegerMAX_VALUE() {
ConcurrentBitmapSortedLongPairSet set = new ConcurrentBitmapSortedLongPairSet();
long baseValue = Integer.MAX_VALUE;
List<Long> addedValues = new ArrayList<>();
int items = 10;
for (int i = 0; i < items; i++) {
long value = baseValue + i;
set.add(1, value);
addedValues.add(value);
}
assertEquals(set.size(), items);
List<Long> values = new ArrayList<>();
set.processItems((item1, item2) -> {
assertEquals(item1, 1);
return item2;
}, (value) -> {
values.add(value);
return true;
});
assertThat(values).containsExactlyElementsOf(addedValues);
}
}

0 comments on commit 0f9f661

Please sign in to comment.