Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/pulsar into issue17354
Browse files Browse the repository at this point in the history
  • Loading branch information
Denovo1998 committed Sep 4, 2022
2 parents cf12237 + d139d88 commit 74f23b4
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 21 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la

## Pulsar Runtime Java Version Recommendation

- pulsar master branch
- pulsar ver > 2.10 and master branch

| Pulsar Components | Java Version |
| ----------------- | :-----------: |
Expand All @@ -115,7 +115,7 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la
| CLI | 8 or 11 |
| Java Client | 8 or 11 |

- pulsar ver 2.8 <
- pulsar ver < 2.8

| Pulsar Components | Java Version |
| ----------------- | :----------: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,13 @@ private CompletableFuture<Void> revokePermissionsAsync(String topicUri, String r
}
// Write the new policies to metadata store
return namespaceResources().setPoliciesAsync(namespaceName, p -> {
p.auth_policies.getTopicAuthentication().get(topicUri).remove(role);
p.auth_policies.getTopicAuthentication().computeIfPresent(topicUri, (k, roles) -> {
roles.remove(role);
if (roles.isEmpty()) {
return null;
}
return roles;
});
return p;
}).thenAccept(__ ->
log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,9 +198,7 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId) {
if (log.isDebugEnabled()) {
log.debug("[{}] Fetching schema from store", schemaId);
}
CompletableFuture<StoredSchema> future = new CompletableFuture<>();

getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Got schema locator {}", schemaId, locator);
}
Expand All @@ -213,22 +211,12 @@ private CompletableFuture<StoredSchema> getSchema(String schemaId) {
return readSchemaEntry(schemaLocator.getInfo().getPosition())
.thenApply(entry -> new StoredSchema(entry.getSchemaData().toByteArray(),
new LongSchemaVersion(schemaLocator.getInfo().getVersion())));
}).handleAsync((res, ex) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
}

// Cleanup the pending ops from the map
readSchemaOperations.remove(schemaId, future);
if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(res);
}
return null;
});

return future;
}).whenComplete((res, ex) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex);
}
readSchemaOperations.remove(schemaId);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,45 @@ public void testTlsTransportWithAnyAuth(Supplier<String> url, Authentication aut
@Cleanup
Producer<byte[]> ignored = client.newProducer().topic(topicName).create();
}

@Test
public void testCleanupEmptyTopicAuthenticationMap() throws Exception {
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
internalSetup(authTls);

admin.clusters().createCluster("test", ClusterData.builder().build());
admin.tenants().createTenant("p1",
new TenantInfoImpl(Collections.emptySet(), new HashSet<>(admin.clusters().getClusters())));
admin.namespaces().createNamespace("p1/ns1");

String topic = "persistent://p1/ns1/topic";
admin.topics().createNonPartitionedTopic(topic);
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication().containsKey(topic));

// grant permission
admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume));
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication().containsKey(topic));
});

// revoke permission
admin.topics().revokePermissions(topic, "test-user-1");
Awaitility.await().untilAsserted(() -> {
assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication().containsKey(topic));
});

// grant permission again
admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume));
Awaitility.await().untilAsserted(() -> {
assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1"))
.get().auth_policies.getTopicAuthentication().containsKey(topic));
});
}
}

0 comments on commit 74f23b4

Please sign in to comment.