Skip to content

Commit

Permalink
Zfunction and Zstream support (#1354)
Browse files Browse the repository at this point in the history
  • Loading branch information
akrambek authored Dec 27, 2024
1 parent 3e2a862 commit 063d7cb
Show file tree
Hide file tree
Showing 64 changed files with 3,792 additions and 1,600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ options {

Dollar: '$';

DOLLAR_DELIMITER: '$$';

OPEN_PAREN: '(';

CLOSE_PAREN: ')';
Expand All @@ -58,6 +60,10 @@ STAR: '*';

EQUAL: '=';

DISPATCH_ON: 'DISPATCH_ON';

HANDLERS: 'HANDLERS';

DOT: '.';
//NamedArgument : ':=';

Expand Down Expand Up @@ -295,7 +301,7 @@ TOPIC: 'TOPIC';

TOPICS: 'TOPICS';

STREAM: 'STREAM';
ZSTREAM: 'ZSTREAM';

ZVIEW: 'ZVIEW';

Expand All @@ -313,6 +319,8 @@ USER: 'USER';

USING: 'USING';

LINK: 'LINK';

VARIADIC: 'VARIADIC';

WHEN: 'WHEN';
Expand Down Expand Up @@ -548,6 +556,12 @@ FUNCTION: 'FUNCTION';

FUNCTIONS: 'FUNCTIONS';

ZFUNCTION: 'ZFUNCTION';

ZFUNCTIONS: 'ZFUNCTIONS';

ZSTREAMS: 'ZSTREAMS';

GLOBAL: 'GLOBAL';

GRANTED: 'GRANTED';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ stmt
| createforeignserverstmt
| createforeigntablestmt
| createfunctionstmt
| createzfunctionstmt
| creategroupstmt
| creatematviewstmt
| createzviewstmt
Expand All @@ -106,7 +107,7 @@ stmt
| createseqstmt
| createstmt
| createztstmt
| createstreamstmt
| createzstreamstmt
| createsubscriptionstmt
| createstatsstmt
| createtablespacestmt
Expand Down Expand Up @@ -414,15 +415,15 @@ altertablestmt
| ALTER MATERIALIZED VIEW (IF_P EXISTS)? qualified_name alter_table_cmds
| ALTER MATERIALIZED VIEW ALL IN_P TABLESPACE name (OWNED BY role_list)? SET TABLESPACE name opt_nowait
| ALTER FOREIGN TABLE (IF_P EXISTS)? relation_expr alter_table_cmds
| ALTER STREAM (IF_P EXISTS)? relation_expr alter_stream_cmds
| ALTER ZSTREAM (IF_P EXISTS)? relation_expr alter_zstream_cmds
| ALTER ZVIEW (IF_P EXISTS)? qualified_name alter_table_cmds
;

alter_stream_cmds
: alter_stream_cmd (COMMA alter_stream_cmd)*
alter_zstream_cmds
: alter_zstream_cmd (COMMA alter_zstream_cmd)*
;

alter_stream_cmd
alter_zstream_cmd
: ADD_P columnDef
| ADD_P IF_P NOT EXISTS columnDef
| ADD_P COLUMN columnDef
Expand Down Expand Up @@ -693,25 +694,55 @@ copy_generic_opt_arg_list_item
: opt_boolean_or_string
;

createstreamstmt
: CREATE STREAM (IF_P NOT EXISTS)? stream_name OPEN_PAREN stream_columns CLOSE_PAREN opt_with_stream
createzstreamstmt
: CREATE ZSTREAM (IF_P NOT EXISTS)? zstream_name OPEN_PAREN zstream_columns CLOSE_PAREN opt_with_zstream
;

stream_name
zstream_name
: qualified_name
;

stream_columns
: stream_column (COMMA stream_column)*
zstream_columns
: zstream_column (COMMA zstream_column)*
;

stream_column
: colid typename
zstream_column
: colid typename opt_generated_clause
;

opt_with_stream
: WITH reloptions
|
opt_generated_clause
: GENERATED ALWAYS AS generation_type
| /* Empty */
;

generation_type
: IDENTITY_P
| NOW
;

opt_with_zstream
: WITH OPEN_PAREN zreloptions CLOSE_PAREN
;

zreloptions
: zreloption_elem (COMMA zreloption_elem)*
;

zreloption_elem
: DISPATCH_ON EQUAL sconst
| HANDLERS EQUAL OPEN_PAREN handler_mappings CLOSE_PAREN
;

handler_mappings
: handler_mapping (COMMA handler_mapping)*
;

handler_mapping
: sconst TO function_name
;

function_name
: sconst
;

createztstmt
Expand Down Expand Up @@ -1645,9 +1676,10 @@ show_object_type_name
| VIEWS
| MATERIALIZED VIEWS
| TOPICS
| HEAD
| ZTABLES
| ZVIEWS
| ZFUNCTIONS
| ZSTREAMS
;

dropstmt
Expand All @@ -1671,9 +1703,8 @@ object_type_any_name
| VIEW
| MATERIALIZED VIEW
| TOPIC
| STREAM
| ZSTREAM
| ZVIEW
| HEAD
| ZTABLE
| INDEX
| FOREIGN TABLE
Expand Down Expand Up @@ -2010,14 +2041,19 @@ opt_nulls_order
|
;

createzfunctionstmt
: CREATE ZFUNCTION func_name func_args_with_defaults
RETURNS TABLE OPEN_PAREN table_func_column_list CLOSE_PAREN LANGUAGE SQL_P AS DOLLAR_DELIMITER selectstmt SEMI DOLLAR_DELIMITER
;

createfunctionstmt
: CREATE opt_or_replace (FUNCTION | PROCEDURE) func_name func_args_with_defaults (
RETURNS (func_return | TABLE OPEN_PAREN table_func_column_list CLOSE_PAREN)
)? createfunc_opt_list
;

opt_type_parameters
: '<' type_parameters '>'
: LT type_parameters GT
|
;

Expand Down Expand Up @@ -4456,6 +4492,8 @@ unreserved_keyword
| FORWARD
| FUNCTION
| FUNCTIONS
| ZFUNCTION
| ZFUNCTIONS
| GENERATED
| GLOBAL
| GRANTED
Expand Down Expand Up @@ -4618,6 +4656,7 @@ unreserved_keyword
| SYSID
| SYSTEM_P
| TABLES
| ZTABLES
| TABLESPACE
| TEMP
| TEMPLATE
Expand Down Expand Up @@ -4812,6 +4851,7 @@ from pl_gram.y, line ~2982
| SOME
| SYMMETRIC
| TABLE
| ZTABLE
| THEN
| TO
| TRAILING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,64 +14,78 @@
*/
package io.aklivity.zilla.runtime.binding.pgsql.parser;

import java.util.BitSet;
import java.util.List;

import org.antlr.v4.runtime.ANTLRErrorListener;
import org.antlr.v4.runtime.BailErrorStrategy;
import org.antlr.v4.runtime.CharStream;
import org.antlr.v4.runtime.CharStreams;
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.Parser;
import org.antlr.v4.runtime.RecognitionException;
import org.antlr.v4.runtime.Recognizer;
import org.antlr.v4.runtime.atn.ATNConfigSet;
import org.antlr.v4.runtime.dfa.DFA;
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlAlterStreamTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlAlterZstreamTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlAlterZtableTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCommandListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateFunctionListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateStreamListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZfunctionListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZstreamListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZtableTopicListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlCreateZviewListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlDropListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.listener.SqlShowListener;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Alter;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateStream;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateFunction;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateTable;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZfunction;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZstream;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.CreateZview;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Drop;
import io.aklivity.zilla.runtime.binding.pgsql.parser.model.Function;

public final class PgsqlParser
{
private final ParseTreeWalker walker;
private final BailErrorStrategy errorStrategy;
private final PostgreSqlLexer lexer;
private final CommonTokenStream tokens;
private final PostgreSqlParser parser;
private final SqlCommandListener commandListener;
private final SqlCreateStreamListener createStreamListener;
private final SqlCreateZstreamListener createStreamListener;
private final SqlCreateZtableTopicListener createTableListener;
private final SqlAlterZtableTopicListener alterTableListener;
private final SqlAlterStreamTopicListener alterStreamListener;
private final SqlAlterZstreamTopicListener alterStreamListener;
private final SqlCreateFunctionListener createFunctionListener;
private final SqlCreateZfunctionListener createZfunctionListener;
private final SqlCreateZviewListener createMaterializedViewListener;
private final SqlShowListener showListener;
private final SqlDropListener dropListener;

public PgsqlParser()
{
this.walker = new ParseTreeWalker();
this.errorStrategy = new BailErrorStrategy();
BailErrorStrategy errorStrategy = new BailErrorStrategy();
ParserErrorListener errorListener = new ParserErrorListener();
this.lexer = new PostgreSqlLexer(null);
this.tokens = new CommonTokenStream(lexer);
this.parser = new PostgreSqlParser(tokens);
this.parser.removeErrorListeners();
this.parser.addErrorListener(errorListener);
this.parser.setErrorHandler(errorStrategy);

this.commandListener = new SqlCommandListener(tokens);
this.createTableListener = new SqlCreateZtableTopicListener(tokens);
this.alterTableListener = new SqlAlterZtableTopicListener(tokens);
this.alterStreamListener = new SqlAlterStreamTopicListener(tokens);
this.createStreamListener = new SqlCreateStreamListener(tokens);
this.alterStreamListener = new SqlAlterZstreamTopicListener(tokens);
this.createStreamListener = new SqlCreateZstreamListener(tokens);
this.createFunctionListener = new SqlCreateFunctionListener(tokens);
this.createZfunctionListener = new SqlCreateZfunctionListener(tokens);
this.createMaterializedViewListener = new SqlCreateZviewListener(tokens);
this.dropListener = new SqlDropListener();
this.showListener = new SqlShowListener();
parser.setErrorHandler(errorStrategy);
}

public String parseCommand(
Expand Down Expand Up @@ -102,20 +116,27 @@ public Alter parseAlterStream(
return alterStreamListener.alter();
}

public CreateStream parseCreateStream(
public CreateZstream parseCreateZstream(
String sql)
{
parser(sql, createStreamListener);
return createStreamListener.stream();
}

public Function parseCreateFunction(
public CreateFunction parseCreateFunction(
String sql)
{
parser(sql, createFunctionListener);
return createFunctionListener.function();
}

public CreateZfunction parseCreateZfunction(
String sql)
{
parser(sql, createZfunctionListener);
return createZfunctionListener.zfunction();
}

public CreateZview parseCreateZView(
String sql)
{
Expand Down Expand Up @@ -158,4 +179,54 @@ private void parser(
{
}
}

private final class ParserErrorListener implements ANTLRErrorListener
{
@Override
public void syntaxError(
Recognizer<?, ?> recognizer,
Object offendingSymbol,
int line,
int charPositionInLine,
String msg,
RecognitionException e)
{
//Only for debugging
//System.err.println("Syntax error at line " + line + ":" + charPositionInLine + " " + msg);
}

@Override
public void reportAmbiguity(
Parser recognizer,
DFA dfa,
int startIndex,
int stopIndex,
boolean exact,
BitSet ambigAlts,
ATNConfigSet configs)
{
}

@Override
public void reportAttemptingFullContext(
Parser recognizer,
DFA dfa,
int startIndex,
int stopIndex,
BitSet conflictingAlts,
ATNConfigSet configs)
{
}

@Override
public void reportContextSensitivity(
Parser recognizer,
DFA dfa,
int startIndex,
int stopIndex,
int prediction,
ATNConfigSet configs)
{
}
}
}
Loading

0 comments on commit 063d7cb

Please sign in to comment.