Support inserting into table having CHECK constraint in Delta Lake
Extract relevant tests from TestDeltaLakeCheckConstraintsCompatibility.
ebyhr committed Jan 25, 2023
1 parent d2003fc commit 7fc8d72
Showing 19 changed files with 1,427 additions and 87 deletions.
11 changes: 11 additions & 0 deletions plugin/trino-delta-lake/pom.xml
@@ -174,6 +174,12 @@
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${dep.antlr.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
@@ -440,6 +446,11 @@
</ignoredNonTestScopedDependencies>
</configuration>
</plugin>

<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

SparkExpression.g4
@@ -0,0 +1,118 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

grammar SparkExpression;

tokens {
DELIMITER
}

standaloneExpression
: expression EOF
;

expression
: booleanExpression
;

booleanExpression
: valueExpression predicate[$valueExpression.ctx]? #predicated
| booleanExpression AND booleanExpression #and
;

// workaround for https://github.com/antlr/antlr4/issues/780
predicate[ParserRuleContext value]
: comparisonOperator right=valueExpression #comparison
;

valueExpression
: primaryExpression #valueExpressionDefault
;

primaryExpression
: number #numericLiteral
| booleanValue #booleanLiteral
| string #stringLiteral
| identifier #columnReference
;

string
: STRING #basicStringLiteral
;

comparisonOperator
: EQ | NEQ | LT | LTE | GT | GTE
;

booleanValue
: TRUE | FALSE
;

identifier
: IDENTIFIER #unquotedIdentifier
| BACKQUOTED_IDENTIFIER #backQuotedIdentifier
;

number
: MINUS? INTEGER_VALUE #integerLiteral
;

AND: 'AND';
FALSE: 'FALSE';
TRUE: 'TRUE';

EQ: '=';
NEQ: '<>' | '!=';
LT: '<';
LTE: '<=';
GT: '>';
GTE: '>=';

MINUS: '-';

STRING
: '\'' ( ~'\'' | '\'\'' )* '\''
| '"' ( ~'"' | '""' )* '"'
;

INTEGER_VALUE
: DIGIT+
;

IDENTIFIER
: (LETTER | '_') (LETTER | DIGIT | '_')*
;

BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;

fragment DIGIT
: [0-9]
;

fragment LETTER
: [A-Z]
;

WS
: [ \r\n\t]+ -> channel(HIDDEN)
;

// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
: .
;
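
A usage sketch only: assuming the antlr4-maven-plugin declared in the pom.xml above generates SparkExpressionLexer and SparkExpressionParser from this grammar (ANTLR's standard naming), a CHECK constraint expression could be parsed roughly as below. The CaseInsensitiveStream wrapper is the class added later in this commit; the sketch class name and sample input are illustrative, and the production entry point may differ.

import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;

public class SparkExpressionParseSketch
{
    public static void main(String[] args)
    {
        // a Spark-style CHECK constraint expression, as stored in the Delta table metadata
        String constraint = "id > 0 AND `part` = 'a'";

        // CaseInsensitiveStream upper-cases the lexer's lookahead, so AND/TRUE/FALSE match in any case
        SparkExpressionLexer lexer = new SparkExpressionLexer(
                new CaseInsensitiveStream(CharStreams.fromString(constraint)));
        SparkExpressionParser parser = new SparkExpressionParser(new CommonTokenStream(lexer));

        // standaloneExpression is the grammar's entry rule (expression EOF)
        System.out.println(parser.standaloneExpression().toStringTree(parser));
    }
}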
@@ -30,6 +30,7 @@
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.hdfs.HdfsContext;
import io.trino.hdfs.HdfsEnvironment;
import io.trino.plugin.deltalake.expression.SparkExpressions;
import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore;
import io.trino.plugin.deltalake.metastore.NotADeltaLakeTableException;
import io.trino.plugin.deltalake.procedure.DeltaLakeTableExecuteHandle;
@@ -443,6 +444,9 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
String location = metastore.getTableLocation(tableHandle.getSchemaTableName(), session);
Map<String, String> columnComments = getColumnComments(tableHandle.getMetadataEntry());
Map<String, Boolean> columnsNullability = getColumnsNullability(tableHandle.getMetadataEntry());
List<String> checkConstraints = getCheckConstraints(tableHandle.getMetadataEntry()).values().stream()
.map(SparkExpressions::toTrinoExpression)
.collect(toImmutableList());
List<ColumnMetadata> columns = getColumns(tableHandle.getMetadataEntry()).stream()
.map(column -> getColumnMetadata(column, columnComments.get(column.getName()), columnsNullability.getOrDefault(column.getName(), true)))
.collect(toImmutableList());
@@ -458,7 +462,8 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
tableHandle.getSchemaTableName(),
columns,
properties.buildOrThrow(),
Optional.ofNullable(tableHandle.getMetadataEntry().getDescription()));
Optional.ofNullable(tableHandle.getMetadataEntry().getDescription()),
checkConstraints);
}

@Override
@@ -1272,9 +1277,6 @@ public ConnectorInsertTableHandle beginInsert(ConnectorSession session, Connecto
if (!columnInvariants.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Inserts are not supported for tables with delta invariants");
}
if (!getCheckConstraints(table.getMetadataEntry()).isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with CHECK constraints is not supported");
}
checkUnsupportedGeneratedColumns(table.getMetadataEntry());
checkSupportedWriterVersion(session, table.getSchemaTableName());

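For context, a hedged illustration of the translation wired into getTableMetadata above: the Spark SQL text stored under the table's delta.constraints.* properties is converted to an equivalent Trino expression string before being exposed as a check constraint. The class name, sample input, and exact output formatting below are illustrative, not the connector's actual rendering.

import io.trino.plugin.deltalake.expression.SparkExpressions;

public class CheckConstraintTranslationSketch
{
    public static void main(String[] args)
    {
        // Spark expression text as recorded in the Delta transaction log metadata
        String sparkCheck = "part > 0 AND `name` = 'a'";

        // translate to Trino SQL syntax; quoting and parenthesization of the result are assumptions here
        String trinoCheck = SparkExpressions.toTrinoExpression(sparkCheck);
        System.out.println(trinoCheck);
    }
}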
AstVisitor.java
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.expression;

import javax.annotation.Nullable;

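/**
 * Visitor over the parsed Spark expression tree. {@link #process} relies on each
 * node's {@code accept} method to dispatch to the matching {@code visitXxx} method;
 * node types without a dedicated override fall back to {@link #visitExpression}
 * and ultimately {@link #visitNode}.
 */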
public abstract class AstVisitor<R, C>
{
public R process(Node node, @Nullable C context)
{
return node.accept(this, context);
}

protected R visitNode(Node node, C context)
{
return null;
}

protected R visitExpression(Expression node, C context)
{
return visitNode(node, context);
}

protected R visitComparisonExpression(ComparisonExpression node, C context)
{
return visitExpression(node, context);
}

protected R visitLogicalExpression(LogicalExpression node, C context)
{
return visitExpression(node, context);
}

protected R visitIdentifier(Identifier node, C context)
{
return visitExpression(node, context);
}

protected R visitStringLiteral(StringLiteral node, C context)
{
return visitLiteral(node, context);
}

protected R visitLiteral(Literal node, C context)
{
return visitExpression(node, context);
}
}
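
A usage sketch (not part of this commit): a subclass overrides only the node types it cares about, and everything else falls through the visitExpression/visitNode defaults. The visitor below classifies a node and would be invoked as new NodeKindVisitor().process(node, null); obtaining the node from the parser is outside this file.

// illustrative only; relies solely on the node types referenced by AstVisitor above
class NodeKindVisitor
        extends AstVisitor<String, Void>
{
    @Override
    protected String visitComparisonExpression(ComparisonExpression node, Void context)
    {
        return "comparison";
    }

    @Override
    protected String visitLogicalExpression(LogicalExpression node, Void context)
    {
        return "logical AND";
    }

    @Override
    protected String visitIdentifier(Identifier node, Void context)
    {
        return "column reference";
    }

    @Override
    protected String visitLiteral(Literal node, Void context)
    {
        return "literal";
    }

    @Override
    protected String visitExpression(Expression node, Void context)
    {
        return "other expression";
    }
}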
CaseInsensitiveStream.java
@@ -0,0 +1,91 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.deltalake.expression;

import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.IntStream;
import org.antlr.v4.runtime.misc.Interval;

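/**
 * {@link CharStream} wrapper that upper-cases the characters reported by {@code LA},
 * so the grammar's keyword tokens (AND, TRUE, FALSE) match case-insensitively while
 * {@code getText} still returns the original, unmodified input.
 */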
public class CaseInsensitiveStream
implements CharStream
{
private final CharStream stream;

public CaseInsensitiveStream(CharStream stream)
{
this.stream = stream;
}

@Override
public String getText(Interval interval)
{
return stream.getText(interval);
}

@Override
public void consume()
{
stream.consume();
}

@Override
public int LA(int i)
{
int result = stream.LA(i);

switch (result) {
case 0:
case IntStream.EOF:
return result;
default:
return Character.toUpperCase(result);
}
}

@Override
public int mark()
{
return stream.mark();
}

@Override
public void release(int marker)
{
stream.release(marker);
}

@Override
public int index()
{
return stream.index();
}

@Override
public void seek(int index)
{
stream.seek(index);
}

@Override
public int size()
{
return stream.size();
}

@Override
public String getSourceName()
{
return stream.getSourceName();
}
}
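
A small sketch of the effect (class name and sample input are illustrative): the lexer sees upper-cased lookahead, but the retrievable text keeps its original case.

import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.misc.Interval;

public class CaseInsensitiveStreamSketch
{
    public static void main(String[] args)
    {
        CharStream stream = new CaseInsensitiveStream(CharStreams.fromString("true and x"));
        System.out.println((char) stream.LA(1));                // prints 'T'
        System.out.println(stream.getText(new Interval(0, 3))); // prints "true"
    }
}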