Skip to content

Commit

Permalink
Fix QQL timestamp and symbols filters with parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman Kisel committed Feb 9, 2024
1 parent 32d77db commit 4968354
Show file tree
Hide file tree
Showing 9 changed files with 900 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.epam.deltix.qsrv.hf.tickdb.pub.SelectionOptions;
import com.epam.deltix.qsrv.hf.tickdb.pub.query.InstrumentMessageSource;
import com.epam.deltix.qsrv.hf.tickdb.pub.query.PreparedQuery;
import com.epam.deltix.util.collections.generated.IntegerArrayList;
import com.epam.deltix.util.io.IOUtil;
import com.epam.deltix.util.io.UncheckedIOException;
import com.epam.deltix.util.jcg.JClass;
Expand Down Expand Up @@ -788,6 +789,14 @@ private JExpr getMinLimit () {
return (limit == Long.MIN_VALUE ? null : CTXT.longLiteral (limit));
}

private IntegerArrayList getMinParams() {
return compFilter.tslimits.minParameters();
}

private IntegerArrayList getMaxParams() {
return compFilter.tslimits.maxParameters();
}

private void generateFirstOnlyCheck () {
if (compFilter.runningFilter != CompiledFilter.RunningFilter.FIRST_ONLY)
return;
Expand All @@ -803,68 +812,96 @@ private void generateFirstOnlyCheck () {
);
}

