Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add e2e tests for gateway with Pulsar #602

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions langstream-api-gateway/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@
<artifactId>kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>pulsar</artifactId>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please note that this dependency imports some version of the Pulsar client, is it polluting the classpath ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be OK. Testcontainers uses the same shaded jar that the runtime uses.

<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,8 @@ protected void sendEvent(EventRecord.Types type, AuthenticatedGatewayRequestCont
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime();

topicConnectionsRuntime.init(streamingCluster);

try (final TopicProducer producer =
topicConnectionsRuntime.createProducer(
"langstream-events",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.apigateway.websocket.handlers;

import ai.langstream.api.model.StreamingCluster;
import ai.langstream.api.storage.ApplicationStore;
import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
import ai.langstream.kafka.extensions.KafkaContainerExtension;
import java.util.Map;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

public class KafkaProduceConsumeHandlerTest extends ProduceConsumeHandlerTest {

@RegisterExtension
static KafkaContainerExtension kafkaContainer = new KafkaContainerExtension();

@Override
protected StreamingCluster getStreamingCluster() {
return new StreamingCluster(
"kafka",
Map.of(
"admin",
Map.of(
"bootstrap.servers",
kafkaContainer.getBootstrapServers(),
"default.api.timeout.ms",
5000)));
}

@TestConfiguration
public static class WebSocketTestConfig {

@Bean
@Primary
public ApplicationStore store() {
String instanceYaml =
"""
instance:
streamingCluster:
type: "kafka"
configuration:
admin:
bootstrap.servers: "%s"
computeCluster:
type: "none"
"""
.formatted(kafkaContainer.getBootstrapServers());
return getMockedStore(instanceYaml);
}

@Bean
@Primary
public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() {
return getGatewayTestAuthenticationProperties();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@
import ai.langstream.api.runtime.PluginsRegistry;
import ai.langstream.api.storage.ApplicationStore;
import ai.langstream.apigateway.config.GatewayTestAuthenticationProperties;
import ai.langstream.apigateway.runner.TopicConnectionsRuntimeProviderBean;
import ai.langstream.apigateway.websocket.api.ConsumePushMessage;
import ai.langstream.apigateway.websocket.api.ProduceRequest;
import ai.langstream.apigateway.websocket.api.ProduceResponse;
import ai.langstream.impl.deploy.ApplicationDeployer;
import ai.langstream.impl.nar.NarFileHandler;
import ai.langstream.impl.parser.ModelBuilder;
import ai.langstream.kafka.extensions.KafkaContainerExtension;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
Expand Down Expand Up @@ -74,16 +73,12 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.web.server.LocalServerPort;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;

@SpringBootTest(
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
Expand All @@ -92,7 +87,7 @@
})
@WireMockTest
@Slf4j
class ProduceConsumeHandlerTest {
abstract class ProduceConsumeHandlerTest {

public static final Path agentsDirectory;

Expand All @@ -103,55 +98,49 @@ class ProduceConsumeHandlerTest {

protected static final ObjectMapper MAPPER = new ObjectMapper();

@RegisterExtension
static KafkaContainerExtension kafkaContainer = new KafkaContainerExtension();

static List<String> topics;
static Gateways testGateways;

@TestConfiguration
public static class WebSocketTestConfig {

@Bean
@Primary
public ApplicationStore store() {
final ApplicationStore mock = Mockito.mock(ApplicationStore.class);
doAnswer(
invocationOnMock -> {
final StoredApplication storedApplication = new StoredApplication();
final Application application = buildApp();
storedApplication.setInstance(application);
return storedApplication;
})
.when(mock)
.get(anyString(), anyString(), anyBoolean());
doAnswer(invocationOnMock -> ApplicationSpecs.builder().application(buildApp()).build())
.when(mock)
.getSpecs(anyString(), anyString());

return mock;
}
protected static ApplicationStore getMockedStore(String instanceYaml) {
ApplicationStore mock = Mockito.mock(ApplicationStore.class);
doAnswer(
invocationOnMock -> {
final StoredApplication storedApplication = new StoredApplication();
final Application application = buildApp(instanceYaml);
storedApplication.setInstance(application);
return storedApplication;
})
.when(mock)
.get(anyString(), anyString(), anyBoolean());
doAnswer(
invocationOnMock ->
ApplicationSpecs.builder()
.application(buildApp(instanceYaml))
.build())
.when(mock)
.getSpecs(anyString(), anyString());

return mock;
}

@Bean
@Primary
public GatewayTestAuthenticationProperties gatewayTestAuthenticationProperties() {
final GatewayTestAuthenticationProperties props =
new GatewayTestAuthenticationProperties();
props.setType("http");
props.setConfiguration(
Map.of(
"base-url",
wireMockBaseUrl,
"path-template",
"/auth/{tenant}",
"headers",
Map.of("h1", "v1")));
return props;
}
protected static GatewayTestAuthenticationProperties getGatewayTestAuthenticationProperties() {
final GatewayTestAuthenticationProperties props = new GatewayTestAuthenticationProperties();
props.setType("http");
props.setConfiguration(
Map.of(
"base-url",
wireMockBaseUrl,
"path-template",
"/auth/{tenant}",
"headers",
Map.of("h1", "v1")));
return props;
}

@Autowired private TopicConnectionsRuntimeProviderBean topicConnectionsRuntimeProvider;

@NotNull
private static Application buildApp() throws Exception {
private static Application buildApp(String instanceYaml) throws Exception {
final Map<String, Object> module =
Map.of(
"module",
Expand All @@ -175,17 +164,7 @@ private static Application buildApp() throws Exception {
"module.yaml",
new ObjectMapper(new YAMLFactory())
.writeValueAsString(module)),
"""
instance:
streamingCluster:
type: "kafka"
configuration:
admin:
bootstrap.servers: "%s"
computeCluster:
type: "none"
"""
.formatted(kafkaContainer.getBootstrapServers()),
instanceYaml,
null)
.getApplication();
application.setGateways(testGateways);
Expand Down Expand Up @@ -276,38 +255,25 @@ public void onError(Throwable throwable) {
}
}

protected abstract StreamingCluster getStreamingCluster();

private void prepareTopicsForTest(String... topic) throws Exception {
topics = List.of(topic);
try (NarFileHandler narFileHandler =
new NarFileHandler(
agentsDirectory, List.of(), NarFileHandler.class.getClassLoader()); ) {
narFileHandler.scan();
TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
new TopicConnectionsRuntimeRegistry();
topicConnectionsRuntimeRegistry.setPackageLoader(narFileHandler);
final ApplicationDeployer deployer =
ApplicationDeployer.builder()
.pluginsRegistry(new PluginsRegistry())
.registry(new ClusterRuntimeRegistry())
.topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
.build();
final StreamingCluster streamingCluster =
new StreamingCluster(
"kafka",
Map.of(
"admin",
Map.of(
"bootstrap.servers",
kafkaContainer.getBootstrapServers(),
"default.api.timeout.ms",
5000)));
topicConnectionsRuntimeRegistry
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime()
.deploy(
deployer.createImplementation(
"app", store.get("t", "app", false).getInstance()));
}
TopicConnectionsRuntimeRegistry topicConnectionsRuntimeRegistry =
topicConnectionsRuntimeProvider.getTopicConnectionsRuntimeRegistry();
final ApplicationDeployer deployer =
ApplicationDeployer.builder()
.pluginsRegistry(new PluginsRegistry())
.registry(new ClusterRuntimeRegistry())
.topicConnectionsRuntimeRegistry(topicConnectionsRuntimeRegistry)
.build();
final StreamingCluster streamingCluster = getStreamingCluster();
topicConnectionsRuntimeRegistry
.getTopicConnectionsRuntime(streamingCluster)
.asTopicConnectionsRuntime()
.deploy(
deployer.createImplementation(
"app", store.get("t", "app", false).getInstance()));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ai.langstream.apigateway.websocket.handlers;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.utility.DockerImageName;

@Slf4j
public class PulsarContainerExtension implements BeforeAllCallback, AfterAllCallback {
private PulsarContainer pulsarContainer;

private Network network;

@Override
public void afterAll(ExtensionContext extensionContext) {
if (pulsarContainer != null) {
pulsarContainer.close();
}
if (network != null) {
network.close();
}
}

@Override
public void beforeAll(ExtensionContext extensionContext) {
network = Network.newNetwork();
pulsarContainer =
new PulsarContainer(DockerImageName.parse("apachepulsar/pulsar:3.1.0"))
.withNetwork(network)
.withLogConsumer(
outputFrame ->
log.debug(
"pulsar> {}", outputFrame.getUtf8String().trim()));
// start Pulsar and wait for it to be ready to accept requests
pulsarContainer.start();
}

public String getBrokerUrl() {
return pulsarContainer.getPulsarBrokerUrl();
}

public String getHttpServiceUrl() {
return pulsarContainer.getHttpServiceUrl();
}

public PulsarContainer getPulsarContainer() {
return pulsarContainer;
}
}
Loading
Loading