From 601d4b2d28ddab54c64a8b8e7dd782df66ee5b86 Mon Sep 17 00:00:00 2001 From: Bartosz Zablocki Date: Mon, 24 Jun 2024 20:04:46 +0200 Subject: [PATCH] Adding implementations of SempClient and SempClientFactory --- sdks/java/io/solace/build.gradle | 4 + .../io/solace/broker/BasicAuthSempClient.java | 102 +++++++++ .../broker/BasicAuthSempClientFactory.java | 84 ++++++++ .../sdk/io/solace/broker/BrokerResponse.java | 62 ++++++ .../broker/SempBasicAuthClientExecutor.java | 171 +++++++++++++++ .../apache/beam/sdk/io/solace/data/Semp.java | 74 +++++++ .../io/solace/utils/SerializableSupplier.java | 25 +++ .../sdk/io/solace/utils/package-info.java | 20 ++ .../SempBasicAuthClientExecutorTest.java | 202 ++++++++++++++++++ 9 files changed, 744 insertions(+) create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java create mode 100644 sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java create mode 100644 sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java diff --git a/sdks/java/io/solace/build.gradle b/sdks/java/io/solace/build.gradle index 8ee12f22ed03c..e612e6969dc55 100644 --- a/sdks/java/io/solace/build.gradle +++ b/sdks/java/io/solace/build.gradle @@ -44,6 +44,10 @@ dependencies { implementation library.java.google_api_common implementation library.java.gax implementation library.java.threetenbp + implementation library.java.google_http_client + implementation library.java.google_http_client_gson + implementation library.java.jackson_core + implementation library.java.jackson_databind testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration") diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java new file mode 100644 index 0000000000000..1e1b1c91cd12f --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClient.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.api.client.http.HttpRequestFactory; +import com.solacesystems.jcsmp.JCSMPFactory; +import java.io.IOException; +import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.io.solace.data.Semp.Queue; +import org.apache.beam.sdk.io.solace.utils.SerializableSupplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class that manages REST calls to the Solace Element Management Protocol (SEMP) using basic + * authentication. + * + *

This class provides methods to check necessary information, such as if the queue is + * non-exclusive, remaining backlog bytes of a queue. It can also create and execute calls to create + * queue for a topic. + */ +@Internal +public class BasicAuthSempClient implements SempClient { + private static final Logger LOG = LoggerFactory.getLogger(BasicAuthSempClient.class); + private final ObjectMapper objectMapper = + new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + + private final SempBasicAuthClientExecutor sempBasicAuthClientExecutor; + + public BasicAuthSempClient( + String host, + String username, + String password, + String vpnName, + SerializableSupplier httpRequestFactorySupplier) { + sempBasicAuthClientExecutor = + new SempBasicAuthClientExecutor( + host, username, password, vpnName, httpRequestFactorySupplier.get()); + } + + @Override + public boolean isQueueNonExclusive(String queueName) throws IOException { + LOG.info("SolaceIO.Read: SempOperations: query SEMP if queue {} is nonExclusive", queueName); + BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); + if (response.content == null) { + throw new IOException("SolaceIO: response from SEMP is empty!"); + } + Queue q = mapJsonToClass(response.content, Queue.class); + return q.data().accessType().equals("non-exclusive"); + } + + @Override + public com.solacesystems.jcsmp.Queue createQueueForTopic(String queueName, String topicName) + throws IOException { + createQueue(queueName); + createSubscription(queueName, topicName); + return JCSMPFactory.onlyInstance().createQueue(queueName); + } + + @Override + public long getBacklogBytes(String queueName) throws IOException { + BrokerResponse response = sempBasicAuthClientExecutor.getQueueResponse(queueName); + if (response.content == null) { + throw new IOException("SolaceIO: response from SEMP is empty!"); + } + Queue q = mapJsonToClass(response.content, Queue.class); + return q.data().msgSpoolUsage(); + } + + private void createQueue(String queueName) throws IOException { + LOG.info("SolaceIO.Read: Creating new queue {}.", queueName); + sempBasicAuthClientExecutor.createQueueResponse(queueName); + } + + private void createSubscription(String queueName, String topicName) throws IOException { + LOG.info("SolaceIO.Read: Creating new subscription {} for topic {}.", queueName, topicName); + sempBasicAuthClientExecutor.createSubscriptionResponse(queueName, topicName); + } + + private T mapJsonToClass(String content, Class mapSuccessToClass) + throws JsonProcessingException { + return objectMapper.readValue(content, mapSuccessToClass); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java new file mode 100644 index 0000000000000..168c5164ba471 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthSempClientFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.javanet.NetHttpTransport; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.io.solace.utils.SerializableSupplier; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A factory for creating {@link BasicAuthSempClient} instances. + * + *

This factory provides a way to create {@link BasicAuthSempClient} instances with different + * configurations. + */ +@AutoValue +public abstract class BasicAuthSempClientFactory implements SempClientFactory { + + public abstract String host(); + + public abstract String username(); + + public abstract String password(); + + public abstract String vpnName(); + + public abstract @Nullable SerializableSupplier httpRequestFactorySupplier(); + + public static Builder builder() { + return new AutoValue_BasicAuthSempClientFactory.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + /** Set Solace SEMP host, format: [Protocol://]Host[:Port]. e.g. "http://127.0.0.1:8080" */ + public abstract Builder host(String host); + + /** Set Solace username. */ + public abstract Builder username(String username); + /** Set Solace password. */ + public abstract Builder password(String password); + + /** Set Solace vpn name. */ + public abstract Builder vpnName(String vpnName); + + @VisibleForTesting + abstract Builder httpRequestFactorySupplier( + SerializableSupplier httpRequestFactorySupplier); + + public abstract BasicAuthSempClientFactory build(); + } + + @Override + public SempClient create() { + return new BasicAuthSempClient( + host(), username(), password(), vpnName(), getHttpRequestFactorySupplier()); + } + + @SuppressWarnings("return") + private @NonNull SerializableSupplier getHttpRequestFactorySupplier() { + SerializableSupplier httpRequestSupplier = httpRequestFactorySupplier(); + return httpRequestSupplier != null + ? httpRequestSupplier + : () -> new NetHttpTransport().createRequestFactory(); + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java new file mode 100644 index 0000000000000..1a47f8012285a --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BrokerResponse.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.google.api.client.http.HttpResponse; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; +import org.checkerframework.checker.nullness.qual.Nullable; + +public class BrokerResponse { + final int code; + final String message; + @Nullable String content; + + public BrokerResponse(int responseCode, String message, @Nullable InputStream content) { + this.code = responseCode; + this.message = message; + if (content != null) { + this.content = + new BufferedReader(new InputStreamReader(content, StandardCharsets.UTF_8)) + .lines() + .collect(Collectors.joining("\n")); + } + } + + public static BrokerResponse fromHttpResponse(HttpResponse response) throws IOException { + return new BrokerResponse( + response.getStatusCode(), response.getStatusMessage(), response.getContent()); + } + + @Override + public String toString() { + return "BrokerResponse{" + + "code=" + + code + + ", message='" + + message + + '\'' + + ", content=" + + content + + '}'; + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java new file mode 100644 index 0000000000000..23f4d64015267 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutor.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import com.google.api.client.http.GenericUrl; +import com.google.api.client.http.HttpContent; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpRequest; +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponse; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.json.JsonHttpContent; +import com.google.api.client.json.gson.GsonFactory; +import java.io.IOException; +import java.io.Serializable; +import java.net.CookieManager; +import java.net.HttpCookie; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * A class to execute requests to SEMP v2 with Basic Auth authentication. + * + *

This approach takes advantage of SEMP Sessions. The + * session is established when a user authenticates with HTTP Basic authentication. When the + * response is 401 Unauthorized, the client will execute an additional request with Basic Auth + * header to refresh the token. + */ +class SempBasicAuthClientExecutor implements Serializable { + private static final CookieManager COOKIE_MANAGER = new CookieManager(); + private static final String COOKIES_HEADER = "Set-Cookie"; + + private final String username; + private final String messageVpn; + private final String baseUrl; + private final String password; + private final transient HttpRequestFactory requestFactory; + + SempBasicAuthClientExecutor( + String host, + String username, + String password, + String vpnName, + HttpRequestFactory httpRequestFactory) { + this.baseUrl = String.format("%s/SEMP/v2", host); + this.username = username; + this.messageVpn = vpnName; + this.password = password; + this.requestFactory = httpRequestFactory; + } + + private static String getQueueEndpoint(String messageVpn, String queueName) { + return String.format("/monitor/msgVpns/%s/queues/%s", messageVpn, queueName); + } + + private static String createQueueEndpoint(String messageVpn) { + return String.format("/config/msgVpns/%s/queues", messageVpn); + } + + private static String subscriptionEndpoint(String messageVpn, String queueName) { + return String.format("/config/msgVpns/%s/queues/%s/subscriptions", messageVpn, queueName); + } + + BrokerResponse getQueueResponse(String queueName) throws IOException { + String queryUrl = getQueueEndpoint(messageVpn, queueName); + HttpResponse response = executeGet(new GenericUrl(baseUrl + queryUrl)); + return BrokerResponse.fromHttpResponse(response); + } + + BrokerResponse createQueueResponse(String queueName) throws IOException { + String queryUrl = createQueueEndpoint(messageVpn); + ImmutableMap params = + ImmutableMap.builder() + .put("accessType", "non-exclusive") + .put("queueName", queueName) + .put("owner", username) + .put("permission", "consume") + .put("ingressEnabled", true) + .put("egressEnabled", true) + .build(); + + HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params); + return BrokerResponse.fromHttpResponse(response); + } + + BrokerResponse createSubscriptionResponse(String queueName, String topicName) throws IOException { + String queryUrl = subscriptionEndpoint(messageVpn, queueName); + + ImmutableMap params = + ImmutableMap.builder() + .put("subscriptionTopic", topicName) + .put("queueName", queueName) + .build(); + HttpResponse response = executePost(new GenericUrl(baseUrl + queryUrl), params); + return BrokerResponse.fromHttpResponse(response); + } + + private HttpResponse executeGet(GenericUrl url) throws IOException { + HttpRequest request = requestFactory.buildGetRequest(url); + return execute(request); + } + + private HttpResponse executePost(GenericUrl url, ImmutableMap parameters) + throws IOException { + HttpContent content = new JsonHttpContent(GsonFactory.getDefaultInstance(), parameters); + HttpRequest request = requestFactory.buildPostRequest(url, content); + return execute(request); + } + + private HttpResponse execute(HttpRequest request) throws IOException { + request.setNumberOfRetries(2); + HttpHeaders httpHeaders = new HttpHeaders(); + boolean authFromCookie = COOKIE_MANAGER.getCookieStore().getCookies().size() > 0; + if (authFromCookie) { + setCookiesFromCookieManager(httpHeaders); + request.setHeaders(httpHeaders); + } else { + httpHeaders.setBasicAuthentication(username, password); + request.setHeaders(httpHeaders); + } + + HttpResponse response; + try { + response = request.execute(); + } catch (HttpResponseException e) { + if (authFromCookie && e.getStatusCode() == 401) { + COOKIE_MANAGER.getCookieStore().removeAll(); + // execute again without cookies to refresh the token. + return execute(request); + } else { + throw e; + } + } + + storeCookiesInCookieManager(response.getHeaders()); + return response; + } + + private void setCookiesFromCookieManager(HttpHeaders httpHeaders) { + httpHeaders.setCookie( + COOKIE_MANAGER.getCookieStore().getCookies().stream() + .map(s -> s.getName() + "=" + s.getValue()) + .collect(Collectors.joining(";"))); + } + + private void storeCookiesInCookieManager(HttpHeaders headers) { + List cookiesHeader = headers.getHeaderStringValues(COOKIES_HEADER); + if (cookiesHeader != null) { + for (String cookie : cookiesHeader) { + COOKIE_MANAGER.getCookieStore().add(null, HttpCookie.parse(cookie).get(0)); + } + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java new file mode 100644 index 0000000000000..f6f0fb51d22eb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/data/Semp.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.data; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import com.google.auto.value.AutoValue; + +public class Semp { + + @AutoValue + @JsonSerialize(as = Queue.class) + @JsonDeserialize(builder = AutoValue_Semp_Queue.Builder.class) + public abstract static class Queue { + + public abstract QueueData data(); + + public static Builder builder() { + return new AutoValue_Semp_Queue.Builder(); + } + + public abstract Builder toBuilder(); + + @AutoValue.Builder + @JsonPOJOBuilder(withPrefix = "set") + abstract static class Builder { + + public abstract Builder setData(QueueData queueData); + + public abstract Queue build(); + } + } + + @AutoValue + @JsonDeserialize(builder = AutoValue_Semp_QueueData.Builder.class) + public abstract static class QueueData { + public abstract String accessType(); + + public abstract long msgSpoolUsage(); + + public static Builder builder() { + return new AutoValue_Semp_QueueData.Builder(); + } + + public abstract Builder toBuilder(); + + @AutoValue.Builder + @JsonPOJOBuilder(withPrefix = "set") + abstract static class Builder { + + public abstract Builder setAccessType(String accessType); + + public abstract Builder setMsgSpoolUsage(long msgSpoolUsage); + + public abstract QueueData build(); + } + } +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java new file mode 100644 index 0000000000000..c2e4fdfa69fbb --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/SerializableSupplier.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.utils; + +import java.io.Serializable; + +@FunctionalInterface +public interface SerializableSupplier extends Serializable { + OutputT get(); +} diff --git a/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java new file mode 100644 index 0000000000000..abff210a62586 --- /dev/null +++ b/sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/utils/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Solace IO connector - utility classes. */ +package org.apache.beam.sdk.io.solace.utils; diff --git a/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java new file mode 100644 index 0000000000000..8cc48ed17ef6f --- /dev/null +++ b/sdks/java/io/solace/src/test/java/org/apache/beam/sdk/io/solace/broker/SempBasicAuthClientExecutorTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.solace.broker; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.api.client.http.HttpRequestFactory; +import com.google.api.client.http.HttpResponseException; +import com.google.api.client.http.LowLevelHttpRequest; +import com.google.api.client.http.LowLevelHttpResponse; +import com.google.api.client.json.Json; +import com.google.api.client.testing.http.MockHttpTransport; +import com.google.api.client.testing.http.MockLowLevelHttpRequest; +import com.google.api.client.testing.http.MockLowLevelHttpResponse; +import java.io.IOException; +import java.util.List; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.junit.Test; + +public class SempBasicAuthClientExecutorTest { + + @Test + public void testExecuteStatus4xx() { + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setStatusCode(404); + response.setContentType(Json.MEDIA_TYPE); + response.setContent( + "{\"meta\":{\"error\":{\"code\":404,\"description\":\"some" + + " error\",\"status\":\"xx\"}}}"); + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName", requestFactory); + + assertThrows(HttpResponseException.class, () -> client.getQueueResponse("queue")); + } + + @Test + public void testExecuteStatus3xx() { + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + response.setStatusCode(301); + response.setContentType(Json.MEDIA_TYPE); + response.setContent( + "{\"meta\":{\"error\":{\"code\":301,\"description\":\"some" + + " error\",\"status\":\"xx\"}}}"); + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName", requestFactory); + + assertThrows(HttpResponseException.class, () -> client.getQueueResponse("queue")); + } + + /** + * In this test case, we test a situation when a session that we used to authenticate to Semp + * expires. + * + *

To test this scenario, we need to do the following: + * + *

    + *
  1. Send the first request, to initialize a session. This request has to contain the Basic + * Auth header and should not include any cookie headers. The response for this request + * contains a session cookie we can re-use in the following requests. + *
  2. Send the second request - this request should use a cookie from the previous response. + * There should be no Authorization header. To simulate an expired session scenario, we set + * the response of this request to the "401 Unauthorized". This should cause a the request + * to be retried, this time with the Authorization header. + *
  3. Validate the third request to contain the Basic Auth header and no session cookies. + *
+ */ + @Test + public void testExecuteWithUnauthorized() throws IOException { + // Making it a final array, so that we can reference it from within the MockHttpTransport + // instance + final int[] requestCounter = {0}; + MockHttpTransport transport = + new MockHttpTransport() { + @Override + public LowLevelHttpRequest buildRequest(String method, String url) { + return new MockLowLevelHttpRequest() { + @Override + public LowLevelHttpResponse execute() throws IOException { + MockLowLevelHttpResponse response = new MockLowLevelHttpResponse(); + if (requestCounter[0] == 0) { + // The first request has to include Basic Auth header + assertTrue(this.getHeaders().containsKey("authorization")); + List authorizationHeaders = this.getHeaders().get("authorization"); + assertEquals(1, authorizationHeaders.size()); + assertTrue(authorizationHeaders.get(0).contains("Basic")); + assertFalse(this.getHeaders().containsKey("cookie")); + + // Set the response to include Session cookies + response + .setHeaderNames(ImmutableList.of("Set-Cookie", "Set-Cookie")) + .setHeaderValues( + ImmutableList.of( + "ProxySession=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;" + + " HttpOnly; SameSite=Strict;" + + " Path=/proxy; Max-Age=2592000", + "Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w;" + + " HttpOnly; SameSite=Strict;" + + " Path=/SEMP; Max-Age=2592000")); + response.setStatusCode(200); + } else if (requestCounter[0] == 1) { + // The second request does not include Basic Auth header + assertFalse(this.getHeaders().containsKey("authorization")); + // It must include a cookie header + assertTrue(this.getHeaders().containsKey("cookie")); + boolean hasSessionCookie = + this.getHeaders().get("cookie").stream() + .filter( + c -> + c.contains( + "Session=JddSdJaGo6FYYmQk6nt8jXxFtq6n3FCFR14ebzRGQ5w")) + .count() + == 1; + assertTrue(hasSessionCookie); + + // Let's assume the Session expired - we return the 401 + // unauthorized + response.setStatusCode(401); + } else { + // The second request has to be retried with a Basic Auth header + // this time + assertTrue(this.getHeaders().containsKey("authorization")); + List authorizationHeaders = this.getHeaders().get("authorization"); + assertEquals(1, authorizationHeaders.size()); + assertTrue(authorizationHeaders.get(0).contains("Basic")); + assertFalse(this.getHeaders().containsKey("cookie")); + + response.setStatusCode(200); + } + response.setContentType(Json.MEDIA_TYPE); + requestCounter[0]++; + return response; + } + }; + } + }; + + HttpRequestFactory requestFactory = transport.createRequestFactory(); + SempBasicAuthClientExecutor client = + new SempBasicAuthClientExecutor( + "http://host", "username", "password", "vpnName", requestFactory); + + // The first, initial request + client.getQueueResponse("queue"); + // The second request, which will try to authenticate with a cookie, and then with Basic + // Auth when it receives a 401 unauthorized + client.getQueueResponse("queue"); + + // There should be 3 requests executed: + // the first one is the initial one with Basic Auth, + // the second one uses the session cookie, but we simulate it being expired, + // so there should be a third request with Basic Auth to create a new session. + assertEquals(3, requestCounter[0]); + } +}