Skip to content

Commit

Permalink
[improve][broker] Support customized shadow managed ledger implementa…
Browse files Browse the repository at this point in the history
…tion (#23179)
  • Loading branch information
BewareMyPower authored Aug 16, 2024
1 parent a1f3322 commit 7f04364
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.mledger.util.ManagedLedgerImplUtils;
import org.apache.bookkeeper.net.BookieId;
Expand Down Expand Up @@ -426,7 +425,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
this.transactionBuffer = new TransactionBufferDisable(this);
}
transactionBuffer.syncMaxReadPositionForNormalPublish(ledger.getLastConfirmedEntry(), true);
if (ledger instanceof ShadowManagedLedgerImpl) {
if (ledger.getConfig().getShadowSource() != null) {
shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource());
} else {
shadowSourceTopic = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -198,6 +199,7 @@ public void testSplitMapWithRefreshedStatMap() throws Exception {

ManagedLedger ledger = mock(ManagedLedger.class);
when(ledger.getCursors()).thenReturn(new ArrayList<>());
when(ledger.getConfig()).thenReturn(new ManagedLedgerConfig());

doReturn(CompletableFuture.completedFuture(null)).when(MockOwnershipCache).disableOwnership(any(NamespaceBundle.class));
Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
Expand Down Expand Up @@ -74,7 +75,9 @@ public void setup() throws Exception {
.when(serverCnx).getCommandSender();

String topicName = TopicName.get("MessageCumulativeAckTest").toString();
PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), pulsarTestContext.getBrokerService());
var mockManagedLedger = mock(ManagedLedger.class);
when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
var persistentTopic = new PersistentTopic(topicName, mockManagedLedger, pulsarTestContext.getBrokerService());
sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
mock(ManagedCursorImpl.class), false));
doNothing().when(sub).acknowledgeMessage(any(), any(), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ void setupMLAsyncCallbackMocks() {
cursorMock = mock(ManagedCursorImpl.class);

doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn("mockCursor").when(cursorMock).getName();

// call openLedgerComplete with ledgerMock on ML factory asyncOpen
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ public void teardown() throws Exception {
@Test
public void testCreateTopic() {
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();

final String topicName = "persistent://prop/use/ns-abc/topic1";
Expand Down Expand Up @@ -366,6 +367,7 @@ public void testPublishMessageMLFailure() throws Exception {
final String successTopicName = "persistent://prop/use/ns-abc/successTopic";

final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
Expand Down Expand Up @@ -1374,6 +1376,7 @@ void setupMLAsyncCallbackMocks() {
final CompletableFuture<Void> closeFuture = new CompletableFuture<>();

doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn("mockCursor").when(cursorMock).getName();
doReturn(true).when(cursorMock).isDurable();
// doNothing().when(cursorMock).asyncClose(new CloseCallback() {
Expand Down Expand Up @@ -1671,6 +1674,7 @@ public void testAtomicReplicationRemoval() throws Exception {
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
Expand Down Expand Up @@ -1730,6 +1734,7 @@ public void testClosingReplicationProducerTwice() throws Exception {
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(any(), any(), any());
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);

Expand Down Expand Up @@ -2120,6 +2125,7 @@ public void testTopicCloseFencingTimeout() throws Exception {
@Test
public void testGetDurableSubscription() throws Exception {
ManagedLedger mockLedger = mock(ManagedLedger.class);
doReturn(new ManagedLedgerConfig()).when(mockLedger).getConfig();
ManagedCursor mockCursor = mock(ManagedCursorImpl.class);
Position mockPosition = mock(Position.class);
doReturn("test").when(mockCursor).getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2919,6 +2919,7 @@ private void setupMLAsyncCallbackMocks() {
ledgerMock = mock(ManagedLedger.class);
cursorMock = mock(ManagedCursor.class);
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();

// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer((Answer<Object>) invocationOnMock -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import static org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getEventKey;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.Sets;
import java.util.HashSet;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
Expand Down Expand Up @@ -99,7 +102,9 @@ public void testSystemTopicSchemaCompatibility() throws Exception {
TopicPoliciesSystemTopicClient systemTopicClientForNamespace1 = systemTopicFactory
.createTopicPoliciesSystemTopicClient(NamespaceName.get(NAMESPACE1));
String topicName = systemTopicClientForNamespace1.getTopicName().toString();
SystemTopic topic = new SystemTopic(topicName, mock(ManagedLedger.class), pulsar.getBrokerService());
final var mockManagedLedger = mock(ManagedLedger.class);
when(mockManagedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
SystemTopic topic = new SystemTopic(topicName, mockManagedLedger, pulsar.getBrokerService());

Assert.assertEquals(SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE, topic.getSchemaCompatibilityStrategy());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.bookkeeper.common.util.Bytes;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -1648,6 +1649,7 @@ public void testTBRecoverChangeStateError() throws InterruptedException, Timeout
// Mock managedLedger.
ManagedLedgerImpl managedLedger = mock(ManagedLedgerImpl.class);
ManagedCursorContainer managedCursors = new ManagedCursorContainer();
when(managedLedger.getConfig()).thenReturn(new ManagedLedgerConfig());
when(managedLedger.getCursors()).thenReturn(managedCursors);
Position position = PositionFactory.EARLIEST;
when(managedLedger.getLastConfirmedEntry()).thenReturn(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public boolean isTerminated() {

@Override
public ManagedLedgerConfig getConfig() {
return null;
return new ManagedLedgerConfig();
}

@Override
Expand Down

0 comments on commit 7f04364

Please sign in to comment.