Skip to content

Commit

Permalink
[feat] [broker] PIP-188 Fix cluster migration state store into local …
Browse files Browse the repository at this point in the history
…namespace policies (#21363)

Co-authored-by: Rajan Dhabalia <rdhabalia@oath.com>
  • Loading branch information
rdhabalia and Rajan Dhabalia authored Oct 25, 2023
1 parent 8056987 commit 3a9f99f
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.EntryFilters;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -304,7 +305,9 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
// fetch bundles from LocalZK-policies
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
Optional<LocalPolicies> localPolicies = getLocalPolicies().getLocalPolicies(namespaceName);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
Expand All @@ -321,32 +324,31 @@ protected Policies getNamespacePolicies(NamespaceName namespaceName) {
}

protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
return namespaceResources().getPoliciesAsync(namespaceName).thenCompose(policies -> {
if (policies.isPresent()) {
return pulsar()
.getNamespaceService()
.getNamespaceBundleFactory()
.getBundlesAsync(namespaceName)
.thenCompose(bundles -> {
BundlesData bundleData = null;
try {
bundleData = bundles.getBundlesData();
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
return FutureUtil.failedFuture(new RestException(e));
}
policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
if (policies.get().is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.get().is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
CompletableFuture<Policies> result = new CompletableFuture<>();
namespaceResources().getPoliciesAsync(namespaceName)
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
if (pl.isPresent()) {
Policies policies = pl.get();
if (localPolicies.isPresent()) {
policies.bundles = localPolicies.get().bundles;
policies.migrated = localPolicies.get().migrated;
}
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return
// broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
result.complete(policies);
} else {
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
return CompletableFuture.completedFuture(policies.get());
return null;
}).exceptionally(ex -> {
result.completeExceptionally(ex.getCause());
return null;
});
} else {
return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
});
return result;
}

protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2646,8 +2646,8 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse, BacklogQu
protected void internalEnableMigration(boolean migrated) {
validateSuperUserAccess();
try {
updatePolicies(namespaceName, policies -> {
policies.isMigrated = migrated;
getLocalPolicies().setLocalPolicies(namespaceName, (policies) -> {
policies.migrated = migrated;
return policies;
});
log.info("Successfully updated migration on namespace {}", namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1384,9 +1384,9 @@ public static CompletableFuture<Optional<ClusterUrl>> getMigratedClusterUrlAsync
}

private static CompletableFuture<Boolean> isNamespaceMigrationEnabledAsync(PulsarService pulsar, String topic) {
return pulsar.getPulsarResources().getNamespaceResources().
getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(policies -> policies.isPresent() && policies.get().isMigrated);
return pulsar.getPulsarResources().getLocalPolicies()
.getLocalPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenApply(policies -> policies.isPresent() && policies.get().migrated);
}

public static Optional<ClusterUrl> getMigratedClusterUrl(PulsarService pulsar, String topic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,7 @@ public void testNamespaceMigrationWithReplicationBacklog(SubscriptionType subTyp
pulsar2.getBrokerServiceUrl(), pulsar2.getBrokerServiceUrlTls());
admin1.clusters().updateClusterMigration("r1", isClusterMigrate, migratedUrl);
admin1.namespaces().updateMigrationState(namespace, isNamespaceMigrate);
assertEquals(admin1.namespaces().getPolicies(namespace).migrated, isNamespaceMigrate);
log.info("update cluster migration called");

retryStrategically((test) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public class Policies {
@SuppressWarnings("checkstyle:MemberName")
public String resource_group_name = null;

public boolean isMigrated;
public boolean migrated;

public enum BundleType {
LARGEST, HOT;
Expand Down Expand Up @@ -158,7 +158,7 @@ public int hashCode() {
offload_policies,
subscription_types_enabled,
properties,
resource_group_name, entryFilters);
resource_group_name, entryFilters, migrated);
}

@Override
Expand Down Expand Up @@ -204,6 +204,7 @@ public boolean equals(Object obj) {
&& Objects.equals(offload_policies, other.offload_policies)
&& Objects.equals(subscription_types_enabled, other.subscription_types_enabled)
&& Objects.equals(properties, other.properties)
&& Objects.equals(migrated, other.migrated)
&& Objects.equals(resource_group_name, other.resource_group_name)
&& Objects.equals(entryFilters, other.entryFilters);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class LocalPolicies {
public final BookieAffinityGroupData bookieAffinityGroup;
// namespace anti-affinity-group
public final String namespaceAntiAffinityGroup;
public boolean migrated;

public LocalPolicies() {
bundles = defaultBundle();
Expand Down

0 comments on commit 3a9f99f

Please sign in to comment.