Skip to content

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason918 committed Dec 6, 2021
1 parent 09d1322 commit 948fefd
Showing 1 changed file with 14 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,15 @@
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
Expand All @@ -65,22 +73,16 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.DefaultEventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -144,12 +146,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

@Test(groups = "broker")
public class PersistentTopicTest extends MockedBookKeeperTestCase {
protected PulsarService pulsar;
Expand Down Expand Up @@ -325,7 +321,7 @@ public void testPublishMessage() throws Exception {
* messageMetadata.setPublishTime(System.currentTimeMillis()); messageMetadata.setProducerName("producer-name");
* messageMetadata.setSequenceId(1);
*/
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());
ByteBuf payload = getMessageWithMetadata("content".getBytes());

final CountDownLatch latch = new CountDownLatch(1);

Expand All @@ -341,7 +337,7 @@ public void completed(Exception e, long ledgerId, long entryId) {
public void setMetadataFromEntryData(ByteBuf entryData) {
// This method must be invoked before `completed`
assertEquals(latch.getCount(), 1);
assertEquals(entryData.array(), payload.array());
assertEquals(entryData.compareTo(payload), 0);
}
};

Expand Down Expand Up @@ -399,7 +395,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
topic.publishMessage(getMessageWithMetadata(payload.array()), (exception, ledgerId, entryId) -> {
if (exception == null) {
fail("publish should have failed");
} else {
Expand Down

0 comments on commit 948fefd

Please sign in to comment.