From 596d6995ccbd4e51fd86dd34c6019f88a6d8109e Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Sun, 4 Sep 2022 00:24:40 +0800 Subject: [PATCH 1/3] [improve][broker] Using `handle` instead of `handleAsync` to avoid using common pool thread (#17403) * Using `handle` instead of `handleAsync` to avoid using common pool thread. * fix deadlock. --- .../schema/BookkeeperSchemaStorage.java | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java index 88a94198f4ff3..8043d3527de06 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java @@ -198,9 +198,7 @@ private CompletableFuture getSchema(String schemaId) { if (log.isDebugEnabled()) { log.debug("[{}] Fetching schema from store", schemaId); } - CompletableFuture future = new CompletableFuture<>(); - - getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> { + return getSchemaLocator(getSchemaPath(schemaId)).thenCompose(locator -> { if (log.isDebugEnabled()) { log.debug("[{}] Got schema locator {}", schemaId, locator); } @@ -213,22 +211,12 @@ private CompletableFuture getSchema(String schemaId) { return readSchemaEntry(schemaLocator.getInfo().getPosition()) .thenApply(entry -> new StoredSchema(entry.getSchemaData().toByteArray(), new LongSchemaVersion(schemaLocator.getInfo().getVersion()))); - }).handleAsync((res, ex) -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex); - } - - // Cleanup the pending ops from the map - readSchemaOperations.remove(schemaId, future); - if (ex != null) { - future.completeExceptionally(ex); - } else { - future.complete(res); - } - return null; }); - - return future; + }).whenComplete((res, ex) -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Get operation completed. res={} -- ex={}", schemaId, res, ex); + } + readSchemaOperations.remove(schemaId); }); } From 0cbf7c35276a15ab632cbbfb4534329ff45fff98 Mon Sep 17 00:00:00 2001 From: leixin <1403342953@qq.com> Date: Sun, 4 Sep 2022 00:25:21 +0800 Subject: [PATCH 2/3] [doc] Update pulsar version in README (#17405) --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 47388cdf5574e..72e1e7c38b06a 100644 --- a/README.md +++ b/README.md @@ -97,7 +97,7 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la ## Pulsar Runtime Java Version Recommendation -- pulsar master branch +- pulsar ver > 2.10 and master branch | Pulsar Components | Java Version | | ----------------- | :-----------: | @@ -115,7 +115,7 @@ components in the Pulsar ecosystem, including connectors, adapters, and other la | CLI | 8 or 11 | | Java Client | 8 or 11 | -- pulsar ver 2.8 < +- pulsar ver < 2.8 | Pulsar Components | Java Version | | ----------------- | :----------: | From d139d884bcf38d8d9f2ff99bb355591819b85ef5 Mon Sep 17 00:00:00 2001 From: ken <1647023764@qq.com> Date: Sun, 4 Sep 2022 12:06:55 +0800 Subject: [PATCH 3/3] [improve] clean the empty topicAuthenticationMap in zk when revoke permission (#16815) Co-authored-by: fanjianye --- .../admin/impl/PersistentTopicsBase.java | 8 +++- .../AuthenticatedProducerConsumerTest.java | 41 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9d13c43ed3dbe..6fb839bbd3647 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -361,7 +361,13 @@ private CompletableFuture revokePermissionsAsync(String topicUri, String r } // Write the new policies to metadata store return namespaceResources().setPoliciesAsync(namespaceName, p -> { - p.auth_policies.getTopicAuthentication().get(topicUri).remove(role); + p.auth_policies.getTopicAuthentication().computeIfPresent(topicUri, (k, roles) -> { + roles.remove(role); + if (roles.isEmpty()) { + return null; + } + return roles; + }); return p; }).thenAccept(__ -> log.info("[{}] Successfully revoke access for role {} - topic {}", clientAppId(), role, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java index 5a4f5dd48774a..0279116821094 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthenticatedProducerConsumerTest.java @@ -466,4 +466,45 @@ public void testTlsTransportWithAnyAuth(Supplier url, Authentication aut @Cleanup Producer ignored = client.newProducer().topic(topicName).create(); } + + @Test + public void testCleanupEmptyTopicAuthenticationMap() throws Exception { + Map authParams = new HashMap<>(); + authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH); + authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH); + Authentication authTls = new AuthenticationTls(); + authTls.configure(authParams); + internalSetup(authTls); + + admin.clusters().createCluster("test", ClusterData.builder().build()); + admin.tenants().createTenant("p1", + new TenantInfoImpl(Collections.emptySet(), new HashSet<>(admin.clusters().getClusters()))); + admin.namespaces().createNamespace("p1/ns1"); + + String topic = "persistent://p1/ns1/topic"; + admin.topics().createNonPartitionedTopic(topic); + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + + // grant permission + admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume)); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + + // revoke permission + admin.topics().revokePermissions(topic, "test-user-1"); + Awaitility.await().untilAsserted(() -> { + assertFalse(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + + // grant permission again + admin.topics().grantPermission(topic, "test-user-1", EnumSet.of(AuthAction.consume)); + Awaitility.await().untilAsserted(() -> { + assertTrue(pulsar.getPulsarResources().getNamespaceResources().getPolicies(NamespaceName.get("p1/ns1")) + .get().auth_policies.getTopicAuthentication().containsKey(topic)); + }); + } }