Skip to content

Commit

Permalink
[cleanup][broker] Various cleanups (#20658)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Jun 30, 2023
1 parent e360379 commit 8d53035
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver;
import org.apache.pulsar.metadata.bookkeeper.PulsarMetadataClientDriver;

@SuppressWarnings("deprecation")
@Slf4j
public class BookKeeperClientFactoryImpl implements BookKeeperClientFactory {

Expand All @@ -71,7 +70,7 @@ public BookKeeper create(ServiceConfiguration conf, MetadataStoreExtended store,

ClientConfiguration bkConf = createBkClientConfiguration(store, conf);
if (properties != null) {
properties.forEach((key, value) -> bkConf.setProperty(key, value));
properties.forEach(bkConf::setProperty);
}
if (ensemblePlacementPolicyClass.isPresent()) {
setEnsemblePlacementPolicy(bkConf, conf, store, ensemblePlacementPolicyClass.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,15 @@ public void close() throws IOException {
// factory, however that might be introducing more unknowns.
log.warn("Encountered exceptions on closing bookkeeper client", ree);
}
if (bkEnsemblePolicyToBkClientMap != null) {
bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
}
} catch (Exception e) {
log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
bkEnsemblePolicyToBkClientMap.forEach((policy, bk) -> {
try {
if (bk != null) {
bk.close();
}
});
}
} catch (Exception e) {
log.warn("Failed to close bookkeeper-client for policy {}", policy, e);
}
});
log.info("Closed BookKeeper client");
} catch (Exception e) {
log.warn(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,12 @@
/**
* Main class for Pulsar broker service.
*/

@Getter(AccessLevel.PUBLIC)
@Setter(AccessLevel.PROTECTED)
public class PulsarService implements AutoCloseable, ShutdownService {
private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class);
private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d;
private ServiceConfiguration config = null;
private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private LeaderElectionService leaderElectionService = null;
Expand Down Expand Up @@ -255,7 +254,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private AdditionalServlets brokerAdditionalServlets;

// packages management service
private Optional<PackagesManagement> packagesManagement = Optional.empty();
private PackagesManagement packagesManagement = null;
private PulsarPrometheusMetricsServlet metricsServlet;
private List<PrometheusRawMetricsProvider> pendingMetricsProviders;

Expand Down Expand Up @@ -285,10 +284,8 @@ public enum State {
private Map<String, AdvertisedListener> advertisedListeners;

public PulsarService(ServiceConfiguration config) {
this(config, Optional.empty(), (exitCode) -> {
LOG.info("Process termination requested with code {}. "
+ "Ignoring, as this constructor is intended for tests. ", exitCode);
});
this(config, Optional.empty(), (exitCode) -> LOG.info("Process termination requested with code {}. "
+ "Ignoring, as this constructor is intended for tests. ", exitCode));
}

public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService,
Expand Down Expand Up @@ -370,7 +367,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro

/**
* Close the session to the metadata service.
*
* <p>
* This will immediately release all the resource locks held by this broker on the coordination service.
*
* @throws Exception if the close operation fails
Expand Down Expand Up @@ -400,8 +397,12 @@ public void close() throws PulsarServerException {
throw (PulsarServerException) cause;
} else if (getConfiguration().getBrokerShutdownTimeoutMs() == 0
&& (cause instanceof TimeoutException || cause instanceof CancellationException)) {
// ignore shutdown timeout when timeout is 0, which is primarily used in tests
// to forcefully shutdown the broker
if (LOG.isDebugEnabled()) {
LOG.debug(
"Shutdown timeout ignored when timeout is 0, "
+ "which is primarily used in tests to forcefully shutdown the broker",
cause);
}
} else {
throw new PulsarServerException(cause);
}
Expand Down Expand Up @@ -693,7 +694,7 @@ public void start() throws PulsarServerException {
throw new PulsarServerException("Cannot start the service once it was stopped");
}

if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) {
if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) {
throw new IllegalArgumentException("webServicePort/webServicePortTls must be present");
}

Expand Down Expand Up @@ -722,7 +723,7 @@ public void start() throws PulsarServerException {
config.getDefaultRetentionTimeInMinutes() * 60));
}

if (!config.getLoadBalancerOverrideBrokerNicSpeedGbps().isPresent()
if (config.getLoadBalancerOverrideBrokerNicSpeedGbps().isEmpty()
&& config.isLoadBalancerEnabled()
&& LinuxInfoUtils.isLinux()
&& !LinuxInfoUtils.checkHasNicSpeeds()) {
Expand Down Expand Up @@ -896,7 +897,7 @@ public void start() throws PulsarServerException {
if (isNotBlank(config.getResourceUsageTransportClassName())) {
Class<?> clazz = Class.forName(config.getResourceUsageTransportClassName());
Constructor<?> ctor = clazz.getConstructor(PulsarService.class);
Object object = ctor.newInstance(new Object[]{this});
Object object = ctor.newInstance(this);
this.resourceUsageTransportManager = (ResourceUsageTopicTransportManager) object;
}
this.resourceGroupServiceManager = new ResourceGroupService(this);
Expand Down Expand Up @@ -1241,7 +1242,6 @@ protected void startLoadManagementService() throws PulsarServerException {
* Load all the topics contained in a namespace.
*
* @param bundle <code>NamespaceBundle</code> to identify the service unit
* @throws Exception
*/
public void loadNamespaceTopics(NamespaceBundle bundle) {
executor.submit(() -> {
Expand Down Expand Up @@ -1296,7 +1296,7 @@ public InternalConfigurationData getInternalConfigurationData() {
config.getConfigurationMetadataStoreUrl(),
new ClientConfiguration().getZkLedgersRootPath(),
config.isBookkeeperMetadataStoreSeparated() ? config.getBookkeeperMetadataStoreUrl() : null,
this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
this.getWorkerConfig().map(WorkerConfig::getStateStorageServiceUrl).orElse(null));
}

/**
Expand Down Expand Up @@ -1411,7 +1411,7 @@ public LedgerOffloader createManagedLedgerOffloader(OffloadPoliciesImpl offloadP
Offloaders offloaders = offloadersCache.getOrLoadOffloaders(
offloadPolicies.getOffloadersDirectory(), config.getNarExtractionDirectory());

LedgerOffloaderFactory offloaderFactory = offloaders.getOffloaderFactory(
LedgerOffloaderFactory<?> offloaderFactory = offloaders.getOffloaderFactory(
offloadPolicies.getManagedLedgerOffloadDriver());
try {
return offloaderFactory.create(
Expand Down Expand Up @@ -1699,7 +1699,8 @@ public String webAddress(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http");
return internalListener.getBrokerHttpUrl() != null
? internalListener.getBrokerHttpUrl().toString()
: webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
: webAddress(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTP().orElseThrow());
} else {
return null;
}
Expand All @@ -1714,7 +1715,8 @@ public String webAddressTls(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https");
return internalListener.getBrokerHttpsUrl() != null
? internalListener.getBrokerHttpsUrl().toString()
: webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
: webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config),
getListenPortHTTPS().orElseThrow());
} else {
return null;
}
Expand All @@ -1736,7 +1738,7 @@ public String getSafeBrokerServiceUrl() {
public String getLookupServiceAddress() {
return String.format("%s:%s", advertisedAddress, config.getWebServicePort().isPresent()
? config.getWebServicePort().get()
: config.getWebServicePortTls().get());
: config.getWebServicePortTls().orElseThrow());
}

public TopicPoliciesService getTopicPoliciesService() {
Expand Down Expand Up @@ -1798,21 +1800,22 @@ private void startWorkerService(AuthenticationService authenticationService,
}

public PackagesManagement getPackagesManagement() throws UnsupportedOperationException {
return packagesManagement.orElseThrow(() -> new UnsupportedOperationException("Package Management Service "
+ "is not enabled in the broker."));
if (packagesManagement == null) {
throw new UnsupportedOperationException("Package Management Service is not enabled in the broker.");
}
return packagesManagement;
}

private void startPackagesManagementService() throws IOException {
// TODO: using provider to initialize the packages management service.
PackagesManagement packagesManagementService = new PackagesManagementImpl();
this.packagesManagement = Optional.of(packagesManagementService);
this.packagesManagement = new PackagesManagementImpl();
PackagesStorageProvider storageProvider = PackagesStorageProvider
.newProvider(config.getPackagesManagementStorageProvider());
DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration();
storageConfiguration.setProperty(config.getProperties());
PackagesStorage storage = storageProvider.getStorage(storageConfiguration);
storage.initialize();
packagesManagementService.initialize(storage);
this.packagesManagement.initialize(storage);
}

public Optional<Integer> getListenPortHTTP() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,13 @@ public class TransactionMetadataStoreService {
private final Timer transactionOpRetryTimer;
// this semaphore for loading one transaction coordinator with the same tc id on the same time
private final ConcurrentLongHashMap<Semaphore> tcLoadSemaphores;
// one connect request open the transactionMetaStore the other request will add to the queue, when the open op
// finished the request will be poll and complete the future
// one connect request opens the transactionMetaStore the other request will add to the queue, when the open op
// finishes the request will be polled and will complete the future
private final ConcurrentLongHashMap<ConcurrentLinkedDeque<CompletableFuture<Void>>> pendingConnectRequests;
private final ExecutorService internalPinnedExecutor;

private static final long HANDLE_PENDING_CONNECT_TIME_OUT = 30000L;

private final ThreadFactory threadFactory =
new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");


public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
PulsarService pulsarService, TransactionBufferClient tbClient,
Expand All @@ -108,6 +105,8 @@ public TransactionMetadataStoreService(TransactionMetadataStoreProvider transact
this.tcLoadSemaphores = ConcurrentLongHashMap.<Semaphore>newBuilder().build();
this.pendingConnectRequests =
ConcurrentLongHashMap.<ConcurrentLinkedDeque<CompletableFuture<Void>>>newBuilder().build();
ThreadFactory threadFactory =
new ExecutorProvider.ExtendedThreadFactory("transaction-coordinator-thread-factory");
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
}

Expand Down Expand Up @@ -200,7 +199,7 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
// then handle the requests witch in the queue
deque.add(completableFuture);
if (LOG.isDebugEnabled()) {
LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId.toString());
LOG.debug("Handle tc client connect added into pending queue! tcId : {}", tcId);
}
}
})).exceptionally(ex -> {
Expand Down Expand Up @@ -367,17 +366,11 @@ public void endTransaction(TxnID txnID, int txnAction, boolean isTimeout,

private CompletionStage<Void> fakeAsyncCheckTxnStatus(TxnStatus txnStatus, int txnAction,
TxnID txnID, TxnStatus expectStatus) {
boolean isLegal;
switch (txnStatus) {
case COMMITTING:
isLegal = (txnAction == TxnAction.COMMIT.getValue());
break;
case ABORTING:
isLegal = (txnAction == TxnAction.ABORT.getValue());
break;
default:
isLegal = false;
}
boolean isLegal = switch (txnStatus) {
case COMMITTING -> (txnAction == TxnAction.COMMIT.getValue());
case ABORTING -> (txnAction == TxnAction.ABORT.getValue());
default -> false;
};
if (!isLegal) {
if (LOG.isDebugEnabled()) {
LOG.debug("EndTxnInTransactionBuffer op retry! TxnId : {}, TxnAction : {}", txnID, txnAction);
Expand Down Expand Up @@ -502,15 +495,14 @@ public CompletableFuture<Boolean> verifyTxnOwnership(TxnID txnID, String checkOw

public void close () {
this.internalPinnedExecutor.shutdown();
stores.forEach((tcId, metadataStore) -> {
stores.forEach((tcId, metadataStore) ->
metadataStore.closeAsync().whenComplete((v, ex) -> {
if (ex != null) {
LOG.error("Close transaction metadata store with id " + tcId, ex);
} else {
LOG.info("Removed and closed transaction meta store {}", tcId);
}
});
});
}));
stores.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* LoadManager runs through set of load reports collected from different brokers and generates a recommendation of
* namespace/ServiceUnit placement on machines/ResourceUnit. Each Concrete Load Manager will use different algorithms to
* generate this mapping.
*
* <p>
* Concrete Load Manager is also return the least loaded broker that should own the new namespace.
*/
public interface LoadManager {
Expand Down Expand Up @@ -88,7 +88,7 @@ default CompletableFuture<Boolean> checkOwnershipAsync(Optional<ServiceUnitId> t

/**
* Publish the current load report on ZK, forced or not.
* By default rely on method writeLoadReportOnZookeeper().
* By default, rely on method writeLoadReportOnZookeeper().
*/
default void writeLoadReportOnZookeeper(boolean force) throws Exception {
writeLoadReportOnZookeeper();
Expand Down Expand Up @@ -118,15 +118,15 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception {
* Removes visibility of current broker from loadbalancer list so, other brokers can't redirect any request to this
* broker and this broker won't accept new connection requests.
*
* @throws Exception
* @throws Exception if there is any error while disabling broker
*/
void disableBroker() throws Exception;

/**
* Get list of available brokers in cluster.
*
* @return
* @throws Exception
* @return the list of available brokers
* @throws Exception if there is any error while getting available brokers
*/
Set<String> getAvailableBrokers() throws Exception;

Expand All @@ -150,12 +150,11 @@ static LoadManager create(final PulsarService pulsar) {
// Assume there is a constructor with one argument of PulsarService.
final Object loadManagerInstance = Reflections.createInstance(conf.getLoadManagerClassName(),
Thread.currentThread().getContextClassLoader());
if (loadManagerInstance instanceof LoadManager) {
final LoadManager casted = (LoadManager) loadManagerInstance;
if (loadManagerInstance instanceof LoadManager casted) {
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ModularLoadManager) {
final LoadManager casted = new ModularLoadManagerWrapper((ModularLoadManager) loadManagerInstance);
} else if (loadManagerInstance instanceof ModularLoadManager modularLoadManager) {
final LoadManager casted = new ModularLoadManagerWrapper(modularLoadManager);
casted.initialize(pulsar);
return casted;
} else if (loadManagerInstance instanceof ExtensibleLoadManager) {
Expand Down
Loading

0 comments on commit 8d53035

Please sign in to comment.