Skip to content

Commit

Permalink
Merge pull request #555 from Kabimon/dev-1.2.0
Browse files Browse the repository at this point in the history
[Feature][datamodel][dataassets][datawarehouse] Fusion linkis1.0.3
  • Loading branch information
zqburde authored Mar 18, 2024
2 parents 66a7d38 + c7ae963 commit c91887a
Show file tree
Hide file tree
Showing 52 changed files with 986 additions and 1,038 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.webank.wedatasphere.dss.data.governance


import com.webank.wedatasphere.dss.data.governance.request._
import com.webank.wedatasphere.dss.data.governance.response._

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.webank.wedatasphere.dss.data.governance.impl


import com.webank.wedatasphere.dss.data.governance.request._
import com.webank.wedatasphere.dss.data.governance.response._
import com.webank.wedatasphere.dss.data.governance.{AbstractRemoteClient, DataAssetsRemoteClient}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.webank.wedatasphere.dss.data.governance

import com.webank.wedatasphere.dss.data.governance.impl.LinkisDataAssetsRemoteClient
import com.webank.wedatasphere.dss.data.governance.request.{GetHiveTblPartitionAction, GetTblPartInfoByNameAction}
import com.webank.wedatasphere.dss.data.governance.request.{GetHiveTblPartitionAction, GetTblPartInfoByNameAction, SearchHiveDbAction}
import org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
import org.apache.linkis.httpclient.dws.config.DWSClientConfigBuilder

