Skip to content

Commit

Permalink
[Flink] Fix sql submit main entry (#494)
Browse files Browse the repository at this point in the history
* fix sql submit entry

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* add arrow sink example

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix build

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix clippy action

Signed-off-by: chenxu <chenxu@dmetasoul.com>

* fix clippy deprecation errors

Signed-off-by: chenxu <chenxu@dmetasoul.com>

---------

Signed-off-by: chenxu <chenxu@dmetasoul.com>
Co-authored-by: chenxu <chenxu@dmetasoul.com>
  • Loading branch information
xuchen-plus and dmetasoul01 authored Jun 16, 2024
1 parent 9958faf commit 2635e2b
Show file tree
Hide file tree
Showing 22 changed files with 1,038 additions and 809 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/consistency-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down Expand Up @@ -65,7 +65,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down Expand Up @@ -103,7 +103,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/flink-cdc-hdfs-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/flink-cdc-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/maven-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/native-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down Expand Up @@ -78,7 +78,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down Expand Up @@ -117,7 +117,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/presto-cdc-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
default: true
- uses: Swatinem/rust-cache@v2
with:
Expand Down
3 changes: 2 additions & 1 deletion .github/workflows/rust-clippy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
profile: minimal
toolchain: nightly-2023-05-20
toolchain: stable
components: clippy
default: true
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: "23.x"
repo-token: ${{ secrets.GITHUB_TOKEN }}
- uses: Swatinem/rust-cache@v2
with:
workspaces: "./rust -> target"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;

public class SubmitMain {
private static final Logger LOG = LoggerFactory.getLogger(SubmitMain.class);

public static void main(String[] args) throws IOException, URISyntaxException {
public static void main(String[] args)
throws IOException, URISyntaxException, ExecutionException, InterruptedException {
for (String arg : args) {
LOG.info("arg: {}", arg);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.concurrent.ExecutionException;

public abstract class Submitter {
protected SubmitOption submitOption;
Expand All @@ -16,6 +17,6 @@ public Submitter(SubmitOption submitOption) {
this.submitOption = submitOption;
}

public abstract void submit() throws IOException, URISyntaxException;
public abstract void submit() throws IOException, URISyntaxException, ExecutionException, InterruptedException;

}
Original file line number Diff line number Diff line change
@@ -1,100 +1,119 @@
// THIS FILE IS OVERWRITE BY THE zhp8341/FLINK-STREAMING-PLATFORM-WEB PROJECT, UNDER MIT LICENSE.

// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
//
// SPDX-License-Identifier: Apache-2.0
// This file is modified from https://github.com/apache/flink-kubernetes-operator/blob/main/examples/flink-sql-runner-example/src/main/java/org/apache/flink/examples/SqlRunner.java

package org.apache.flink.lakesoul.entry.sql.flink;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.BeginStatementSetOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.command.SetOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.helpers.MessageFormatter;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;

public class ExecuteSql {
public static void exeSql(List<String> sqlList, TableEnvironment tEnv) {
Parser parser = ((TableEnvironmentInternal) tEnv).getParser();
List<ModifyOperation> modifyOperationList = new ArrayList<>();
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;

for (String stmtOri : sqlList) {
String stmt = trimBlank(stmtOri);
Operation operation = parser.parse(stmt).get(0);
public class ExecuteSql {

// flink version 1.14.5
switch (operation.getClass().getSimpleName()) {
case "PlannerQueryOperation":
case "ShowTablesOperation":
case "ShowCatalogsOperation":
case "ShowCreateTableOperation":
case "ShowCurrentCatalogOperation":
case "ShowCurrentDatabaseOperation":
case "ShowDatabasesOperation":
case "ShowFunctionsOperation":
case "ShowModulesOperation":
case "ShowPartitionsOperation":
case "ShowViewsOperation":
case "ExplainOperation":
case "DescribeTableOperation":
tEnv.executeSql(stmt).print();
break;
private static final Logger LOG = LoggerFactory.getLogger(ExecuteSql.class);

//set
case "SetOperation":
SetOperation setOperation = (SetOperation) operation;
Configurations.setSingleConfiguration(tEnv, setOperation.getKey().get(),
setOperation.getValue().get());
break;
private static final String STATEMENT_DELIMITER = ";"; // a statement should end with `;`
private static final String LINE_DELIMITER = "\n";

case "BeginStatementSetOperation":
case "EndStatementSetOperation":
break;
private static final String COMMENT_PATTERN = "(--.*)|(((\\/\\*)+?[\\w\\W]+?(\\*\\/)+))";

case "DropTableOperation":
case "DropCatalogFunctionOperation":
case "DropTempSystemFunctionOperation":
case "DropCatalogOperation":
case "DropDatabaseOperation":
case "DropViewOperation":
case "CreateTableOperation":
case "CreateViewOperation":
case "CreateDatabaseOperation":
case "CreateCatalogOperation":
case "CreateTableASOperation":
case "CreateCatalogFunctionOperation":
case "CreateTempSystemFunctionOperation":
case "AlterTableOperation":
case "AlterViewOperation":
case "AlterDatabaseOperation":
case "AlterCatalogFunctionOperation":
case "UseCatalogOperation":
case "UseDatabaseOperation":
case "LoadModuleOperation":
case "UnloadModuleOperation":
case "NopOperation": {
((TableEnvironmentInternal) tEnv).executeInternal(operation).print();
break;
public static void executeSqlFileContent(String script, TableEnvironment tableEnv)
throws ExecutionException, InterruptedException {
List<String> statements = parseStatements(script);
Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
for (String statement : statements) {
Operation operation = parser.parse(statement).get(0);
if (operation instanceof SetOperation) {
SetOperation setOperation = (SetOperation) operation;
if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
System.out.println(MessageFormatter.format("\n======Setting config: {}={}",
setOperation.getKey().get(),
setOperation.getValue().get()).getMessage());
tableEnv.getConfig().getConfiguration()
.setString(setOperation.getKey().get(), setOperation.getValue().get());
} else if (setOperation.getKey().isPresent()) {
String value = tableEnv.getConfig().getConfiguration().getString(setOperation.getKey().get(), "");
System.out.println(MessageFormatter.format("Config {}={}",
setOperation.getKey().get(), value).getMessage());
} else {
System.out.println(MessageFormatter.format("All configs: {}",
tableEnv.getConfig().getConfiguration()).getMessage());
}
} else if (operation instanceof ModifyOperation || operation instanceof BeginStatementSetOperation) {
System.out.println(MessageFormatter.format("\n======Executing insertion:\n{}", statement).getMessage());
// execute insertion and do not wait for results for stream mode
if (tableEnv.getConfig().get(RUNTIME_MODE) == RuntimeExecutionMode.BATCH) {
tableEnv.executeSql(statement).await();
} else {
tableEnv.executeSql(statement);
}
// insert
case "ModifyOperation":
modifyOperationList.add((ModifyOperation) operation);
break;
default:
throw new RuntimeException("not support sql=" + stmt);
} else {
// for all show/select/alter/create catalog/use, etc. statements
// execute and print results
System.out.println(MessageFormatter.format("\n======Executing:\n{}", statement).getMessage());
tableEnv.executeSql(statement).print();
}
}
int modifyOperationListLength = modifyOperationList.size();
if (modifyOperationListLength == 0) {
return;
}

public static List<String> parseStatements(String script) {
String formatted =
formatSqlFile(script)
.replaceAll(COMMENT_PATTERN, "");

List<String> statements = new ArrayList<String>();

StringBuilder current = null;
boolean statementSet = false;
for (String line : formatted.split("\n")) {
String trimmed = line.trim();
if (StringUtils.isBlank(trimmed)) {
continue;
}
if (current == null) {
current = new StringBuilder();
}
if (trimmed.startsWith("EXECUTE STATEMENT SET")) {
statementSet = true;
}
current.append(trimmed);
current.append("\n");
if (trimmed.endsWith(STATEMENT_DELIMITER)) {
if (!statementSet || trimmed.equals("END;")) {
statements.add(current.toString());
current = null;
statementSet = false;
}
}
}
((TableEnvironmentInternal) tEnv).executeInternal(modifyOperationList).print();
return statements;
}

private static String trimBlank(String str) {
return str.replace("\\n", " ").replaceAll("\\s+", " ").trim();
public static String formatSqlFile(String content) {
String trimmed = content.trim();
StringBuilder formatted = new StringBuilder();
formatted.append(trimmed);
if (!trimmed.endsWith(STATEMENT_DELIMITER)) {
formatted.append(STATEMENT_DELIMITER);
}
formatted.append(LINE_DELIMITER);
return formatted.toString();
}
}
Loading

0 comments on commit 2635e2b

Please sign in to comment.