-
-
Notifications
You must be signed in to change notification settings - Fork 83
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
BE: Replace groovy filters with CEL (#98)
- Loading branch information
1 parent
c75c5cc
commit 11a57d1
Showing
10 changed files
with
253 additions
and
131 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
204 changes: 142 additions & 62 deletions
204
api/src/main/java/io/kafbat/ui/emitter/MessageFilters.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,98 +1,178 @@ | ||
package io.kafbat.ui.emitter; | ||
|
||
import groovy.json.JsonSlurper; | ||
import io.kafbat.ui.exception.ValidationException; | ||
import static java.util.Collections.emptyMap; | ||
|
||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.core.type.TypeReference; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.google.common.collect.ImmutableCollection; | ||
import com.google.common.collect.ImmutableSet; | ||
import dev.cel.common.CelAbstractSyntaxTree; | ||
import dev.cel.common.CelOptions; | ||
import dev.cel.common.CelValidationException; | ||
import dev.cel.common.CelValidationResult; | ||
import dev.cel.common.types.CelType; | ||
import dev.cel.common.types.CelTypeProvider; | ||
import dev.cel.common.types.MapType; | ||
import dev.cel.common.types.SimpleType; | ||
import dev.cel.common.types.StructType; | ||
import dev.cel.compiler.CelCompiler; | ||
import dev.cel.compiler.CelCompilerFactory; | ||
import dev.cel.parser.CelStandardMacro; | ||
import dev.cel.runtime.CelEvaluationException; | ||
import dev.cel.runtime.CelRuntime; | ||
import dev.cel.runtime.CelRuntimeFactory; | ||
import io.kafbat.ui.exception.CelException; | ||
import io.kafbat.ui.model.MessageFilterTypeDTO; | ||
import io.kafbat.ui.model.TopicMessageDTO; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
import java.util.Optional; | ||
import java.util.function.Predicate; | ||
import javax.annotation.Nullable; | ||
import javax.script.CompiledScript; | ||
import javax.script.ScriptEngineManager; | ||
import javax.script.ScriptException; | ||
import lombok.SneakyThrows; | ||
import lombok.experimental.UtilityClass; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.commons.lang3.StringUtils; | ||
import org.codehaus.groovy.jsr223.GroovyScriptEngineImpl; | ||
|
||
@Slf4j | ||
@UtilityClass | ||
public class MessageFilters { | ||
private static final String CEL_RECORD_VAR_NAME = "record"; | ||
private static final String CEL_RECORD_TYPE_NAME = TopicMessageDTO.class.getSimpleName(); | ||
|
||
private static GroovyScriptEngineImpl GROOVY_ENGINE; | ||
private static final CelCompiler CEL_COMPILER = createCompiler(); | ||
private static final CelRuntime CEL_RUNTIME = CelRuntimeFactory.standardCelRuntimeBuilder() | ||
.build(); | ||
|
||
private MessageFilters() { | ||
} | ||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); | ||
|
||
public static Predicate<TopicMessageDTO> createMsgFilter(String query, MessageFilterTypeDTO type) { | ||
switch (type) { | ||
case STRING_CONTAINS: | ||
return containsStringFilter(query); | ||
case GROOVY_SCRIPT: | ||
return groovyScriptFilter(query); | ||
default: | ||
throw new IllegalStateException("Unknown query type: " + type); | ||
} | ||
return switch (type) { | ||
case STRING_CONTAINS -> containsStringFilter(query); | ||
case CEL_SCRIPT -> celScriptFilter(query); | ||
}; | ||
} | ||
|
||
static Predicate<TopicMessageDTO> containsStringFilter(String string) { | ||
return msg -> StringUtils.contains(msg.getKey(), string) | ||
|| StringUtils.contains(msg.getContent(), string); | ||
} | ||
|
||
static Predicate<TopicMessageDTO> groovyScriptFilter(String script) { | ||
var engine = getGroovyEngine(); | ||
var compiledScript = compileScript(engine, script); | ||
var jsonSlurper = new JsonSlurper(); | ||
return new Predicate<TopicMessageDTO>() { | ||
@SneakyThrows | ||
@Override | ||
public boolean test(TopicMessageDTO msg) { | ||
var bindings = engine.createBindings(); | ||
bindings.put("partition", msg.getPartition()); | ||
bindings.put("offset", msg.getOffset()); | ||
bindings.put("timestampMs", msg.getTimestamp().toInstant().toEpochMilli()); | ||
bindings.put("keyAsText", msg.getKey()); | ||
bindings.put("valueAsText", msg.getContent()); | ||
bindings.put("headers", msg.getHeaders()); | ||
bindings.put("key", parseToJsonOrReturnAsIs(jsonSlurper, msg.getKey())); | ||
bindings.put("value", parseToJsonOrReturnAsIs(jsonSlurper, msg.getContent())); | ||
var result = compiledScript.eval(bindings); | ||
if (result instanceof Boolean) { | ||
return (Boolean) result; | ||
} else { | ||
throw new ValidationException( | ||
"Unexpected script result: %s, Boolean should be returned instead".formatted(result)); | ||
} | ||
static Predicate<TopicMessageDTO> celScriptFilter(String script) { | ||
CelValidationResult celValidationResult = CEL_COMPILER.compile(script); | ||
if (celValidationResult.hasError()) { | ||
throw new CelException(script, celValidationResult.getErrorString()); | ||
} | ||
|
||
try { | ||
CelAbstractSyntaxTree ast = celValidationResult.getAst(); | ||
CelRuntime.Program program = CEL_RUNTIME.createProgram(ast); | ||
|
||
return createPredicate(script, program); | ||
} catch (CelValidationException | CelEvaluationException e) { | ||
throw new CelException(script, e); | ||
} | ||
} | ||
|
||
private static Predicate<TopicMessageDTO> createPredicate(String originalScript, CelRuntime.Program program) { | ||
return topicMessage -> { | ||
Object programResult; | ||
try { | ||
programResult = program.eval(recordToArgs(topicMessage)); | ||
} catch (CelEvaluationException e) { | ||
throw new CelException(originalScript, e); | ||
} | ||
|
||
if (programResult instanceof Boolean isMessageMatched) { | ||
return isMessageMatched; | ||
} | ||
|
||
throw new CelException( | ||
originalScript, | ||
"Unexpected script result, boolean should be returned instead. Script output: %s".formatted(programResult) | ||
); | ||
}; | ||
} | ||
|
||
@Nullable | ||
private static Object parseToJsonOrReturnAsIs(JsonSlurper parser, @Nullable String str) { | ||
if (str == null) { | ||
return null; | ||
private static Map<String, Map<String, Object>> recordToArgs(TopicMessageDTO topicMessage) { | ||
Map<String, Object> args = new HashMap<>(); | ||
|
||
args.put("partition", topicMessage.getPartition()); | ||
args.put("offset", topicMessage.getOffset()); | ||
|
||
if (topicMessage.getTimestamp() != null) { | ||
args.put("timestampMs", topicMessage.getTimestamp().toInstant().toEpochMilli()); | ||
} | ||
try { | ||
return parser.parseText(str); | ||
} catch (Exception e) { | ||
return str; | ||
|
||
if (topicMessage.getKey() != null) { | ||
args.put("key", parseToJsonOrReturnAsIs(topicMessage.getKey())); | ||
args.put("keyAsText", topicMessage.getKey()); | ||
} | ||
} | ||
|
||
private static synchronized GroovyScriptEngineImpl getGroovyEngine() { | ||
// it is pretty heavy object, so initializing it on-demand | ||
if (GROOVY_ENGINE == null) { | ||
GROOVY_ENGINE = (GroovyScriptEngineImpl) | ||
new ScriptEngineManager().getEngineByName("groovy"); | ||
if (topicMessage.getContent() != null) { | ||
args.put("value", parseToJsonOrReturnAsIs(topicMessage.getContent())); | ||
args.put("valueAsText", topicMessage.getContent()); | ||
} | ||
return GROOVY_ENGINE; | ||
|
||
args.put("headers", Objects.requireNonNullElse(topicMessage.getHeaders(), emptyMap())); | ||
|
||
return Map.of("record", args); | ||
} | ||
|
||
private static CompiledScript compileScript(GroovyScriptEngineImpl engine, String script) { | ||
private static CelCompiler createCompiler() { | ||
Map<String, CelType> fields = Map.of( | ||
"partition", SimpleType.INT, | ||
"offset", SimpleType.INT, | ||
"timestampMs", SimpleType.INT, | ||
"keyAsText", SimpleType.STRING, | ||
"valueAsText", SimpleType.STRING, | ||
"headers", MapType.create(SimpleType.STRING, SimpleType.STRING), | ||
"key", SimpleType.DYN, | ||
"value", SimpleType.DYN | ||
); | ||
|
||
ImmutableSet<String> names = ImmutableSet | ||
.<String>builder() | ||
.addAll(fields.keySet()) | ||
.build(); | ||
|
||
StructType recordType = StructType.create( | ||
CEL_RECORD_TYPE_NAME, | ||
names, | ||
fieldName -> Optional.ofNullable(fields.get(fieldName)) | ||
); | ||
|
||
return CelCompilerFactory.standardCelCompilerBuilder() | ||
.setOptions(CelOptions.DEFAULT) | ||
.setStandardMacros(CelStandardMacro.STANDARD_MACROS) | ||
.addVar(CEL_RECORD_VAR_NAME, recordType) | ||
.setResultType(SimpleType.BOOL) | ||
.setTypeProvider(new CelTypeProvider() { | ||
@Override | ||
public ImmutableCollection<CelType> types() { | ||
return ImmutableSet.of(recordType); | ||
} | ||
|
||
@Override | ||
public Optional<CelType> findType(String typeName) { | ||
return CEL_RECORD_TYPE_NAME.equals(typeName) ? Optional.of(recordType) : Optional.empty(); | ||
} | ||
}) | ||
.build(); | ||
} | ||
|
||
@Nullable | ||
private static Object parseToJsonOrReturnAsIs(@Nullable String str) { | ||
if (str == null) { | ||
return null; | ||
} | ||
|
||
try { | ||
return engine.compile(script); | ||
} catch (ScriptException e) { | ||
throw new ValidationException("Script syntax error: " + e.getMessage()); | ||
return OBJECT_MAPPER.readValue(str, new TypeReference<Map<String, Object>>() { | ||
}); | ||
} catch (JsonProcessingException e) { | ||
return str; | ||
} | ||
} | ||
|
||
} |
22 changes: 22 additions & 0 deletions
22
api/src/main/java/io/kafbat/ui/exception/CelException.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
package io.kafbat.ui.exception; | ||
|
||
public class CelException extends CustomBaseException { | ||
private String celOriginalExpression; | ||
|
||
public CelException(String celOriginalExpression, String errorMessage) { | ||
super("CEL error. Original expression: %s. Error message: %s".formatted(celOriginalExpression, errorMessage)); | ||
|
||
this.celOriginalExpression = celOriginalExpression; | ||
} | ||
|
||
public CelException(String celOriginalExpression, Throwable celThrowable) { | ||
super("CEL error. Original expression: %s".formatted(celOriginalExpression), celThrowable); | ||
|
||
this.celOriginalExpression = celOriginalExpression; | ||
} | ||
|
||
@Override | ||
public ErrorCode getErrorCode() { | ||
return ErrorCode.CEL_ERROR; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.