Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel dynamic function calls #1031

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/main/java/org/rumbledb/api/Item.java
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,15 @@ default public RuntimeIterator getBodyIterator() {
throw new UnsupportedOperationException("Operation not defined for type " + this.getDynamicType());
}

/**
* Returns the body iterator, if it is a function item.
*
* @return the function signature.
*/
default public Map<Long, RuntimeIterator> getBodyIterators() {
throw new UnsupportedOperationException("Operation not defined for type " + this.getDynamicType());
}

/**
* Returns the local variable bindings, if it is a function item.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import org.rumbledb.types.SequenceType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -497,12 +498,15 @@ public RuntimeIterator visitInlineFunctionExpr(InlineFunctionExpression expressi
paramNameToSequenceTypes.put(paramEntry.getKey(), paramEntry.getValue());
}
SequenceType returnType = expression.getReturnType();
RuntimeIterator bodyIterator = this.visit(expression.getBody(), argument);
Map<Long, RuntimeIterator> bodyIterators = new HashMap<>();
for (long l : expression.getBodies().keySet()) {
bodyIterators.put(l, this.visit(expression.getBodies().get(l), argument));
}
RuntimeIterator runtimeIterator = new FunctionRuntimeIterator(
expression.getName(),
paramNameToSequenceTypes,
returnType,
bodyIterator,
bodyIterators,
expression.getHighestExecutionMode(this.visitorConfig),
expression.getMetadata()
);
Expand Down
45 changes: 34 additions & 11 deletions src/main/java/org/rumbledb/compiler/StaticContextVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.rumbledb.context.BuiltinFunctionCatalogue;
import org.rumbledb.context.FunctionIdentifier;
Expand Down Expand Up @@ -191,23 +192,45 @@ public StaticContext visitFunctionDeclaration(FunctionDeclaration declaration, S
@Override
public StaticContext visitInlineFunctionExpr(InlineFunctionExpression expression, StaticContext argument) {
// define a static context for the function body, add params to the context and visit the body expression
StaticContext functionDeclarationContext = new StaticContext(argument);
expression.getParams()
.forEach(
(paramName, sequenceType) -> functionDeclarationContext.addVariable(
paramName,
sequenceType,
expression.getMetadata(),
ExecutionMode.LOCAL
)
StaticContext functionDeclarationContextLocal = new StaticContext(argument);
for (Entry<Name, SequenceType> entry : expression.getParams().entrySet()) {
functionDeclarationContextLocal.addVariable(
entry.getKey(),
entry.getValue(),
expression.getMetadata(),
ExecutionMode.LOCAL
);
}
// visit the body first to make its execution mode available while adding the function to the catalog
this.visit(expression.getBody(), functionDeclarationContext);
this.visit(expression.getBodies().get(0L), functionDeclarationContextLocal);

StaticContext functionDeclarationContextRDD = new StaticContext(argument);
boolean first = true;
for (Entry<Name, SequenceType> entry : expression.getParams().entrySet()) {
if (first) {
functionDeclarationContextRDD.addVariable(
entry.getKey(),
entry.getValue(),
expression.getMetadata(),
ExecutionMode.DATAFRAME
);
first = false;
} else {
functionDeclarationContextRDD.addVariable(
entry.getKey(),
entry.getValue(),
expression.getMetadata(),
ExecutionMode.LOCAL
);
}
}
StaticContext functionDeclarationContextDF = new StaticContext(argument);
this.visit(expression.getBodies().get(1L), functionDeclarationContextDF);
expression.initHighestExecutionMode(this.visitorConfig);
expression.registerUserDefinedFunctionExecutionMode(
this.visitorConfig
);
return functionDeclarationContext;
return argument;
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/rumbledb/compiler/TranslationVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public Name parseName(JsoniqParser.QnameContext ctx, boolean isFunction, boolean
@Override
public Node visitFunctionDecl(JsoniqParser.FunctionDeclContext ctx) {
Name name = parseName(ctx.qname(), true, false);
Map<Name, SequenceType> fnParams = new LinkedHashMap<>();
LinkedHashMap<Name, SequenceType> fnParams = new LinkedHashMap<>();
SequenceType fnReturnType = null;
Name paramName;
SequenceType paramType;
Expand Down Expand Up @@ -1371,7 +1371,7 @@ public Node visitNamedFunctionRef(JsoniqParser.NamedFunctionRefContext ctx) {

@Override
public Node visitInlineFunctionExpr(JsoniqParser.InlineFunctionExprContext ctx) {
Map<Name, SequenceType> fnParams = new LinkedHashMap<>();
LinkedHashMap<Name, SequenceType> fnParams = new LinkedHashMap<>();
SequenceType fnReturnType = SequenceType.MOST_GENERAL_SEQUENCE_TYPE;
Name paramName;
SequenceType paramType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,9 @@ public Void visitContextExpr(ContextItemExpression expression, Void argument) {

@Override
public Void visitInlineFunctionExpr(InlineFunctionExpression expression, Void argument) {
visit(expression.getBody(), null);
for (long l : expression.getBodies().keySet()) {
visit(expression.getBodies().get(l), null);
}
addInputVariableDependencies(expression, getInputVariableDependencies(expression.getBody()));
removeInputVariableDependencies(expression, expression.getParams().keySet());
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ private void processAnnotations(XQueryParser.AnnotationsContext annotations) {
@Override
public Node visitFunctionDecl(XQueryParser.FunctionDeclContext ctx) {
Name name = parseName(ctx.eqName(), true, false);
Map<Name, SequenceType> fnParams = new LinkedHashMap<>();
LinkedHashMap<Name, SequenceType> fnParams = new LinkedHashMap<>();
SequenceType fnReturnType = MOST_GENERAL_SEQUENCE_TYPE;
Name paramName;
SequenceType paramType;
Expand Down Expand Up @@ -1279,7 +1279,7 @@ public Node visitNamedFunctionRef(XQueryParser.NamedFunctionRefContext ctx) {

@Override
public Node visitInlineFunctionRef(XQueryParser.InlineFunctionRefContext ctx) {
Map<Name, SequenceType> fnParams = new LinkedHashMap<>();
LinkedHashMap<Name, SequenceType> fnParams = new LinkedHashMap<>();
SequenceType fnReturnType = SequenceType.MOST_GENERAL_SEQUENCE_TYPE;
Name paramName;
SequenceType paramType;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/rumbledb/expressions/CommaExpression.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

public class CommaExpression extends Expression {

private static final long serialVersionUID = 1L;

private final List<Expression> expressions;

public CommaExpression(List<Expression> expressions, ExceptionMetadata metadata) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/rumbledb/expressions/Expression.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
*/
public abstract class Expression extends Node {

protected StaticContext staticContext;
private static final long serialVersionUID = 1L;
protected transient StaticContext staticContext;

protected SequenceType inferredSequenceType;

Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/rumbledb/expressions/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.rumbledb.exceptions.ExceptionMetadata;
import org.rumbledb.exceptions.OurBadException;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
Expand All @@ -32,8 +33,9 @@
* This is the top-level class for nodes in the intermediate representation of a
* JSONiq query. Nodes include expressions, clauses, function declarations, etc.
*/
public abstract class Node {
public abstract class Node implements Serializable {

private static final long serialVersionUID = 1L;
private ExceptionMetadata metadata;

protected ExecutionMode highestExecutionMode = ExecutionMode.UNSET;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.List;

public class AdditiveExpression extends Expression {
private static final long serialVersionUID = 1L;

private Expression leftExpression;
private Expression rightExpression;
private boolean isMinus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

public class MultiplicativeExpression extends Expression {
private static final long serialVersionUID = 1L;

public static enum MultiplicativeOperator {
MUL("*"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

public class ComparisonExpression extends Expression {

private static final long serialVersionUID = 1L;

public static enum ComparisonOperator {
VC_EQ("eq"),
VC_NE("ne"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class ConditionalExpression extends Expression {

private static final long serialVersionUID = 1L;
private final Expression conditionExpression;
private final Expression thenExpression;
private final Expression elseExpression;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/rumbledb/expressions/flowr/Clause.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
public abstract class Clause extends Node {

/* Clauses are organized in doubly-linked lists */
private static final long serialVersionUID = 1L;

protected Clause previousClause;
protected Clause nextClause;
protected FLWOR_CLAUSES clauseType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@


public class CountClause extends Clause {
private static final long serialVersionUID = 1L;
private VariableReferenceExpression countClauseVar;

public CountClause(VariableReferenceExpression countClauseVar, ExceptionMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class FlworExpression extends Expression {

private static final long serialVersionUID = 1L;
private ReturnClause returnClause;

public FlworExpression(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

public class ForClause extends Clause {

private static final long serialVersionUID = 1L;
private final Name variableName;
private final boolean allowingEmpty;
private final Name positionalVariableName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class GroupByClause extends Clause {

private static final long serialVersionUID = 1L;
private final List<GroupByVariableDeclaration> variables;

public GroupByClause(List<GroupByVariableDeclaration> variables, ExceptionMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

public class LetClause extends Clause {

private static final long serialVersionUID = 1L;
private final Name variableName;
protected SequenceType sequenceType;
protected Expression expression;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

public class OrderByClause extends Clause {

private static final long serialVersionUID = 1L;

private final List<OrderByClauseSortingKey> sortingKeys;
private final boolean isStable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class ReturnClause extends Clause {

private static final long serialVersionUID = 1L;

private final Expression returnExpr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;

public class SimpleMapExpression extends Expression {
private static final long serialVersionUID = 1L;
private Expression leftExpression;
private Expression rightExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;

public class AndExpression extends Expression {
private static final long serialVersionUID = 1L;

private Expression leftExpression;
private Expression rightExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

public class NotExpression extends Expression {

private static final long serialVersionUID = 1L;
private Expression mainExpression;

public NotExpression(Expression mainExpression, ExceptionMetadata metadata) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;

public class OrExpression extends Expression {
private static final long serialVersionUID = 1L;
private Expression leftExpression;
private Expression rightExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

public class RangeExpression extends Expression {

private static final long serialVersionUID = 1L;
private Expression leftExpression;
private Expression rightExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;

public class StringConcatExpression extends Expression {
private static final long serialVersionUID = 1L;
private Expression leftExpression;
private Expression rightExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

public class FunctionDeclaration extends Node {

private static final long serialVersionUID = 1L;
private final InlineFunctionExpression functionExpression;

public FunctionDeclaration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

public class LibraryModule extends Module {

private static final long serialVersionUID = 1L;
protected StaticContext staticContext;
private String namespace;
private final Prolog prolog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class MainModule extends Module {

private static final long serialVersionUID = 1L;
protected StaticContext staticContext;
private final Prolog prolog;
private final Expression expression;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/rumbledb/expressions/module/Module.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.rumbledb.expressions.Node;

public abstract class Module extends Node {
private static final long serialVersionUID = 1L;

public Module(ExceptionMetadata metadata) {
super(metadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

public class Prolog extends Node {

private static final long serialVersionUID = 1L;
private List<Node> declarations;
private List<LibraryModule> importedModules;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

public class ArrayLookupExpression extends Expression {

private static final long serialVersionUID = 1L;

private Expression mainExpression;
private Expression lookupExpression;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

public class ArrayUnboxingExpression extends Expression {

private static final long serialVersionUID = 1L;

private Expression mainExpression;

public ArrayUnboxingExpression(Expression mainExpression, ExceptionMetadata metadata) {
Expand Down
Loading