import java.util.concurrent.TimeUnit
Expand All @@ -11,17 +12,17 @@ import scala.Console.println
object TestDataAssetsRemoteClient {
def main(args: Array[String]): Unit = {
val clientConfig = DWSClientConfigBuilder.newBuilder()
.addServerUrl("http://localhost:20082")
.addServerUrl("http://192.168.0.25:8088")
.connectionTimeout(30000)
.discoveryEnabled(false)
.discoveryFrequency(1,TimeUnit.MINUTES)
.loadbalancerEnabled(true)
.maxConnectionSize(5)
.retryEnabled(false)
.readTimeout(30000)
.setAuthenticationStrategy(null)
.setAuthenticationStrategy(new TokenAuthenticationStrategy())
.setAuthTokenKey("hdfs")
.setAuthTokenValue("hdfs")
.setAuthTokenValue("BML-AUTH")
.setDWSVersion("v1")
.build()

Expand All @@ -30,16 +31,18 @@ object TestDataAssetsRemoteClient {
// val searchHiveTblResult = dataAssetsClient.searchHiveTbl(SearchHiveTblAction.builder().setUser("hdfs").setQuery("").setLimit(10).setOffset(0).setOwner("undefined").build()).getHiveList
// println(searchHiveTblResult)
//
// val searchHiveDbResult = dataAssetsClient.searchHiveDb(SearchHiveDbAction.builder().setUser("hdfs").setQuery("").setLimit(10).setOffset(0).setOwner("undefined").build()).getHiveList
// println(searchHiveDbResult)
val searchHiveDbResult = dataAssetsClient.searchHiveDb(SearchHiveDbAction.builder().setUser("hdfs").setQuery("").setLimit(10).setOffset(0).setOwner("undefined").build()).getHiveList
println(searchHiveDbResult)
//
// val hiveTblBasicResult = dataAssetsClient
// .getHiveTblBasic(GetHiveTblBasicAction.builder().setUser("hdfs").setGuid("27920dc8-1eef-4d7d-9423-b5967d9e2d33").build())
// .result
// println(hiveTblBasicResult)
//
val hiveTblPartitionResult = dataAssetsClient.getHiveTblPartition(GetHiveTblPartitionAction.builder().setUser("hdfs").setGuid("a3be4a97-6465-4c3d-adee-76dfa662e531").build()).result
println(hiveTblPartitionResult)
// val hiveTblPartitionResult = dataAssetsClient.getHiveTblPartition(GetHiveTblPartitionAction.builder().setUser("hdfs").setGuid("a3be4a97-6465-4c3d-adee-76dfa662e531").build()).result
// println(hiveTblPartitionResult)


//
// val hiveTblCreateResult = dataAssetsClient.getHiveTblCreate(GetHiveTblCreateAction.builder().setUser("hdfs").setGuid("a3be4a97-6465-4c3d-adee-76dfa662e531").build()).result
// println(hiveTblCreateResult)
Expand Down Expand Up @@ -102,7 +105,7 @@ object TestDataAssetsRemoteClient {
//
// println(hiveTableStatsResult.getResult)

val partInfo = dataAssetsClient.getHiveTblPartInfoByNameResult(GetTblPartInfoByNameAction.builder().setUser("hdfs").setDbName("linkis_db").setTableName("linkis_partitions").build())
println(partInfo.getInfo)
// val partInfo = dataAssetsClient.getHiveTblPartInfoByNameResult(GetTblPartInfoByNameAction.builder().setUser("hdfs").setDbName("linkis_db").setTableName("linkis_partitions").build())
// println(partInfo.getInfo)
}
}
20 changes: 19 additions & 1 deletion dss-apps/dss-dataasset-management/data-assets-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<atlas.version>2.2.0</atlas.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<hadoop.version>2.7.2</hadoop.version>
</properties>

<dependencies>
Expand All @@ -31,6 +32,10 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand All @@ -48,6 +53,7 @@
<version>${linkis.version}</version>
</dependency>


<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client-v2</artifactId>
Expand All @@ -57,8 +63,13 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand All @@ -77,6 +88,13 @@
<artifactId>dss-framework-workspace-client</artifactId>
<version>1.0.1</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>

</dependencies>

<build>
Expand Down Expand Up @@ -115,7 +133,7 @@
</executions>
<configuration>
<skipAssembly>false</skipAssembly>
<finalName>data-assets-server</finalName>
<finalName>data-assets</finalName>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<descriptors>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public String createSubClassification(String name, String superType) throws Atla
atlasTypesDef.setClassificationDefs(Lists.newArrayList(atlasClassificationDef));
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
params.add("type","classification");
return callAPI(AtlasClientV2.API_V2.CREATE_TYPE_DEFS, String.class, gson.toJson(atlasTypesDef),params);
return callAPI(API_V2.CREATE_TYPE_DEFS, String.class, gson.toJson(atlasTypesDef),params);
}

// Glossary APIs
Expand Down Expand Up @@ -233,7 +233,23 @@ public String basicSearchPostForString(final String typeName, final String class
searchParameters.setIncludeClassificationAttributes(true);
searchParameters.setExcludeDeletedEntities(excludeDeletedEntities);
searchParameters.setAttributes(returnColumnsParams);
return callAPI(AtlasClientV2.API_V2.FACETED_SEARCH, String.class, gson.toJson(searchParameters),new MultivaluedMapImpl());
return callAPI(API_V2.FACETED_SEARCH, String.class, gson.toJson(searchParameters),new MultivaluedMapImpl());
}

/**
 * Searches Atlas entities that are tagged with the given glossary term,
 * via the faceted-search REST endpoint.
 *
 * @param typeName               Atlas type to restrict the search to
 * @param termName               glossary term the matched entities must carry
 * @param excludeDeletedEntities when true, DELETED entities are filtered out
 * @return raw JSON response body of the faceted search
 * @throws AtlasServiceException if the Atlas REST call fails
 */
public String basicSearchPostForLabel(final String typeName, final String termName,final boolean excludeDeletedEntities) throws AtlasServiceException {
    final SearchParameters params = new SearchParameters();
    params.setTypeName(typeName);
    params.setTermName(termName);
    // Widen the match to subtypes/sub-classifications and include tag attributes,
    // mirroring the sibling basicSearchPostForString(...) search setup.
    params.setIncludeSubClassifications(true);
    params.setIncludeSubTypes(true);
    params.setIncludeClassificationAttributes(true);
    params.setExcludeDeletedEntities(excludeDeletedEntities);
    // Only these entity attributes are requested back from Atlas.
    params.setAttributes(Sets.newHashSet("parameters", "lastAccessTime"));
    return callAPI(API_V2.FACETED_SEARCH, String.class, gson.toJson(params), new MultivaluedMapImpl());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ public List<AtlasEntityHeader> searchHiveTable0(String classification, String qu
//atlasClient.createAtlasTypeDefs()
return atlasSearchResult.getEntities();
}

/**
* 创建子类型
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

package com.webank.wedatasphere.dss.data.governance.conf;

import org.apache.linkis.common.conf.CommonVars;

import org.apache.linkis.common.conf.CommonVars;

public interface GovernanceConf {
CommonVars<String> ATLAS_REST_ADDRESS = CommonVars.apply("atlas.rest.address");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.webank.wedatasphere.dss.data.governance.dao;


import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.webank.wedatasphere.dss.data.governance.entity.PartInfo;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;

/**
 * MyBatis-Plus mapper that reads Hive partition statistics straight from the
 * Hive metastore tables (TBLS / PARTITIONS / PARTITION_PARAMS / DBS).
 */
public interface PartInfoMapper extends BaseMapper<PartInfo> {

    // Pivots PARTITION_PARAMS key/value rows into one row per partition:
    // last DDL time, row count, total size (bytes) and file count.
    // NOTE(review): alias "reord_cnt" looks like a typo for "record_cnt" —
    // confirm it matches the PartInfo column mapping before renaming it.
    String sql = "select b.PART_NAME,b.CREATE_TIME,\n" +
            " MAX(CASE c.PARAM_KEY WHEN 'transient_lastDdlTime' THEN c.PARAM_VALUE ELSE null END) last_access_time ,\n" +
            " MAX(CASE c.PARAM_KEY WHEN 'numRows' THEN c.PARAM_VALUE ELSE null END) reord_cnt,\n" +
            " MAX(CASE c.PARAM_KEY WHEN 'totalSize' THEN c.PARAM_VALUE ELSE null END) store,\n" +
            " MAX(CASE c.PARAM_KEY WHEN 'numFiles' THEN c.PARAM_VALUE ELSE null END) file_count\n" +
            " from TBLS a,PARTITIONS b,PARTITION_PARAMS c,DBS d \n" +
            " where a.TBL_NAME= #{tableName} AND d.NAME= #{dbName} AND a.TBL_ID=b.TBL_ID AND a.DB_ID=d.DB_ID AND b.PART_ID=c.PART_ID \n" +
            " GROUP BY c.PART_ID";

    /**
     * Returns per-partition statistics for one Hive table.
     *
     * @param dbName    Hive database name (exact match on DBS.NAME)
     * @param tableName Hive table name (exact match on TBLS.TBL_NAME)
     * @return one {@link PartInfo} per partition of the table
     */
    @Select(sql)
    List<PartInfo> query(@Param("dbName") String dbName, @Param("tableName") String tableName) ;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.webank.wedatasphere.dss.data.governance.dao;

import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.toolkit.Constants;
import com.webank.wedatasphere.dss.data.governance.entity.TableInfo;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;

/**
 * MyBatis-Plus mapper that computes per-table storage usage from the Hive
 * metastore, handling partitioned and non-partitioned tables separately and
 * merging the two result sets with a SQL UNION.
 */
public interface TableStorageInfoMapper extends BaseMapper<TableInfo> {

    // Non-partitioned tables: "db.table" name plus totalSize from TABLE_PARAMS.
    String sql= "select CONCAT(DBS.NAME, '.', TBLS.TBL_NAME) AS table_name, CASE WHEN TABLE_PARAMS.PARAM_KEY = 'totalSize' THEN TABLE_PARAMS.PARAM_VALUE ELSE 0 END as totalSize\n" +
            "from DBS, TBLS,TABLE_PARAMS\n" +
            "where TBLS.TBL_ID=TABLE_PARAMS.TBL_ID AND TBLS.DB_ID=DBS.DB_ID AND TABLE_PARAMS.PARAM_KEY = 'totalSize'";

    // Partitioned tables: "db.table" name plus the SUM of all partitions' totalSize.
    String sql2="select CONCAT(DBS.NAME, '.', TBLS.TBL_NAME) AS table_name,SUM(CASE WHEN PARTITION_PARAMS.PARAM_KEY = 'totalSize' THEN PARTITION_PARAMS.PARAM_VALUE ELSE 0 END) as totalSize\n" +
            " from DBS,TBLS,PARTITIONS ,PARTITION_PARAMS \n" +
            " where DBS.DB_ID=TBLS.DB_ID AND TBLS.TBL_ID=PARTITIONS.TBL_ID AND PARTITIONS.PART_ID =PARTITION_PARAMS.PART_ID AND PARTITION_PARAMS.PARAM_KEY = 'totalSize'\n" +
            " group by table_name";

    // Union of both queries, filtered by the caller-supplied wrapper
    // (${ew.customSqlSegment} expands the MyBatis-Plus query wrapper, e.g. a
    // condition on "DB.Table").
    String queryWrapperSql = "select table_name as tableName,totalSize as storage FROM (( " + sql + ")" + " UNION " +"(" + sql2 +")) as q ${ew.customSqlSegment}";

    // Union of both queries, unfiltered.
    String querySql = "select table_name as tableName,totalSize as storage FROM (( " + sql + ")" + " UNION " +"(" + sql2 +")) as q";

    /**
     * @return name and storage size for every table, partitioned or not
     */
    @Select(querySql)
    List<TableInfo> query();

    /**
     * Same as {@link #query()} but restricted by a MyBatis-Plus query wrapper.
     * NOTE(review): raw {@code Wrapper} — presumably {@code Wrapper<TableInfo>};
     * consider adding the type argument once callers are confirmed.
     *
     * @param queryWrapper condition injected as ${ew.customSqlSegment}
     */
    @Select(queryWrapperSql)
    List<TableInfo> queryByTableName(@Param(Constants.WRAPPER) Wrapper queryWrapper);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.webank.wedatasphere.dss.data.governance.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;


/**
 * Mapper for aggregate storage figures read from the Hive metastore.
 * NOTE(review): {@code BaseMapper<Long>} is unusual — Long is not an entity;
 * only the annotated methods below appear to be used.
 */
@Mapper
public interface TableStorageMapper extends BaseMapper<Long> {

    // Total bytes of all non-partitioned tables.
    // NOTE(review): PARAM_VALUE is a string column in the metastore; SUM relies
    // on the database's implicit numeric cast — confirm for the target RDBMS.
    String sql="select SUM(PARAM_VALUE) as storage from TABLE_PARAMS WHERE PARAM_KEY='totalSize'";

    // Total bytes of all partitions of partitioned tables.
    String sql2="select SUM(PARAM_VALUE) as storage from PARTITION_PARAMS WHERE PARAM_KEY='totalSize'";

    // Grand total across both.
    String querySql = "select sum(storage) as storage from((" + sql + ") union all (" + sql2 + ")) as q";

    /**
     * @return single-element list with the cluster-wide total storage in bytes
     */
    @Select(querySql)
    List<Long> getTableStorage();

    // Storage of one non-partitioned table, matched by db and table name.
    String commonSql="select TABLE_PARAMS.PARAM_VALUE as totalSize from DBS, TBLS,TABLE_PARAMS where TBLS.TBL_ID=TABLE_PARAMS.TBL_ID AND TBLS.DB_ID=DBS.DB_ID AND TABLE_PARAMS.PARAM_KEY='totalSize' AND DBS.NAME= #{dbName} AND TBLS.TBL_NAME=#{tableName}";

    /**
     * Storage size of a non-partitioned table; empty list when the table has
     * no 'totalSize' parameter (e.g. it is partitioned).
     */
    @Select(commonSql)
    List<Long> getTableInfo(@Param("dbName") String dbName,@Param("tableName") String tableName);


    // Storage of one partitioned table: sum of its partitions' totalSize.
    String partitionSql="select SUM(PARTITION_PARAMS.PARAM_VALUE) as totalSize\n" +
            "from DBS,TBLS,PARTITIONS ,PARTITION_PARAMS\n" +
            "where DBS.DB_ID=TBLS.DB_ID AND TBLS.TBL_ID=PARTITIONS.TBL_ID AND PARTITIONS.PART_ID =PARTITION_PARAMS.PART_ID AND PARTITION_PARAMS.PARAM_KEY='totalSize' AND DBS.NAME= #{dbName} AND TBLS.TBL_NAME= #{tableName}\n" +
            "group by TBLS.TBL_NAME";

    /**
     * Storage size of a partitioned table; empty list for non-partitioned ones.
     */
    @Select(partitionSql)
    List<Long> getPartTableInfo(@Param("dbName") String dbName,@Param("tableName") String tableName);
}
Loading

0 comments on commit c91887a

Please sign in to comment.