private void generateLimitCheck () {
TimestampLimits tslimits = compFilter.tslimits;
private void generateLimitCheck() {
TimestampLimits tslimits = compFilter.tslimits;

if (tslimits == null)
return;

boolean forward = compFilter.isForward ();
JExpr startLimit;
JExpr endLimit;
String op;

boolean forward = compFilter.isForward();
JExpr startLimit;
IntegerArrayList startParams;
JExpr endLimit;
IntegerArrayList endParams;
if (forward) {
startLimit = getMinLimit ();
endLimit = getMaxLimit ();
op = ">";
}
else {
startLimit = getMaxLimit ();
endLimit = getMinLimit ();
op = "<";
startLimit = getMinLimit();
startParams = getMinParams();
endLimit = getMaxLimit();
endParams = getMaxParams();
} else {
startLimit = getMaxLimit();
startParams = getMaxParams();
endLimit = getMinLimit();
endParams = getMinParams();
}

//TODO - support parameters
if (startLimit != null || startParams != null) {
JMethod adj = msiClass.addMethod(Modifier.PROTECTED, long.class, "adjustResetPoint");
JMethodArgument timeArg = adj.addArg(Modifier.FINAL, long.class, "time");

if (startLimit != null) {
JMethod adj =
msiClass.addMethod (Modifier.PROTECTED, long.class, "adjustResetPoint");

JMethodArgument timeArg =
adj.addArg (Modifier.FINAL, long.class, "time");

JExpr inRange =
CTXT.binExpr (timeArg, op, startLimit);

adj.body ().add (
CTXT.condExpr (inRange, timeArg, startLimit).returnStmt ()
adj.body().add(
CTXT.call(forward ? "adjustForwardResetPoint" : "adjustBackwardResetPoint",
timeArg,
startLimit != null ? startLimit : CTXT.staticVarRef("Long", forward ? "MIN_VALUE" : "MAX_VALUE"),
generateParamsList(startParams))
.returnStmt()
);
}

if (endLimit != null)
filterBody.add (
CTXT.ifStmt (
CTXT.binExpr (inMsg.call ("getTimeStampMs"), op, endLimit),
returnAbort
if (endLimit != null || endParams != null) {
JMethod adj = msiClass.addMethod(Modifier.PROTECTED, long.class, "getStopPoint");
adj.body().add(
CTXT.call(forward ? "getMinTime" : "getMaxTime",
endLimit != null ? endLimit : CTXT.staticVarRef("Long", forward ? "MAX_VALUE" : "MIN_VALUE"),
generateParamsList(endParams)
).returnStmt()
);

filterBody.add(
CTXT.ifStmt(
CTXT.binExpr(inMsg.call("getTimeStampMs"), forward ? ">" : "<", CTXT.localVarRef("stopPoint")),
returnAbort
)
);
}
}

private JExpr generateParamsList(IntegerArrayList params) {
if (params == null) {
return CTXT.nullLiteral();
}

JExpr[] paramExprs = new JExpr[params.size()];
for (int i = 0; i < params.size(); ++i) {
paramExprs[i] = CTXT.intLiteral(params.get(i));
}
return CTXT.newArrayExpr(int.class, paramExprs);
}

private void generateSymbolSubscription() {
SymbolLimits symbolLimits = compFilter.symbolLimits;
if (symbolLimits == null || symbolLimits.symbols().size() == 0) {
return;
if (symbolLimits != null && symbolLimits.hasSymbols()) {
JMethod adj = msiClass.addMethod(Modifier.PROTECTED, String[].class, "symbolsToAdjust");
adj.body().add(
CTXT.newArrayExpr(
String.class,
getSymbolLimitsExpressions(symbolLimits)
).returnStmt()
);
}
}

JMethod adj = msiClass.addMethod(Modifier.PROTECTED, String[].class, "symbolsToAdjust");
adj.body().add(
CTXT.newArrayExpr(
String.class,
symbolLimits.symbols().stream().map(CTXT::stringLiteral)
.toArray(JExpr[]::new)
).returnStmt ()
private JExpr[] getSymbolLimitsExpressions(SymbolLimits symbolLimits) {
List<JExpr> result = symbolLimits.symbols().stream().map(CTXT::stringLiteral)
.collect(Collectors.toList());
result.addAll(
symbolLimits.parameterRefs().stream()
.map(CTXT::intLiteral)
.map(i -> CTXT.call("getVarcharParam", i))
.collect(Collectors.toList())
);

return result.toArray(new JExpr[0]);
}

private void overrideNext (String delegateTo) {
Expand Down Expand Up @@ -1096,4 +1133,4 @@ private void genDecoderForOneType (
}
*/

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1472,7 +1472,11 @@ private CompiledQuery compileSelect(SelectExpression e) {
tslimits =
QQLPostProcessingPatterns.adjustTimestampLimits(flatCond, e.getEndTime());

symbolLimits = QQLPostProcessingPatterns.symbolLimits(cond);
List<CompiledExpression> matchedConditions = new ArrayList<>();
symbolLimits = QQLPostProcessingPatterns.symbolLimits(cond, matchedConditions);
if (!symbolLimits.isSubscribeAll()) {
matchedConditions.forEach(flatCond::remove);
}

cond = QQLPostProcessingPatterns.reconstructConjunction(flatCond);
} else {
Expand Down Expand Up @@ -2702,14 +2706,15 @@ private CompiledExpression compileInExpression(InExpression e, DataType expected
}

// adding argument into constants
int constantsCount = constants.size();
constants.add(0, carg);

ConnectiveExpression expr = new ConnectiveExpression(!e.positive,
StandardTypes.CLEAN_BOOLEAN, constants.toArray(new CompiledExpression[constants.size()]));

if (ret == null) {
ret = expr;
} else if (constants.size() > 0) {
} else if (constantsCount > 0) {
ret = e.positive ? processOr(ret, expr) : processAnd(ret, expr);
}

Expand Down Expand Up @@ -2820,4 +2825,4 @@ private ClassDataType captureClassType(ClassDataType type) {

return type;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,33 +158,42 @@ public static TimestampLimits extractTimestampLimits(List<CompiledExpression> co
return (range);
}

public static SymbolLimits symbolLimits(CompiledExpression<?> e) {
public static SymbolLimits symbolLimits(CompiledExpression<?> e, List<CompiledExpression> matched) {
boolean conj = isConjunction(e);
boolean disj = isDisjunction(e);
if (conj || disj) {
LogicalOperation ae = (LogicalOperation) e;

SymbolLimits s1 = symbolLimits(ae.args[0]);
SymbolLimits s2 = symbolLimits(ae.args[1]);
SymbolLimits s1 = symbolLimits(ae.args[0], matched);
SymbolLimits s2 = symbolLimits(ae.args[1], matched);
if (s1.isSubscribeAll() && s2.isSubscribeAll()) {
return new SymbolLimits(true);
} else if (s1.isSubscribeAll() && !s2.isSubscribeAll()) {
matched.add(e);
if (disj) {
return new SymbolLimits(true);
} else {
return new SymbolLimits(false, s2.symbols());
return new SymbolLimits(false, s2.symbols(), s2.parameterRefs());
}
} else if (!s1.isSubscribeAll() && s2.isSubscribeAll()) {
matched.add(e);
if (disj) {
return new SymbolLimits(true);
} else {
return new SymbolLimits(false, s1.symbols());
return new SymbolLimits(false, s1.symbols(), s1.parameterRefs());
}
} else if (!s1.isSubscribeAll() && !s2.isSubscribeAll()) {
matched.add(e);
if (disj) {
return new SymbolLimits(false, symbolsUnion(s1.symbols(), s2.symbols()));
return new SymbolLimits(false,
union(s1.symbols(), s2.symbols()),
union(s1.parameterRefs(), s2.parameterRefs())
);
} else {
return new SymbolLimits(false, symbolsIntersection(s1.symbols(), s2.symbols()));
return new SymbolLimits(false,
intersection(s1.symbols(), s2.symbols()),
intersection(s1.parameterRefs(), s2.parameterRefs())
);
}
}
} else {
Expand All @@ -193,9 +202,17 @@ public static SymbolLimits symbolLimits(CompiledExpression<?> e) {
CompiledExpression<?> left = operation.args[0];
CompiledExpression<?> right = operation.args[1];
if (left instanceof SymbolSelector && right instanceof CompiledConstant) {
matched.add(e);
return new SymbolLimits(false, ((CompiledConstant) right).value.toString());
} else if (right instanceof SymbolSelector && left instanceof CompiledConstant) {
matched.add(e);
return new SymbolLimits(false, ((CompiledConstant) left).value.toString());
} else if (left instanceof SymbolSelector && right instanceof ParamAccess) {
matched.add(e);
return new SymbolLimits(false, ((ParamAccess) right).ref.index);
} else if (right instanceof SymbolSelector && left instanceof ParamAccess) {
matched.add(e);
return new SymbolLimits(false, ((ParamAccess) left).ref.index);
}
} else if (e instanceof ConnectiveExpression) {
ConnectiveExpression operation = (ConnectiveExpression) e;
Expand All @@ -207,28 +224,29 @@ public static SymbolLimits symbolLimits(CompiledExpression<?> e) {
}
}

return new SymbolLimits(false, symbols);
matched.add(e);
return new SymbolLimits(false, symbols, new ArrayList<>());
}
}
}

return new SymbolLimits(true);
}

private static List<String> symbolsUnion(List<String> s1, List<String> s2) {
HashSet<String> symbols = new HashSet<>(s1);
symbols.addAll(s2);
return new ArrayList<>(symbols);
private static <T> List<T> union(List<T> s1, List<T> s2) {
HashSet<T> values = new HashSet<>(s1);
values.addAll(s2);
return new ArrayList<>(values);
}

private static List<String> symbolsIntersection(List<String> s1, List<String> s2) {
HashSet<String> s1Symbols = new HashSet<>(s1);
ArrayList<String> symbols = new ArrayList<>();
private static <T> List<T> intersection(List<T> s1, List<T> s2) {
HashSet<T> values1 = new HashSet<>(s1);
ArrayList<T> values = new ArrayList<>();
s2.forEach(s -> {
if (s1Symbols.contains(s)) {
symbols.add(s);
if (values1.contains(s)) {
values.add(s);
}
});
return symbols;
return values;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
*
*/
public class SymbolLimits {
private List<String> symbols = new ArrayList<>();
private final List<String> symbols = new ArrayList<>();
private final List<Integer> parameterRefs = new ArrayList<>();
private boolean subscribeAll;

public SymbolLimits() {
Expand All @@ -40,9 +41,15 @@ public SymbolLimits(boolean subscribeAll, String symbol) {
this.symbols.add(symbol);
}

public SymbolLimits(boolean subscribeAll, List<String> symbols) {
public SymbolLimits(boolean subscribeAll, int paramNum) {
this.subscribeAll = subscribeAll;
this.parameterRefs.add(paramNum);
}

public SymbolLimits(boolean subscribeAll, List<String> symbols, List<Integer> parameterRefs) {
this.subscribeAll = subscribeAll;
this.symbols.addAll(symbols);
this.parameterRefs.addAll(parameterRefs);
}

public void addSymbol(String symbol) {
Expand All @@ -53,6 +60,18 @@ public List<String> symbols() {
return symbols;
}

public void addParameter(Integer paramNum) {
parameterRefs.add(paramNum);
}

public List<Integer> parameterRefs() {
return parameterRefs;
}

public boolean hasSymbols() {
return !symbols.isEmpty() || !parameterRefs.isEmpty();
}

public boolean isSubscribeAll() {
return subscribeAll;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
*/
public class TimestampLimits {
private static final int EXCLUSIVE_BIT = 0x80000000;
public static final int EXCLUSIVE_BIT = 0x80000000;

private long inclusiveMinimum = Long.MIN_VALUE;
private IntegerArrayList minParameters = null;
Expand Down Expand Up @@ -125,4 +125,13 @@ public long getInclusiveMaximum () {
public long getInclusiveMinimum () {
return (inclusiveMinimum);
}
}

public IntegerArrayList minParameters() {
return minParameters;
}

public IntegerArrayList maxParameters() {
return maxParameters;
}

}
Loading

0 comments on commit 4968354

Please sign in to comment.