Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more tests and add back validation for some agents #747

Merged
merged 10 commits into from
Dec 14, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ public void start() throws Exception {
throw new IllegalStateException("Channel not initialized");
}
blockingStub =
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
AgentServiceGrpc.newBlockingStub(channel)
.withMaxInboundMessageSize(Integer.MAX_VALUE)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withDeadlineAfter(30, TimeUnit.SECONDS);
asyncStub =
AgentServiceGrpc.newStub(channel)
.withWaitForReady()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void start() throws Exception {
public synchronized void close() throws Exception {
super.close();
if (server != null) {
server.close(false);
server.close(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void start() throws Exception {
public synchronized void close() throws Exception {
super.close();
if (server != null) {
server.close(false);
server.close(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void start() throws Exception {
public synchronized void close() throws Exception {
super.close();
if (server != null) {
server.close(false);
server.close(true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ public ManagedChannel start() throws Exception {
ManagedChannel channel =
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build();
AgentServiceGrpc.AgentServiceBlockingStub stub =
AgentServiceGrpc.newBlockingStub(channel).withDeadlineAfter(30, TimeUnit.SECONDS);
AgentServiceGrpc.newBlockingStub(channel)
.withMaxInboundMessageSize(Integer.MAX_VALUE)
.withMaxOutboundMessageSize(Integer.MAX_VALUE)
.withDeadlineAfter(30, TimeUnit.SECONDS);
for (int i = 0; ; i++) {
try {
stub.agentInfo(Empty.getDefaultInstance());
Expand All @@ -101,7 +104,12 @@ public ManagedChannel start() throws Exception {
throw e;
}
log.info("Waiting for python agent to start");
Thread.sleep(1000);
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
log.info("Sleep interrupted");
break;
}
}
}
return channel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public JstlEvaluator(String expression, Class<? extends T> type) {

@SneakyThrows
private void registerFunctions() {
this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "length", JstlFunctions.class.getMethod("length", Object.class));
this.expressionContext
.getFunctionMapper()
.mapFunction("fn", "toJson", JstlFunctions.class.getMethod("toJson", Object.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ public static Map<String, Object> emptyMap() {
return Map.of();
}

public static long length(Object o) {
return o == null ? 0 : toString(o).length();
}

public static Map<String, Object> mapOf(Object... field) {
Map<String, Object> result = new HashMap<>();
for (int i = 0; i < field.length; i += 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

/** Computes a field dynamically based on JSTL expressions and adds it to the key or the value . */
@Builder
@Slf4j
public class ComputeStep implements TransformStep {
public static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);
@Builder.Default private final List<ComputeField> fields = new ArrayList<>();
Expand Down Expand Up @@ -85,6 +87,9 @@ public void process(MutableRecord mutableRecord) {
.filter(f -> "header.properties".equals(f.getScope()))
.collect(Collectors.toList()),
mutableRecord);
} catch (RuntimeException error) {
log.error("Error while computing fields on record {}", mutableRecord, error);
throw error;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public ComputeField build() {
this.evaluator =
new JstlEvaluator<>(String.format("${%s}", this.expression), getJavaType());
} catch (ELException ex) {
throw new IllegalArgumentException("invalid expression: " + "expression", ex);
throw new IllegalArgumentException("invalid expression: " + expression, ex);
}
return new ComputeField(name, evaluator, type, scope, optional);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,18 @@ void testPrimitiveValue() {
assertEquals("test-message", value);
}

@Test
void testLength() {
MutableRecord primitiveStringContext =
Utils.createContextWithPrimitiveRecord(Schema.STRING, "test-message", "");

String value =
new JstlEvaluator<>("${fn:length(value)}", String.class)
.evaluate(primitiveStringContext);

assertEquals("12", value);
}

@Test
void testNowFunction() {
MutableRecord primitiveStringContext =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,16 @@ protected final ComponentType getComponentType(AgentConfiguration agentConfigura
return ComponentType.PROCESSOR;
}

@Override
protected Class getAgentConfigModelClass(String type) {
StepConfigurationInitializer stepConfigurationInitializer = STEP_TYPES.get(type);
log.info(
"Validating agent configuration model for type {} with {}",
type,
stepConfigurationInitializer.getAgentConfigurationModelClass());
return stepConfigurationInitializer.getAgentConfigurationModelClass();
}

public interface TopicConfigurationGenerator {
void generateTopicConfiguration(String topicName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static class Field {
The type field is not required for the message headers [destinationTopic, messageKey, properties.] and STRING will be used.
For the value and key, if it is not provided, then the type will be inferred from the result of the expression evaluation.
""",
required = true)
required = false)
private String type;

@ConfigProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,22 @@ private static void validateProperty(
if (propertyValue.getExtendedValidationType() != null) {
validateExtendedValidationType(propertyValue.getExtendedValidationType(), actualValue);
}

if (propertyValue.getItems() != null) {
if (actualValue instanceof Collection collection) {
for (Object o : collection) {
validateProperty(
entityRef, fullPropertyKey, o, propertyValue.getItems(), propertyKey);
}
} else {
validateProperty(
entityRef,
fullPropertyKey,
actualValue,
propertyValue.getItems(),
propertyKey);
}
}
}

@Data
Expand Down Expand Up @@ -531,7 +547,7 @@ private static void validateExtendedValidationType(
case EL_EXPRESSION -> {
if (actualValue instanceof String expression) {
log.info("Validating EL expression: {}", expression);
new JstlEvaluator(actualValue.toString(), Object.class);
new JstlEvaluator("${" + actualValue + "}", Object.class);
} else if (actualValue instanceof Collection collection) {
log.info("Validating EL expressions {}", collection);
for (Object o : collection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ public void testSource() {
.formatted(applicationId)
.split(" "));
log.info("Output: {}", output);
String bigPayload = "test".repeat(10000);
String value = "the length is " + bigPayload.length();
Assertions.assertTrue(
output.contains("{\"record\":{\"key\":null,\"value\":\"test\",\"headers\":{}}"));
output.contains(
"{\"record\":{\"key\":null,\"value\":\"" + value + "\",\"headers\":{}}"),
"Output doesn't contain the expected payload: " + output);

deleteAppAndAwaitCleanup(tenant, applicationId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1235,7 +1235,8 @@ protected static String deployLocalApplication(
final String command =
"bin/langstream apps %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml"
.formatted(isUpdate ? "update" : "deploy", applicationId);
executeCommandOnClient((beforeCmd + command).split(" "));
String logs = executeCommandOnClient((beforeCmd + command).split(" "));
log.info("Logs after deploy: {}", logs);
return podUids;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,19 @@ topics:
creation-mode: create-if-not-exists
schema:
type: string
resources:
size: 2
pipeline:
- name: "Source using Python"
resources:
size: 2
id: "test-python-source"
type: "python-source"
configuration:
className: example.TestSource
- name: "Compute length (because we cannot write a big message to Kafka)"
id: "compute-length"
type: "compute"
output: ls-test-output
configuration:
className: example.TestSource
fields:
- name: "value"
expression: "fn:concat('the length is ', fn:length(value))"
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def read(self):
if not self.sent:
logging.info("Sending the record")
self.sent = True
return [SimpleRecord("test")]
return [SimpleRecord("test" * 10000)]
return []

def commit(self, records):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import ai.langstream.api.model.Application;
Expand Down Expand Up @@ -841,4 +842,45 @@ public void testForceAiService() throws Exception {
assertNull(configuration.get("service"));
}
}

@Test
public void testValidateBadComputeStep() throws Exception {
Application applicationInstance =
ModelBuilder.buildApplicationInstance(
Map.of(
"module.yaml",
"""
module: "module-1"
id: "pipeline-1"
topics:
- name: "input-topic"
creation-mode: create-if-not-exists
pipeline:
- name: "compute"
id: "step1"
type: "compute"
input: "input-topic"
configuration:
fields:
- name: value
expression: "fn:concat('something', fn:len(value))"
"""),
buildInstanceYaml(),
null)
.getApplication();

try (ApplicationDeployer deployer =
ApplicationDeployer.builder()
.registry(new ClusterRuntimeRegistry())
.pluginsRegistry(new PluginsRegistry())
.build()) {
Exception e =
assertThrows(
Exception.class,
() -> {
deployer.createImplementation("app", applicationInstance);
});
assertEquals("Function [fn:len] not found", e.getMessage());
}
}
}
Loading