diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java index 62a492775e7c6..99a81f7164356 100644 --- a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java @@ -30,8 +30,11 @@ import com.google.api.client.json.gson.GsonFactory; import java.io.IOException; import java.io.Serializable; +import java.io.UnsupportedEncodingException; import java.net.CookieManager; import java.net.HttpCookie; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.Objects; @@ -78,16 +81,20 @@ class SempBasicAuthClientExecutor implements Serializable { COOKIE_MANAGER_MAP.putIfAbsent(this.cookieManagerKey, new CookieManager()); } - private static String getQueueEndpoint(String messageVpn, String queueName) { - return String.format("/monitor/msgVpns/%s/queues/%s", messageVpn, queueName); + private static String getQueueEndpoint(String messageVpn, String queueName) + throws UnsupportedEncodingException { + return String.format( + "/monitor/msgVpns/%s/queues/%s", urlEncode(messageVpn), urlEncode(queueName)); } - private static String createQueueEndpoint(String messageVpn) { - return String.format("/config/msgVpns/%s/queues", messageVpn); + private static String createQueueEndpoint(String messageVpn) throws UnsupportedEncodingException { + return String.format("/config/msgVpns/%s/queues", urlEncode(messageVpn)); } - private static String subscriptionEndpoint(String messageVpn, String queueName) { - return String.format("/config/msgVpns/%s/queues/%s/subscriptions", messageVpn, queueName); + private static String subscriptionEndpoint(String messageVpn, String queueName) + throws UnsupportedEncodingException { + return String.format( + "/config/msgVpns/%s/queues/%s/subscriptions", urlEncode(messageVpn), urlEncode(queueName)); } BrokerResponse getQueueResponse(String queueName) throws IOException { @@ -188,6 +195,10 @@ private void storeCookiesInCookieManager(HttpHeaders headers) { } } + private static String urlEncode(String queueName) throws UnsupportedEncodingException { + return URLEncoder.encode(queueName, StandardCharsets.UTF_8.name()); + } + private static class CookieManagerKey implements Serializable { private final String baseUrl; private final String username; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java index 8cc48ed17ef6f..8e1d9f5260ac7 100644 --- a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java @@ -199,4 +199,85 @@ public LowLevelHttpResponse execute() throws IOException { // so there should be a third request with Basic Auth to create a new session. assertEquals(3, requestCounter[0]); } + + @Test + public void testGetQueueResponseEncoding() throws IOException { + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + assertTrue(url.contains("queues/queue%2Fxxx%2Fyyy")); + assertTrue(url.contains("msgVpns/vpnName%232")); + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName#2", requestFactory); + + client.getQueueResponse("queue/xxx/yyy"); + } + + @Test + public void testCreateQueueResponseEncoding() throws IOException { + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + assertTrue(this.getContentAsString().contains("\"queueName\":\"queue/xxx/yyy\"")); + assertTrue(url.contains("msgVpns/vpnName%232")); + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName#2", requestFactory); + + client.createQueueResponse("queue/xxx/yyy"); + } + + @Test + public void testCreateSubscriptionResponseEncoding() throws IOException { + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + assertTrue(this.getContentAsString().contains("\"queueName\":\"queue/xxx/yyy\"")); + assertTrue( + this.getContentAsString().contains("\"subscriptionTopic\":\"topic/aaa\"")); + assertTrue(url.contains("queues/queue%2Fxxx%2Fyyy/subscriptions")); + assertTrue(url.contains("msgVpns/vpnName%232")); + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName#2", requestFactory); + + client.createSubscriptionResponse("queue/xxx/yyy", "topic/aaa"); + } }