[Feature][Data Quality] Data Quality Support Choose Database #14406

Merged
merged 14 commits on Jul 12, 2023
4 changes: 4 additions & 0 deletions docs/docs/en/guide/upgrade/incompatible.md
@@ -18,3 +18,7 @@ This document records the incompatible updates between each version. You need to
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)

## 3.2.0

* Add required field `database` to the /datasources/tables and /datasources/tableColumns APIs [#14406](https://github.com/apache/dolphinscheduler/pull/14406)
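For API consumers, the practical effect is that both endpoints now reject requests that omit the `database` query parameter. A minimal client sketch follows; the base URL, context path, and token value are placeholders for your deployment and are not part of this PR:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ListTablesExample {
    public static void main(String[] args) throws Exception {
        String base = "http://localhost:12345/dolphinscheduler"; // placeholder deployment URL
        HttpRequest request = HttpRequest.newBuilder()
                // `database` is now required alongside `datasourceId`
                .uri(URI.create(base + "/datasources/tables?datasourceId=1&database=test"))
                .header("token", "<access-token>") // placeholder access token
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}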

4 changes: 4 additions & 0 deletions docs/docs/zh/guide/upgrade/incompatible.md
@@ -18,3 +18,7 @@
* Copy and import workflow without 'copy' suffix [#10607](https://github.com/apache/dolphinscheduler/pull/10607)
* Use semicolon as default sql segment separator [#10869](https://github.com/apache/dolphinscheduler/pull/10869)

## 3.2.0

* 在 /datasources/tables && /datasources/tableColumns 接口中添加了必选字段`database` [#14406](https://github.com/apache/dolphinscheduler/pull/14406)

@@ -22,6 +22,7 @@
import static org.apache.dolphinscheduler.api.enums.Status.CONNECT_DATASOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.CREATE_DATASOURCE_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.DELETE_DATA_SOURCE_FAILURE;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_DATABASES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLES_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.GET_DATASOURCE_TABLE_COLUMNS_ERROR;
import static org.apache.dolphinscheduler.api.enums.Status.KERBEROS_STARTUP_STATE;
@@ -340,27 +341,43 @@ public Result getKerberosStartupState(@Parameter(hidden = true) @RequestAttribut

@Operation(summary = "tables", description = "GET_DATASOURCE_TABLES_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
})
@GetMapping(value = "/tables")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLES_ERROR)
public Result getTables(@RequestParam("datasourceId") Integer datasourceId,
@RequestParam(value = "database") String database) {
Map<String, Object> result = dataSourceService.getTables(datasourceId, database);
return returnDataList(result);
}

@Operation(summary = "tableColumns", description = "GET_DATASOURCE_TABLE_COLUMNS_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1")),
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test"))
@Parameter(name = "tableName", description = "TABLE_NAME", required = true, schema = @Schema(implementation = String.class, example = "test")),
@Parameter(name = "database", description = "DATABASE", required = true, schema = @Schema(implementation = String.class, example = "test"))
})
@GetMapping(value = "/tableColumns")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_TABLE_COLUMNS_ERROR)
public Result getTableColumns(@RequestParam("datasourceId") Integer datasourceId,
@RequestParam("tableName") String tableName) {
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, tableName);
@RequestParam("tableName") String tableName,
@RequestParam(value = "database") String database) {
Map<String, Object> result = dataSourceService.getTableColumns(datasourceId, database, tableName);
return returnDataList(result);
}

@Operation(summary = "databases", description = "GET_DATASOURCE_DATABASE_NOTES")
@Parameters({
@Parameter(name = "datasourceId", description = "DATA_SOURCE_ID", required = true, schema = @Schema(implementation = int.class, example = "1"))
})
@GetMapping(value = "/databases")
@ResponseStatus(HttpStatus.OK)
@ApiException(GET_DATASOURCE_DATABASES_ERROR)
public Result getDatabases(@RequestParam("datasourceId") Integer datasourceId) {
Map<String, Object> result = dataSourceService.getDatabases(datasourceId);
return returnDataList(result);
}
}
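To see the new endpoint from the outside, here is a hedged MockMvc sketch; the Spring test-slice wiring is assumed for illustration and is not part of this PR:

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.web.servlet.MockMvc;

import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;

@SpringBootTest
@AutoConfigureMockMvc
class DataSourceDatabasesEndpointSketch {

    @Autowired
    private MockMvc mockMvc; // assumes the application context exposes the controller

    @Test
    void listsDatabasesForADataSource() throws Exception {
        // /databases needs only the datasource id; /tables and /tableColumns now also need `database`
        mockMvc.perform(get("/datasources/databases").param("datasourceId", "1"))
                .andExpect(status().isOk());
    }
}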
@@ -479,6 +479,7 @@ public enum Status {
GET_DATASOURCE_OPTIONS_ERROR(1200017, "get datasource options error", "获取数据源Options错误"),
GET_DATASOURCE_TABLES_ERROR(1200018, "get datasource tables error", "获取数据源表列表错误"),
GET_DATASOURCE_TABLE_COLUMNS_ERROR(1200019, "get datasource table columns error", "获取数据源表列名错误"),
GET_DATASOURCE_DATABASES_ERROR(1200035, "get datasource databases error", "获取数据库列表错误"),

CREATE_CLUSTER_ERROR(120020, "create cluster error", "创建集群失败"),
CLUSTER_NAME_EXISTS(120021, "this cluster name [{0}] already exists", "集群名称[{0}]已经存在"),
@@ -132,15 +132,24 @@ public interface DataSourceService {
/**
* get tables
* @param datasourceId
* @param database
* @return
*/
Map<String, Object> getTables(Integer datasourceId);
Map<String, Object> getTables(Integer datasourceId, String database);

/**
* get table columns
* @param datasourceId
* @param database
* @param tableName
* @return
*/
Map<String, Object> getTableColumns(Integer datasourceId, String tableName);
Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName);

/**
* get databases
* @param datasourceId
* @return
*/
Map<String, Object> getDatabases(Integer datasourceId);
}
@@ -515,7 +515,7 @@ public Map<String, Object> authedDatasource(User loginUser, Integer userId) {
}

@Override
public Map<String, Object> getTables(Integer datasourceId) {
public Map<String, Object> getTables(Integer datasourceId, String database) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -551,7 +551,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
}

tables = metaData.getTables(
database,
getDbSchemaPattern(dataSource.getType(), schema, connectionParam),
"%", TABLE_TYPES);
if (null == tables) {
@@ -583,7 +583,7 @@ public Map<String, Object> getTables(Integer datasourceId) {
}

@Override
public Map<String, Object> getTableColumns(Integer datasourceId, String tableName) {
public Map<String, Object> getTableColumns(Integer datasourceId, String database, String tableName) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);
@@ -603,8 +603,6 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
ResultSet rs = null;

try {
if (null == connection) {
return result;
}
@@ -635,6 +633,62 @@ public Map<String, Object> getTableColumns(Integer datasourceId, String tableNam
return result;
}

@Override
public Map<String, Object> getDatabases(Integer datasourceId) {
Map<String, Object> result = new HashMap<>();

DataSource dataSource = dataSourceMapper.selectById(datasourceId);

if (dataSource == null) {
putMsg(result, Status.QUERY_DATASOURCE_ERROR);
return result;
}

List<String> tableList;
BaseConnectionParam connectionParam =
(BaseConnectionParam) DataSourceUtils.buildConnectionParams(
dataSource.getType(),
dataSource.getConnectionParams());

if (null == connectionParam) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}

Connection connection =
DataSourceUtils.getConnection(dataSource.getType(), connectionParam);
ResultSet rs = null;

try {
if (null == connection) {
putMsg(result, Status.DATASOURCE_CONNECT_FAILED);
return result;
}
if (dataSource.getType() == DbType.POSTGRESQL) {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY_PG);
} else {
rs = connection.createStatement().executeQuery(Constants.DATABASES_QUERY);
}
tableList = new ArrayList<>();
while (rs.next()) {
String name = rs.getString(1);
tableList.add(name);
}
} catch (Exception e) {
log.error("Get databases error, datasourceId:{}.", datasourceId, e);
putMsg(result, Status.GET_DATASOURCE_TABLES_ERROR);
return result;
} finally {
closeResult(rs);
releaseConnection(connection);
}

List<ParamsOptions> options = getParamsOptions(tableList);

result.put(Constants.DATA_LIST, options);
putMsg(result, Status.SUCCESS);
return result;
}

private List<ParamsOptions> getParamsOptions(List<String> columnList) {
List<ParamsOptions> options = null;
if (CollectionUtils.isNotEmpty(columnList)) {
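As a design note on the getDatabases implementation above: it issues vendor-specific SQL (a generic SHOW DATABASES plus a pg_database query for PostgreSQL). A driver-neutral alternative, sketched here under the assumption that the target JDBC driver populates catalog metadata, is DatabaseMetaData.getCatalogs():

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public final class CatalogLister {

    // Sketch only: JDBC exposes the catalog (database) list directly, so no
    // per-vendor SQL is needed where the driver implements getCatalogs().
    public static List<String> listDatabases(Connection connection) throws SQLException {
        List<String> databases = new ArrayList<>();
        try (ResultSet rs = connection.getMetaData().getCatalogs()) {
            while (rs.next()) {
                databases.add(rs.getString("TABLE_CAT")); // standard JDBC column name
            }
        }
        return databases;
    }
}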
@@ -38,6 +38,7 @@
import org.apache.dolphinscheduler.plugin.datasource.api.utils.CommonUtils;
import org.apache.dolphinscheduler.plugin.datasource.api.utils.DataSourceUtils;
import org.apache.dolphinscheduler.plugin.datasource.hive.param.HiveDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLConnectionParam;
import org.apache.dolphinscheduler.plugin.datasource.mysql.param.MySQLDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.oracle.param.OracleDataSourceParamDTO;
import org.apache.dolphinscheduler.plugin.datasource.postgresql.param.PostgreSQLDataSourceParamDTO;
@@ -48,6 +49,7 @@
import org.apache.commons.collections4.CollectionUtils;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -517,4 +519,31 @@ public void testCheckConnection() throws Exception {
}
}

@Test
public void testGetDatabases() throws SQLException {
DataSource dataSource = getOracleDataSource();
int datasourceId = 1;
dataSource.setId(datasourceId);
Map<String, Object> result;
Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(null);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.QUERY_DATASOURCE_ERROR, result.get(Constants.STATUS));

Mockito.when(dataSourceMapper.selectById(datasourceId)).thenReturn(dataSource);
MySQLConnectionParam connectionParam = new MySQLConnectionParam();
Connection connection = Mockito.mock(Connection.class);
MockedStatic<DataSourceUtils> dataSourceUtils = Mockito.mockStatic(DataSourceUtils.class);
dataSourceUtils.when(() -> DataSourceUtils.getConnection(Mockito.any(), Mockito.any())).thenReturn(connection);
dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
.thenReturn(connectionParam);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.GET_DATASOURCE_DATABASES_ERROR, result.get(Constants.STATUS));

dataSourceUtils.when(() -> DataSourceUtils.buildConnectionParams(Mockito.any(), Mockito.any()))
.thenReturn(null);
result = dataSourceService.getDatabases(datasourceId);
Assertions.assertEquals(Status.DATASOURCE_CONNECT_FAILED, result.get(Constants.STATUS));
connection.close();
dataSourceUtils.close();
}
}
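One small hardening worth noting for the test above: MockedStatic is AutoCloseable, so a try-with-resources block releases the static mock even if an assertion fails midway, instead of relying on the explicit close() at the end. A sketch reusing the names from the test:

try (MockedStatic<DataSourceUtils> mocked = Mockito.mockStatic(DataSourceUtils.class)) {
    mocked.when(() -> DataSourceUtils.getConnection(Mockito.any(), Mockito.any()))
            .thenReturn(connection);
    // call dataSourceService.getDatabases(...) and assert here;
    // the static mock is unregistered automatically on exit
}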
@@ -850,4 +850,10 @@ private Constants() {
public static final String REMOTE_LOGGING_GCS_CREDENTIAL = "remote.logging.google.cloud.storage.credential";

public static final String REMOTE_LOGGING_GCS_BUCKET_NAME = "remote.logging.google.cloud.storage.bucket.name";

/**
* data quality
*/
public static final String DATABASES_QUERY = "show databases";
public static final String DATABASES_QUERY_PG = "SELECT datname FROM pg_database";
}
@@ -1368,6 +1368,12 @@ VALUES(28, 'enum_list', 'input', '$t(enum_list)', NULL, NULL, 'Please enter enum
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(29, 'begin_time', 'input', '$t(begin_time)', NULL, NULL, 'Please enter begin time', 0, 0, 0, 1, 1, 0, 0, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(30, 'src_database', 'select', '$t(src_database)', NULL, NULL, 'Please select source database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');
INSERT INTO `t_ds_dq_rule_input_entry`
(`id`, `field`, `type`, `title`, `value`, `options`, `placeholder`, `option_source_type`, `value_type`, `input_type`, `is_show`, `can_edit`, `is_emit`, `is_validate`, `create_time`, `update_time`)
VALUES(31, 'target_database', 'select', '$t(target_database)', NULL, NULL, 'Please select target database', 0, 0, 0, 1, 1, 1, 1, '2021-03-03 11:31:24.0', '2021-03-03 11:31:24.0');

--
-- Table structure for table `t_ds_dq_task_statistics_value`
@@ -1851,9 +1857,45 @@ VALUES(148, 10, 17, NULL, 11, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.00
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(149, 10, 19, NULL, 12, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(150, 8, 29, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(151, 1, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(152, 2, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(153, 3, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(154, 4, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(155, 5, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(156, 6, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(157, 7, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(158, 8, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(159, 9, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(160, 10, 30, NULL, 2, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(161, 3, 31, NULL, 6, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
INSERT INTO `t_ds_relation_rule_input_entry`
(`id`, `rule_id`, `rule_input_entry_id`, `values_map`, `index`, `create_time`, `update_time`)
VALUES(162, 4, 31, NULL, 7, '2021-03-03 11:31:24.000', '2021-03-03 11:31:24.000');
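Read together, these seed rows bind the new input entry 30 (src_database) to rules 1 through 10, while entry 31 (target_database) is attached only to rules 3 and 4, presumably the cross-table comparison rules that need a target side.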

--
-- Table structure for table t_ds_environment