diff --git a/langstream-api-gateway/pom.xml b/langstream-api-gateway/pom.xml
index 68228f091..e811f652d 100644
--- a/langstream-api-gateway/pom.xml
+++ b/langstream-api-gateway/pom.xml
@@ -112,6 +112,11 @@
kafka
test
+
+ org.testcontainers
+ pulsar
+ test
+
com.github.tomakehurst
wiremock
diff --git a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java
index 33b3c17d0..003f6aadc 100644
--- a/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java
+++ b/langstream-api-gateway/src/main/java/ai/langstream/apigateway/websocket/handlers/AbstractHandler.java
@@ -352,6 +352,8 @@ protected void sendEvent(EventRecord.Types type, AuthenticatedGatewayRequestCont
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime();
+ topicConnectionsRuntime.init(streamingCluster);
+
try (final TopicProducer producer =
topicConnectionsRuntime.createProducer(
"langstream-events",
diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/KafkaProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/KafkaProduceConsumeHandlerTest.java
new file mode 100644
index 000000000..607eeee0c
--- /dev/null
+++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/KafkaProduceConsumeHandlerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.apigateway.websocket.handlers;
+
+import ai.langstream.api.model.StreamingCluster;
+import ai.langstream.api.storage.ApplicationStore;
+import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
+import ai.langstream.kafka.extensions.KafkaContainerExtension;
+import java.util.Map;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+
+public class KafkaProduceConsumeHandlerTest extends ProduceConsumeHandlerTest {
+
+ @RegisterExtension
+ static KafkaContainerExtension kafkaContainer = new KafkaContainerExtension();
+
+ @Override
+ protected StreamingCluster getStreamingCluster() {
+ return new StreamingCluster(
+ "kafka",
+ Map.of(
+ "admin",
+ Map.of(
+ "bootstrap.servers",
+ kafkaContainer.getBootstrapServers(),
+ "default.api.timeout.ms",
+ 5000)));
+ }
+
+ @TestConfiguration
+ public static class WebSocketTestConfig {
+
+ @Bean
+ @Primary
+ public ApplicationStore store() {
+ String instanceYaml =
+ """
+ instance:
+ streamingCluster:
+ type: "kafka"
+ configuration:
+ admin:
+ bootstrap.servers: "%s"
+ computeCluster:
+ type: "none"
+ """
+ .formatted(kafkaContainer.getBootstrapServers());
+ return getMockedStore(instanceYaml);
+ }
+
+ @Bean
+ @Primary
+ public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() {
+ return getGatewayTestAuthenticationProperties();
+ }
+ }
+}
diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java
index f2e04c063..0f99b7fc8 100644
--- a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java
+++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/ProduceConsumeHandlerTest.java
@@ -38,13 +38,12 @@
import ai.langstream.api.runtime.PluginsRegistry;
import ai.langstream.api.storage.ApplicationStore;
import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
+import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
import ai.langstream.apigateway.websocket.api.ConsumePushMessage;
import ai.langstream.apigateway.websocket.api.ProduceRequest;
import ai.langstream.apigateway.websocket.api.ProduceResponse;
import ai.langstream.impl.deploy.ApplicationDeployer;
-import ai.langstream.impl.nar.NarFileHandler;
import ai.langstream.impl.parser.ModelBuilder;
-import ai.langstream.kafka.extensions.KafkaContainerExtension;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -74,16 +73,12 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.server.LocalServerPort;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Primary;
@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
@@ -92,7 +87,7 @@
})
@WireMockTest
@Slf4j
-class ProduceConsumeHandlerTest {
+abstract class ProduceConsumeHandlerTest {
public static final Path agentsDirectory;
@@ -103,55 +98,49 @@ class ProduceConsumeHandlerTest {
protected static final ObjectMapper MAPPER = new ObjectMapper();
- @RegisterExtension
- static KafkaContainerExtension kafkaContainer = new KafkaContainerExtension();
-
static List topics;
static Gateways testGateways;
- @TestConfiguration
- public static class WebSocketTestConfig {
-
- @Bean
- @Primary
- public ApplicationStore store() {
- final ApplicationStore mock = Mockito.mock(ApplicationStore.class);
- doAnswer(
- invocationOnMock -> {
- final StoredApplication storedApplication = new StoredApplication();
- final Application application = buildApp();
- storedApplication.setInstance(application);
- return storedApplication;
- })
- .when(mock)
- .get(anyString(), anyString(), anyBoolean());
- doAnswer(invocationOnMock -> ApplicationSpecs.builder().application(buildApp()).build())
- .when(mock)
- .getSpecs(anyString(), anyString());
-
- return mock;
- }
+ protected static ApplicationStore getMockedStore(String instanceYaml) {
+ ApplicationStore mock = Mockito.mock(ApplicationStore.class);
+ doAnswer(
+ invocationOnMock -> {
+ final StoredApplication storedApplication = new StoredApplication();
+ final Application application = buildApp(instanceYaml);
+ storedApplication.setInstance(application);
+ return storedApplication;
+ })
+ .when(mock)
+ .get(anyString(), anyString(), anyBoolean());
+ doAnswer(
+ invocationOnMock ->
+ ApplicationSpecs.builder()
+ .application(buildApp(instanceYaml))
+ .build())
+ .when(mock)
+ .getSpecs(anyString(), anyString());
+
+ return mock;
+ }
- @Bean
- @Primary
- public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() {
- final GatewayTestAuthenticationProperties props =
- new GatewayTestAuthenticationProperties();
- props.setType("http");
- props.setConfiguration(
- Map.of(
- "base-url",
- wireMockBaseUrl,
- "path-template",
- "/auth/{tenant}",
- "headers",
- Map.of("h1", "v1")));
- return props;
- }
+ protected static GatewayTestAuthenticationProperties getGatewayTestAuthenticationProperties() {
+ final GatewayTestAuthenticationProperties props = new GatewayTestAuthenticationProperties();
+ props.setType("http");
+ props.setConfiguration(
+ Map.of(
+ "base-url",
+ wireMockBaseUrl,
+ "path-template",
+ "/auth/{tenant}",
+ "headers",
+ Map.of("h1", "v1")));
+ return props;
}
+ @Autowired private TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeProvider;
+
@NotNull
- private static Application buildApp() throws Exception {
+ private static Application buildApp(String instanceYaml) throws Exception {
final Map module =
Map.of(
"module",
@@ -175,17 +164,7 @@ private static Application buildApp() throws Exception {
"module.yaml",
new ObjectMapper(new YAMLFactory())
.writeValueAsString(module)),
- """
- instance:
- streamingCluster:
- type: "kafka"
- configuration:
- admin:
- bootstrap.servers: "%s"
- computeCluster:
- type: "none"
- """
- .formatted(kafkaContainer.getBootstrapServers()),
+ instanceYaml,
null)
.getApplication();
application.setGateways(testGateways);
@@ -276,38 +255,25 @@ public void onError(Throwable throwable) {
}
}
+ protected abstract StreamingCluster getStreamingCluster();
+
private void prepareTopicsForTest(String... topic) throws Exception {
topics = List.of(topic);
- try (NarFileHandler narFileHandler =
- new NarFileHandler(
- agentsDirectory, List.of(), NarFileHandler.class.getClassLoader()); ) {
- narFileHandler.scan();
- TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
- new TopicConnectionsRuntimeRegistry();
- topicConnectionsRuntimeRegistry.setPackageLoader(narFileHandler);
- final ApplicationDeployer deployer =
- ApplicationDeployer.builder()
- .pluginsRegistry(new PluginsRegistry())
- .registry(new ClusterRuntimeRegistry())
- .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
- .build();
- final StreamingCluster streamingCluster =
- new StreamingCluster(
- "kafka",
- Map.of(
- "admin",
- Map.of(
- "bootstrap.servers",
- kafkaContainer.getBootstrapServers(),
- "default.api.timeout.ms",
- 5000)));
- topicConnectionsRuntimeRegistry
- .getTopicConnectionsRuntime(streamingCluster)
- .asTopicConnectionsRuntime()
- .deploy(
- deployer.createImplementation(
- "app", store.get("t", "app", false).getInstance()));
- }
+ TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
+ topicConnectionsRuntimeProvider.getTopicConnectionsRuntimeRegistry();
+ final ApplicationDeployer deployer =
+ ApplicationDeployer.builder()
+ .pluginsRegistry(new PluginsRegistry())
+ .registry(new ClusterRuntimeRegistry())
+ .topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
+ .build();
+ final StreamingCluster streamingCluster = getStreamingCluster();
+ topicConnectionsRuntimeRegistry
+ .getTopicConnectionsRuntime(streamingCluster)
+ .asTopicConnectionsRuntime()
+ .deploy(
+ deployer.createImplementation(
+ "app", store.get("t", "app", false).getInstance()));
}
@ParameterizedTest
diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java
new file mode 100644
index 000000000..b91d94633
--- /dev/null
+++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarContainerExtension.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.apigateway.websocket.handlers;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.PulsarContainer;
+import org.testcontainers.utility.DockerImageName;
+
+@Slf4j
+public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback {
+ private PulsarContainer pulsarContainer;
+
+ private Network network;
+
+ @Override
+ public void afterAll(ExtensionContext extensionContext) {
+ if (pulsarContainer != null) {
+ pulsarContainer.close();
+ }
+ if (network != null) {
+ network.close();
+ }
+ }
+
+ @Override
+ public void beforeAll(ExtensionContext extensionContext) {
+ network = Network.newNetwork();
+ pulsarContainer =
+ new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0"))
+ .withNetwork(network)
+ .withLogConsumer(
+ outputFrame ->
+ log.debug(
+ "pulsar> {}", outputFrame.getUtf8String().trim()));
+ // start Pulsar and wait for it to be ready to accept requests
+ pulsarContainer.start();
+ }
+
+ public String getBrokerUrl() {
+ return pulsarContainer.getPulsarBrokerUrl();
+ }
+
+ public String getHttpServiceUrl() {
+ return pulsarContainer.getHttpServiceUrl();
+ }
+
+ public PulsarContainer getPulsarContainer() {
+ return pulsarContainer;
+ }
+}
diff --git a/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarProduceConsumeHandlerTest.java b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarProduceConsumeHandlerTest.java
new file mode 100644
index 000000000..ffee695ee
--- /dev/null
+++ b/langstream-api-gateway/src/test/java/ai/langstream/apigateway/websocket/handlers/PulsarProduceConsumeHandlerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright DataStax, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package ai.langstream.apigateway.websocket.handlers;
+
+import ai.langstream.api.model.StreamingCluster;
+import ai.langstream.api.storage.ApplicationStore;
+import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
+import java.util.Map;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Primary;
+
+public class PulsarProduceConsumeHandlerTest extends ProduceConsumeHandlerTest {
+
+ @RegisterExtension
+ static PulsarContainerExtension pulsarContainer = new PulsarContainerExtension();
+
+ @Override
+ protected StreamingCluster getStreamingCluster() {
+ return new StreamingCluster(
+ "pulsar",
+ Map.of(
+ "admin",
+ Map.of("serviceUrl", pulsarContainer.getHttpServiceUrl()),
+ "service",
+ Map.of("serviceUrl", pulsarContainer.getBrokerUrl()),
+ "defaultTenant",
+ "public",
+ "defaultNamespace",
+ "default"));
+ }
+
+ @TestConfiguration
+ public static class WebSocketTestConfig {
+
+ @Bean
+ @Primary
+ public ApplicationStore store() {
+ String instanceYaml =
+ """
+ instance:
+ streamingCluster:
+ type: "pulsar"
+ configuration:
+ admin:
+ serviceUrl: "%s"
+ service:
+ serviceUrl: "%s"
+ defaultTenant: "public"
+ defaultNamespace: "default"
+ computeCluster:
+ type: "none"
+ """
+ .formatted(
+ pulsarContainer.getHttpServiceUrl(),
+ pulsarContainer.getBrokerUrl());
+ return getMockedStore(instanceYaml);
+ }
+
+ @Bean
+ @Primary
+ public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() {
+ return getGatewayTestAuthenticationProperties();
+ }
+ }
+}