Skip to content

Commit

Permalink
Fix the compatibility issue with Pulsar 2.11.0 (streamnative#1734)
Browse files Browse the repository at this point in the history
### Motivation

apache/pulsar#17696 removes the
`powermock-reflect` dependency, which leads to a compilation error in
KoP.

### Modifications

Upgrade the Pulsar dependency to 2.11.0.0-rc5. Then replace
`Whitebox.invokeMethod` with `MethodUtils.invokeMethod`.

It also fixes the bug that null values are not handled well for
non-batched messages. This bug was exposed because of
[PIP-189](apache/pulsar#16605).
  • Loading branch information
BewareMyPower authored and eolivelli committed Feb 28, 2023
1 parent b1f88c7 commit 9b34d1c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,19 @@ public static DecodeResult decodePulsarEntryToKafkaRecords(final MessageMetadata
final long timestamp = (metadata.getEventTime() > 0)
? metadata.getEventTime()
: metadata.getPublishTime();
final ByteBuffer value = metadata.isNullValue() ? null : getNioBuffer(uncompressedPayload);
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
final Header[] headers = getHeadersFromMetadata(metadata.getPropertiesList());
builder.appendWithOffset(baseOffset,
timestamp,
getKeyByteBuffer(metadata),
getNioBuffer(uncompressedPayload),
value,
headers);
} else {
builder.appendWithOffset(baseOffset,
timestamp,
getKeyByteBuffer(metadata),
getNioBuffer(uncompressedPayload));
value);
}
}

Expand Down
6 changes: 5 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
<lombok.version>1.18.24</lombok.version>
<mockito.version>2.22.0</mockito.version>
<pulsar.group.id>io.streamnative</pulsar.group.id>
<pulsar.version>2.11.0.0-rc3</pulsar.version>
<pulsar.version>2.11.0.0-rc5</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
<spotbugs-annotations.version>3.1.12</spotbugs-annotations.version>
<apicurio.version>2.1.3.Final</apicurio.version>
Expand Down Expand Up @@ -492,6 +492,10 @@
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>ossrh01</id>
<url>https://s01.oss.sonatype.org/service/local/repositories/iostreamnative-2022/content</url>
</repository>
</repositories>

<distributionManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.reflect.MethodUtils;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.awaitility.Awaitility;
import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -187,15 +187,15 @@ public void testGetProducerIdWithTTL() throws Exception {

PersistentTopicInternalStats stats = pulsar.getAdminClient().topics().getInternalStats(topic);
log.info("stats {}", stats);
Whitebox.invokeMethod(pulsar.getBrokerService(), "checkConsumedLedgers");
MethodUtils.invokeMethod(pulsar.getBrokerService(), true, "checkConsumedLedgers");

// wait for topic to be automatically trimmed
Awaitility
.await()
.pollInterval(5, TimeUnit.SECONDS)
.untilAsserted(
() -> {
Whitebox.invokeMethod(pulsar.getBrokerService(), "checkConsumedLedgers");
MethodUtils.invokeMethod(pulsar.getBrokerService(), true, "checkConsumedLedgers");
PersistentTopicInternalStats stats2 = pulsar.getAdminClient().topics().getInternalStats(topic);
log.info("stats2 {}", stats2);
assertEquals(0, stats2.numberOfEntries);
Expand Down

0 comments on commit 9b34d1c

Please sign in to comment.