[Feature][Data Quality] Data Quality Support Choose Database (#14406)
* add dataquality database api

* change ui

* api change

* update

* fix spotless

* fix h2

* fix pg

* fix-dead-line

* update

* fix-spotless

* update pg sql

* add ut

* fix ut

(cherry picked from commit 175d976)
qingwli authored and zhongjiajie committed Jul 20, 2023
1 parent 6795554 commit d414f26
Showing 29 changed files with 547 additions and 34 deletions.
4 changes: 4 additions & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
@@ -18,3 +18,7 @@ This document records the incompatible updates between each version. You need to
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)

## 3.2.0

* Add required field `database` in /datasources/tables && /datasources/tableColumns Api [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
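For REST consumers this is the breaking part of the commit: both endpoints now require the 'database' query parameter. Below is a minimal sketch of a migrated /datasources/tables call using Java's built-in HttpClient; the base URL, the 'token' header, and the parameter values are placeholders for illustration, not values defined by this commit.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListTablesAfterUpgrade {

    public static void main(String[] args) throws Exception {
        // Placeholder deployment URL and API token -- substitute your own.
        String base = "http://localhost:12345/dolphinscheduler";

        // Before 3.2.0 this endpoint took only datasourceId; from 3.2.0 the
        // 'database' query parameter is required on /datasources/tables and
        // /datasources/tableColumns alike.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(base + "/datasources/tables?datasourceId=1&database=test"))
                .header("token", "your-api-token")
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}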

4 changes: 4 additions & 0 deletions docs/docs/zh/guide/upgrade/incompatible.md
@@ -18,3 +18,7 @@
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)

## 3.2.0

* 在 /datasources/tables && /datasources/tableColumns 接口中添加了必选字段`database` [#14406](https://github.com/apache/dolphinscheduler/pull/14406)

DataSourceController.java
Expand Up @@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_DATABASES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
@@ -340,27 +341,43 @@ public Result getKerberosStartupState(@Parameter(hidden = true) @RequestAttribut

@Operation(summary = "tables", description = "GET_DATASOURCE_TABLES_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
})
@GetMapping(value = "/tables")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLES_ERROR)
- public Result getTables(@RequestParam("datasourceId") Integer datasourceId) {
- Map<String, Object> result = dataSourceService.getTables(datasourceId);
+ public Result getTables(@RequestParam("datasourceId") Integer datasourceId,
+ @RequestParam(value = "database") String database) {
+ Map<String, Object> result = dataSourceService.getTables(datasourceId, database);
return returnDataList(result);
}

@Operation(summary = "tableColumns", description = "GET_DATASOURCE_TABLE_COLUMNS_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test"))
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test")),
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
})
@GetMapping(value = "/tableColumns")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLE_COLUMNS_ERROR)
public Result getTableColumns(@RequestParam("datasourceId") Integer datasourceId,
@RequestParam("tableName") String tableName) {
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, tableName);
@RequestParam("tableName") String tableName,
@RequestParam(value = "database") String database) {
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, database, tableName);
return returnDataList(result);
}

@Operation(summary = "databases", description = "GET_DATASOURCE_DATABASE_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
})
@GetMapping(value = "/databases")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_DATABASES_ERROR)
public Result getDatabases(@RequestParam("datasourceId") Integer datasourceId) {
Map<String, Object> result = dataSourceService.getDatabases(datasourceId);
return returnDataList(result);
}
}
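As a usage sketch for the new endpoint (again with a placeholder URL and token, neither defined by this commit), the databases list can be fetched as below; the response's data list carries one label/value option per database name, as built by the service implementation further down.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListDatabasesSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder URL and token -- assumptions, not part of the commit.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:12345/dolphinscheduler/datasources/databases?datasourceId=1"))
                .header("token", "your-api-token")
                .GET()
                .build();

        String body = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
        System.out.println(body);
    }
}

A client would typically chain this with the two endpoints above: pick a database here, then pass it to /datasources/tables and /datasources/tableColumns.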
Status.java
@@ -482,6 +482,7 @@ public enum Status {
GET_DATASOURCE_OPTIONS_ERROR(1200017, "get datasource options error", "获取数据源Options错误"),
GET_DATASOURCE_TABLES_ERROR(1200018, "get datasource tables error", "获取数据源表列表错误"),
GET_DATASOURCE_TABLE_COLUMNS_ERROR(1200019, "get datasource table columns error", "获取数据源表列名错误"),
GET_DATASOURCE_DATABASES_ERROR(1200035, "get datasource databases error", "获取数据库列表错误"),

CREATE_CLUSTER_ERROR(120020, "create cluster error", "创建集群失败"),
CLUSTER_NAME_EXISTS(120021, "this cluster name [{0}] already exists", "集群名称[{0}]已经存在"),
DataSourceService.java
@@ -132,15 +132,24 @@ public interface DataSourceService {
/**
* get tables
* @param datasourceId
* @param database
* @return
*/
- Map<String, Object> getTables(Integer datasourceId);
+ Map<String, Object> getTables(Integer datasourceId, String database);

/**
* get table columns
* @param datasourceId
* @param database
* @param tableName
* @return
*/
- Map<String, Object> getTableColumns(Integer datasourceId, String tableName);
+ Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName);

/**
* get databases
* @param datasourceId
* @return
*/
Map<String, Object> getDatabases(Integer datasourceId);
}
DataSourceServiceImpl.java
@@ -515,7 +515,7 @@ public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
}

@Override
- public Map<String, Object> getTables(Integer datasourceId) {
+ public Map<String, Object> getTables(Integer datasourceId, String database) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -551,7 +551,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
}

tables = metaData.getTables(
- connectionParam.getDatabase(),
+ database,
getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
@@ -583,7 +583,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
}

@Override
- public Map<String, Object> getTableColumns(Integer datasourceId, String tableName) {
+ public Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -603,8 +603,6 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
ResultSet rs = null;

try {
-
- String database = connectionParam.getDatabase();
if (null == connection) {
return result;
}
@@ -635,6 +633,62 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
return result;
}

@Override
public Map<String, Object> getDatabases(Integer datasourceId) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
putMsg(result, Status.QUERY_DATASOURCE_ERROR);
return result;
}

List<String> tableList;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
dataSource.getConnectionParams());

if (null == connectionParam) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}

Connection connection =
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
ResultSet rs = null;

try {
if (null == connection) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
if (dataSource.getType() == DbType.POSTGRESQL) {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG);
} else {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY);
}
tableList = new ArrayList<>();
while (rs.next()) {
String name = rs.getString(1);
tableList.add(name);
}
} catch (Exception e) {
log.error("Get databases error, datasourceId:{}.", datasourceId, e);
putMsg(result, Status.GET_DATASOURCE_DATABASES_ERROR);
return result;
} finally {
closeResult(rs);
releaseConnection(connection);
}

List<ParamsOptions> options = getParamsOptions(tableList);

result.put(Constants.DATA_LIST, options);
putMsg(result, Status.SUCCESS);
return result;
}

private List<ParamsOptions> getParamsOptions(List<String> columnList) {
List<ParamsOptions> options = null;
if (CollectionUtils.isNotEmpty(columnList)) {
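The implementation above issues dialect-specific SQL (the 'show databases' default plus a PostgreSQL catalog query). For comparison only — this is not what the commit ships — a vendor-neutral sketch can lean on JDBC's DatabaseMetaData.getCatalogs(), with the caveat that drivers map "catalog" inconsistently; PostgreSQL's driver, for instance, has historically exposed only the current database this way, which would explain the dedicated pg_database query.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

public class CatalogListSketch {

    // Vendor-neutral database listing via JDBC metadata. The JDBC URL and
    // credentials are placeholders supplied by the caller.
    public static List<String> listCatalogs(String jdbcUrl, String user, String password) throws Exception {
        List<String> names = new ArrayList<>();
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             ResultSet rs = conn.getMetaData().getCatalogs()) {
            while (rs.next()) {
                names.add(rs.getString("TABLE_CAT")); // standard column name for the catalog
            }
        }
        return names;
    }
}

Trading the per-dialect constants for this would shrink Constants but put correctness at the mercy of each driver, so explicit queries are the more predictable choice here.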
DataSourceServiceTest.java
@@ -38,6 +38,7 @@
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
@@ -48,6 +49,7 @@
import org.apache.commons.collections4.CollectionUtils;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -517,4 +519,31 @@ public void testCheckConnection() throws Exception {
}
}

@Test
public void testGetDatabases() throws SQLException {
DataSource dataSource = getOracleDataSource();
int datasourceId = 1;
dataSource.setId(datasourceId);
Map<String, Object> result;
Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(null);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.QUERY_DATASOURCE_ERROR, result.get(Constants.STATUS));

Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(dataSource);
MySQLConnectionParam connectionParam = new MySQLConnectionParam();
Connection connection = Mockito.mock(Connection.class);
MockedStatic<DataSourceUtils> dataSourceUtils = Mockito.mockStatic(DataSourceUtils.class);
dataSourceUtils.when(() -> DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
.thenReturn(connectionParam);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.GET_DATASOURCE_DATABASES_ERROR, result.get(Constants.STATUS));

dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
.thenReturn(null);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.DATASOURCE_CONNECT_FAILED, result.get(Constants.STATUS));
connection.close();
dataSourceUtils.close();
}
}
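A side note on the test above: MockedStatic is AutoCloseable, so the registration can be scoped with try-with-resources and deregistered even when an assertion fails mid-test, instead of relying on a manual close at the end. A compilable sketch of that pattern follows; the service wiring is elided and the assertions are only indicated.

import java.sql.Connection;

import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;

import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

class MockedStaticScopeSketch {

    @Test
    void staticMockClosesAutomatically() {
        Connection connection = Mockito.mock(Connection.class);
        try (MockedStatic<DataSourceUtils> mocked = Mockito.mockStatic(DataSourceUtils.class)) {
            mocked.when(() -> DataSourceUtils.getConnection(Mockito.any(), Mockito.any()))
                    .thenReturn(connection);
            // ... call dataSourceService.getDatabases(...) and assert, as in testGetDatabases ...
        } // the static mock is deregistered here, even if an assertion above threw
    }
}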
Constants.java
@@ -850,4 +850,10 @@ private Constants() {
public static final String REMOTE_LOGGING_GCS_CREDENTIAL = "remote.logging.google.cloud.storage.credential";

public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";

/**
* data quality
*/
public static final String DATABASES_QUERY = "show databases";
public static final String DATABASES_QUERY_PG = "SELECT datname FROM pg_database";
}
dolphinscheduler_mysql.sql
@@ -1368,6 +1368,12 @@ VALUES(28, 'enum_list', 'input', '$t(enum_list)', NULL, NULL, 'Please enter enum
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(29, 'begin_time', 'input', '$t(begin_time)', NULL, NULL, 'Please enter begin time', 0, 0, 0, 1, 1, 0, 0, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(30, 'src_database', 'select', '$t(src_database)', NULL, NULL, 'Please select source database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(31, 'target_database', 'select', '$t(target_database)', NULL, NULL, 'Please select target database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');

--
-- Table structure for table `t_ds_dq_task_statistics_value`
@@ -1851,9 +1857,45 @@ VALUES(148, 10, 17, NULL, 11, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.00
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(149, 10, 19, NULL, 12, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
- INSERT INTO t_ds_relation_rule_input_entry
+ INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(150, 8, 29, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(151, 1, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(152, 2, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(153, 3, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(154, 4, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(155, 5, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(156, 6, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(157, 7, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(158, 8, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(159, 9, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(160, 10, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(161, 3, 31, NULL, 6, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
- VALUES(150, 8, 29, NULL, 7, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
+ VALUES(162, 4, 31, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');

--
-- Table structure for table t_ds_environment