diff --git a/assembly/pom.xml b/assembly/pom.xml index 29934071c0..3aa0e2d9a4 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 @@ -103,7 +103,7 @@ com.fasterxml.jackson.core jackson-core - 2.9.6 + 2.10.0 net.databinder.dispatch diff --git a/bin/checkServices.sh b/bin/checkServices.sh index 2b45c3e666..72df04be43 100644 --- a/bin/checkServices.sh +++ b/bin/checkServices.sh @@ -31,7 +31,7 @@ MICRO_SERVICE_PORT=$3 local_host="`hostname --fqdn`" -ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') function isLocal(){ if [ "$1" == "127.0.0.1" ];then diff --git a/bin/install.sh b/bin/install.sh index 7b9c85ddf1..7a0b46db6d 100644 --- a/bin/install.sh +++ b/bin/install.sh @@ -104,7 +104,7 @@ source ${DISTRIBUTION} isSuccess "load config" local_host="`hostname --fqdn`" -ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') function isLocal(){ if [ "$1" == "127.0.0.1" ];then diff --git a/bin/start-all.sh b/bin/start-all.sh index b61af2ff0b..98a07f6bdc 100644 --- a/bin/start-all.sh +++ b/bin/start-all.sh @@ -43,7 +43,7 @@ fi } local_host="`hostname --fqdn`" -ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') function isLocal(){ if [ "$1" == "127.0.0.1" ];then diff --git a/bin/stop-all.sh b/bin/stop-all.sh index c707085a39..838b9babc9 100644 --- a/bin/stop-all.sh +++ b/bin/stop-all.sh @@ -34,7 +34,7 @@ export DISTRIBUTION=${DISTRIBUTION:-"${CONF_DIR}/config.sh"} source ${DISTRIBUTION} local_host="`hostname --fqdn`" -ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') function isSuccess(){ if [ $? -ne 0 ]; then diff --git a/conf/config.sh b/conf/config.sh index dea8a60d7b..c6d910aca9 100644 --- a/conf/config.sh +++ b/conf/config.sh @@ -77,4 +77,4 @@ AZKABAN_ADRESS_PORT=8081 QUALITIS_ADRESS_IP=127.0.0.1 QUALITIS_ADRESS_PORT=8090 -DSS_VERSION=0.7.0 +DSS_VERSION=0.9.0 diff --git a/datachecker-appjoint/pom.xml b/datachecker-appjoint/pom.xml index c2d804486c..9d36a02ae5 100644 --- a/datachecker-appjoint/pom.xml +++ b/datachecker-appjoint/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/datachecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/connector/DataCheckerDao.java b/datachecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/connector/DataCheckerDao.java index d3caeac010..d84a0d4bc6 100644 --- a/datachecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/connector/DataCheckerDao.java +++ b/datachecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/connector/DataCheckerDao.java @@ -158,8 +158,9 @@ private long getTotalCount(Map proObjectMap, Connection conn, L private PreparedStatement getStatement(Connection conn, String dataObject) throws SQLException { String dataScape = dataObject.contains("{") ? "Partition" : "Table"; - String dbName = dataObject.split("\\.")[0]; - String tableName = dataObject.split("\\.")[1]; + String[] dataObjectArray = dataObject.split("\\."); + String dbName = dataObjectArray[0]; + String tableName = dataObjectArray[1]; if(dataScape.equals("Partition")) { Pattern pattern = Pattern.compile("\\{([^\\}]+)\\}"); Matcher matcher = pattern.matcher(dataObject); @@ -174,11 +175,13 @@ private PreparedStatement getStatement(Connection conn, String dataObject) throw pstmt.setString(2, tableName); pstmt.setString(3, partitionName); return pstmt; - } else { + } else if(dataObjectArray.length == 2){ PreparedStatement pstmt = conn.prepareCall(SQL_SOURCE_TYPE_JOB_TABLE); pstmt.setString(1, dbName); pstmt.setString(2, tableName); return pstmt; + }else { + throw new SQLException("Incorrect input format for dataObject "+ dataObject); } } diff --git a/docs/en_US/ch1/DataSphereStudio_Compile_Manual.md b/docs/en_US/ch1/DataSphereStudio_Compile_Manual.md index 6a24df612d..598ff158b8 100644 --- a/docs/en_US/ch1/DataSphereStudio_Compile_Manual.md +++ b/docs/en_US/ch1/DataSphereStudio_Compile_Manual.md @@ -6,8 +6,8 @@ ```xml - 0.7.0 - 0.9.3 + 0.9.0 + 0.9.4 2.11.8 1.8 3.3.3 diff --git a/docs/en_US/ch2/Azkaban_LinkisJobType_Deployment_Manual.md b/docs/en_US/ch2/Azkaban_LinkisJobType_Deployment_Manual.md index d70a87eedc..33cb3808cb 100644 --- a/docs/en_US/ch2/Azkaban_LinkisJobType_Deployment_Manual.md +++ b/docs/en_US/ch2/Azkaban_LinkisJobType_Deployment_Manual.md @@ -2,7 +2,7 @@ ## 1. Ready work -1.Click [release](https://github.com/WeBankFinTech/DataSphereStudio/releases/download/0.7.0/linkis-jobtype-0.7.0.zip) to select the corresponding installation package to download: +1.Click [release](https://github.com/WeBankFinTech/DataSphereStudio/releases/download/0.8.0/linkis-jobtype-0.8.0.zip) to select the corresponding installation package to download: - linkis-jobtype-$version.zip diff --git a/docs/en_US/ch2/DSS Quick Installation Guide.md b/docs/en_US/ch2/DSS Quick Installation Guide.md index f4b8cd1595..bc36886351 100644 --- a/docs/en_US/ch2/DSS Quick Installation Guide.md +++ b/docs/en_US/ch2/DSS Quick Installation Guide.md @@ -17,7 +17,7 @@ DSS also implements the integration of many external systems, such as [Qualitis] DSS environment configuration can be divided into three steps, including basic software installation, backend environment configuration, and frontend environment configuration. The details are as below: ### 2.1 Frontend and backend basic software installation -Linkis standard version (above 0.9.3). How to install [Linkis](https://github.com/WeBankFinTech/Linkis/blob/master/docs/en_US/ch1/deploy.md) +Linkis standard version (above 0.9.4). How to install [Linkis](https://github.com/WeBankFinTech/Linkis/blob/master/docs/en_US/ch1/deploy.md) JDK (above 1.8.0_141). How to install [JDK](https://www.runoob.com/java/java-environment-setup.html) @@ -103,7 +103,7 @@ dss_port="8088" linkis_url="http://127.0.0.1:9001" # dss ip address -dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') ``` The environment is ready, click me to enter ****[4. Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)** @@ -111,7 +111,7 @@ The environment is ready, click me to enter ****[4. Installation and use](https: ## Three Standard DSS environment configuration preparation The standard DSS environment preparation is also divided into three parts, the frontEnd-end and back-end basic software installation, back-end environment preparation, and frontEnd-end environment preparation. The details are as follows: ### 3.1 frontEnd and BackEnd basic software installation -Linkis standard version (above 0.9.3), [How to install Linkis](https://github.com/WeBankFinTech/Linkis/blob/master/docs/en_US/ch1/deploy.md) +Linkis standard version (above 0.9.4), [How to install Linkis](https://github.com/WeBankFinTech/Linkis/blob/master/docs/en_US/ch1/deploy.md) JDK (above 1.8.0_141), How to install [JDK](https://www.runoob.com/java/java-environment-setup.html) @@ -219,7 +219,7 @@ dss_port="8088" linkis_url="http://127.0.0.1:9001" # dss ip address -dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') ``` The environment is ready, click me to enter **[Four Installation and use](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/en_US/ch2/DSS%20Quick%20Installation%20Guide.md#four-installation-and-use)** diff --git "a/docs/zh_CN/ch1/DSS\347\274\226\350\257\221\346\226\207\346\241\243.md" "b/docs/zh_CN/ch1/DSS\347\274\226\350\257\221\346\226\207\346\241\243.md" index 0b4d82f0b8..6ac3ece635 100644 --- "a/docs/zh_CN/ch1/DSS\347\274\226\350\257\221\346\226\207\346\241\243.md" +++ "b/docs/zh_CN/ch1/DSS\347\274\226\350\257\221\346\226\207\346\241\243.md" @@ -6,8 +6,8 @@ ```xml - 0.7.0 - 0.9.3 + 0.9.0 + 0.9.4 2.11.8 1.8 3.3.3 diff --git "a/docs/zh_CN/ch2/DSS\345\277\253\351\200\237\345\256\211\350\243\205\344\275\277\347\224\250\346\226\207\346\241\243.md" "b/docs/zh_CN/ch2/DSS\345\277\253\351\200\237\345\256\211\350\243\205\344\275\277\347\224\250\346\226\207\346\241\243.md" index e775e8380e..a8fda0a9b5 100644 --- "a/docs/zh_CN/ch2/DSS\345\277\253\351\200\237\345\256\211\350\243\205\344\275\277\347\224\250\346\226\207\346\241\243.md" +++ "b/docs/zh_CN/ch2/DSS\345\277\253\351\200\237\345\256\211\350\243\205\344\275\277\347\224\250\346\226\207\346\241\243.md" @@ -32,7 +32,7 @@ ## 二、精简版DSS环境配置准备 DSS环境配置准备分为三部分,前后端基础软件安装、后端环境配置准备和前端环配置境准备,详细介绍如下: ### 2.1 前后端基础软件安装 -Linkis精简版(0.9.3及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) +Linkis精简版(0.9.4及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) JDK (1.8.0_141以上),[如何安装JDK](https://www.runoob.com/java/java-environment-setup.html) @@ -132,7 +132,7 @@ dss_port="8088" linkis_url="http://127.0.0.1:9001" # dss ip address -dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') ``` 环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8) @@ -143,7 +143,7 @@ dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/ ## 三、简单版DSS环境配置准备 DSS环境配置准备分为三部分,前后端基础软件安装、后端环境配置准备和前端环配置境准备,详细介绍如下: ### 3.1 前后端基础软件安装 -Linkis简单版(0.9.3及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) +Linkis简单版(0.9.4及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) JDK (1.8.0_141以上),[如何安装JDK](https://www.runoob.com/java/java-environment-setup.html) @@ -243,7 +243,7 @@ dss_port="8088" linkis_url="http://127.0.0.1:9001" # dss ip address -dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') ``` 环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8) @@ -251,7 +251,7 @@ dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/ ## 四、标准版DSS环境配置准备 标准版DSS环境准备也分为三部分,前后端基础软件安装、后端环境准备和前端环境准备,详细介绍如下: ### 4.1 前后端基础软件安装 -Linkis标准版(0.9.3及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) +Linkis标准版(0.9.4及以上),[如何安装Linkis](https://github.com/WeBankFinTech/Linkis/wiki/%E5%A6%82%E4%BD%95%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8Linkis) JDK (1.8.0_141以上),[如何安装JDK](https://www.runoob.com/java/java-environment-setup.html) @@ -365,7 +365,7 @@ dss_port="8088" linkis_url="http://127.0.0.1:9001" # dss ip address -dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_ipaddr=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') ``` 环境准备完毕,点我进入 [五、安装和使用](https://github.com/WeBankFinTech/DataSphereStudio/blob/master/docs/zh_CN/ch2/DSS%E5%BF%AB%E9%80%9F%E5%AE%89%E8%A3%85%E4%BD%BF%E7%94%A8%E6%96%87%E6%A1%A3.md#%E4%BA%94%E5%AE%89%E8%A3%85%E5%92%8C%E4%BD%BF%E7%94%A8) diff --git a/dss-appjoint-auth/pom.xml b/dss-appjoint-auth/pom.xml index 595ad36f0b..941a120d1c 100644 --- a/dss-appjoint-auth/pom.xml +++ b/dss-appjoint-auth/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 @@ -38,6 +38,11 @@ linkis-common ${linkis.version} + + javax.servlet + javax.servlet-api + 3.1.0 + diff --git a/dss-appjoint-auth/src/main/scala/com/webank/wedatasphere/dss/appjoint/auth/impl/AppJointAuthImpl.scala b/dss-appjoint-auth/src/main/scala/com/webank/wedatasphere/dss/appjoint/auth/impl/AppJointAuthImpl.scala index 7413baf5f6..15dd7e3072 100644 --- a/dss-appjoint-auth/src/main/scala/com/webank/wedatasphere/dss/appjoint/auth/impl/AppJointAuthImpl.scala +++ b/dss-appjoint-auth/src/main/scala/com/webank/wedatasphere/dss/appjoint/auth/impl/AppJointAuthImpl.scala @@ -24,8 +24,9 @@ import com.webank.wedatasphere.dss.appjoint.auth.{AppJointAuth, RedirectMsg} import com.webank.wedatasphere.linkis.common.utils.Logging import com.webank.wedatasphere.linkis.httpclient.dws.DWSHttpClient import com.webank.wedatasphere.linkis.httpclient.dws.config.DWSClientConfigBuilder -import javax.servlet.http.{Cookie, HttpServletRequest} +import javax.servlet.http.HttpServletRequest import org.apache.commons.io.IOUtils +import org.apache.http.impl.cookie.BasicClientCookie import scala.collection.JavaConversions._ @@ -68,7 +69,7 @@ class AppJointAuthImpl private() extends AppJointAuth with Logging { val index = cookie.indexOf("=") val key = cookie.substring(0, index).trim val value = cookie.substring(index + 1).trim - userInfoAction.addCookie(new Cookie(key, value)) + userInfoAction.addCookie(new BasicClientCookie(key, value)) } val redirectMsg = new RedirectMsgImpl redirectMsg.setRedirectUrl(request.getParameter(AppJointAuthImpl.REDIRECT_KEY)) diff --git a/dss-appjoint-core/pom.xml b/dss-appjoint-core/pom.xml index 0c00f2cd7f..6ee707c4dd 100644 --- a/dss-appjoint-core/pom.xml +++ b/dss-appjoint-core/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 @@ -50,6 +50,12 @@ dss-common ${dss.version} + + + com.webank.wedatasphere.linkis + linkis-httpclient + ${linkis.version} + diff --git a/dss-appjoint-core/src/main/scala/com/webank/wedatasphere/dss/appjoint/execution/scheduler/ListenerEventBusNodeExecutionScheduler.scala b/dss-appjoint-core/src/main/scala/com/webank/wedatasphere/dss/appjoint/execution/scheduler/ListenerEventBusNodeExecutionScheduler.scala index a355270ccb..51d167eaff 100644 --- a/dss-appjoint-core/src/main/scala/com/webank/wedatasphere/dss/appjoint/execution/scheduler/ListenerEventBusNodeExecutionScheduler.scala +++ b/dss-appjoint-core/src/main/scala/com/webank/wedatasphere/dss/appjoint/execution/scheduler/ListenerEventBusNodeExecutionScheduler.scala @@ -17,6 +17,8 @@ package com.webank.wedatasphere.dss.appjoint.execution.scheduler +import java.util.concurrent.ArrayBlockingQueue + import com.webank.wedatasphere.dss.appjoint.exception.AppJointErrorException import com.webank.wedatasphere.dss.appjoint.execution.common.{AsyncNodeExecutionResponse, CompletedNodeExecutionResponse, LongTermNodeExecutionAction} import com.webank.wedatasphere.dss.appjoint.execution.conf.NodeExecutionConfiguration._ @@ -55,7 +57,7 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri val field1 = ru.typeOf[ListenerEventBus[_, _]].decl(ru.TermName("eventQueue")).asMethod val result = listenerEventBusClass.reflectMethod(field1) result() match { - case queue: BlockingLoopArray[AsyncNodeExecutionResponseEvent] => queue + case queue: ArrayBlockingQueue[AsyncNodeExecutionResponseEvent] => queue } } @@ -104,18 +106,18 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri protected def addEvent(event: AsyncNodeExecutionResponseEvent): Unit = synchronized { listenerEventBus.post(event) - event.getResponse.getAction match { - case longTermAction: LongTermNodeExecutionAction => - longTermAction.setSchedulerId(eventQueue.max) - case _ => - } +// event.getResponse.getAction match { +// case longTermAction: LongTermNodeExecutionAction => +// longTermAction.setSchedulerId(eventQueue.max) +// case _ => +// } } - override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit = - getAsyncResponse(action).setCompleted(true) + override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit = { + + } - override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse = - eventQueue.get(action.getSchedulerId).getResponse + override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse = null override def start(): Unit = listenerEventBus.start() diff --git a/dss-appjoint-loader/pom.xml b/dss-appjoint-loader/pom.xml index 7dc9097f34..b36ab5d799 100644 --- a/dss-appjoint-loader/pom.xml +++ b/dss-appjoint-loader/pom.xml @@ -22,12 +22,12 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 dss-appjoint-loader - 0.7.0 + 0.9.0 diff --git a/dss-application/pom.xml b/dss-application/pom.xml index 1a55a80350..54583b1239 100644 --- a/dss-application/pom.xml +++ b/dss-application/pom.xml @@ -23,7 +23,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 dss-application diff --git a/dss-azkaban-scheduler-appjoint/pom.xml b/dss-azkaban-scheduler-appjoint/pom.xml index a6d8945341..f7a82d5557 100644 --- a/dss-azkaban-scheduler-appjoint/pom.xml +++ b/dss-azkaban-scheduler-appjoint/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/dss-azkaban-scheduler-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/scheduler/azkaban/service/AzkabanProjectService.java b/dss-azkaban-scheduler-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/scheduler/azkaban/service/AzkabanProjectService.java index 2f2f98e4d0..eb536b14b7 100644 --- a/dss-azkaban-scheduler-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/scheduler/azkaban/service/AzkabanProjectService.java +++ b/dss-azkaban-scheduler-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/scheduler/azkaban/service/AzkabanProjectService.java @@ -80,11 +80,11 @@ public Project createProject(Project project, Session session) throws AppJointEr params.add(new BasicNameValuePair("name", project.getName())); params.add(new BasicNameValuePair("description", project.getDescription())); HttpPost httpPost = new HttpPost(projectUrl); - httpPost.addHeader(HTTP.CONTENT_ENCODING, "UTF-8"); + httpPost.addHeader(HTTP.CONTENT_ENCODING, HTTP.IDENTITY_CODING); CookieStore cookieStore = new BasicCookieStore(); cookieStore.addCookie(session.getCookies()[0]); - HttpEntity entity = EntityBuilder.create().setContentEncoding("UTF-8"). - setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8)) + HttpEntity entity = EntityBuilder.create() + .setContentType(ContentType.create("application/x-www-form-urlencoded", Consts.UTF_8)) .setParameters(params).build(); httpPost.setEntity(entity); CloseableHttpClient httpClient = null; diff --git a/dss-common/pom.xml b/dss-common/pom.xml index bbf896bfa5..fb77c4dad1 100644 --- a/dss-common/pom.xml +++ b/dss-common/pom.xml @@ -23,7 +23,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 dss-common diff --git a/dss-flow-execution-entrance/pom.xml b/dss-flow-execution-entrance/pom.xml index 97d12b24be..7e152ced39 100644 --- a/dss-flow-execution-entrance/pom.xml +++ b/dss-flow-execution-entrance/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/dss-flow-execution-entrance/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowExecutionRestfulApi.java b/dss-flow-execution-entrance/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowExecutionRestfulApi.java index 807de60299..8ad6dcb51f 100644 --- a/dss-flow-execution-entrance/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowExecutionRestfulApi.java +++ b/dss-flow-execution-entrance/src/main/java/com/webank/wedatasphere/dss/flow/execution/entrance/restful/FlowExecutionRestfulApi.java @@ -70,6 +70,12 @@ public Response execution(@PathParam("id") String id) { message = Message.ok("Successfully get job execution info"); message.setMethod("/api/entrance/" + id + "/execution"); message.setStatus(0); + long nowTime = System.currentTimeMillis(); + flowEntranceJob.getFlowContext().getRunningNodes().forEach((k, v) -> { + if (v != null) { + v.setNowTime(nowTime); + } + }); message.data("runningJobs", FlowContext$.MODULE$.convertView(flowEntranceJob.getFlowContext().getRunningNodes())); List> pendingList = FlowContext$.MODULE$.convertView(flowEntranceJob.getFlowContext().getPendingNodes()); pendingList.addAll(FlowContext$.MODULE$.convertView(flowEntranceJob.getFlowContext().getSkippedNodes())); diff --git a/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/DefaultNodeRunner.scala b/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/DefaultNodeRunner.scala index ebed133096..fbd13c8493 100644 --- a/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/DefaultNodeRunner.scala +++ b/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/DefaultNodeRunner.scala @@ -49,6 +49,8 @@ class DefaultNodeRunner extends NodeRunner with Logging { private var startTime: Long = _ + private var nowTime:Long = _ + private var lastGetStatusTime: Long = 0 override def getNode: SchedulerNode = this.node @@ -156,4 +158,7 @@ class DefaultNodeRunner extends NodeRunner with Logging { override def setStartTime(startTime: Long): Unit = this.startTime = startTime + override def getNowTime(): Long = this.nowTime + + override def setNowTime(nowTime: Long): Unit = this.nowTime = nowTime } diff --git a/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/NodeRunner.scala b/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/NodeRunner.scala index 5e090fe29e..f0a40fd57a 100644 --- a/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/NodeRunner.scala +++ b/dss-flow-execution-entrance/src/main/scala/com/webank/wedatasphere/dss/flow/execution/entrance/node/NodeRunner.scala @@ -75,6 +75,10 @@ abstract class NodeRunner extends Runnable with Logging{ def setStartTime(startTime: Long): Unit + def getNowTime():Long + + def setNowTime(nowTime: Long):Unit + protected def transitionState(toState: NodeExecutionState): Unit = Utils.tryAndWarn{ if (getStatus == toState) return info(s"from state $getStatus to $toState") diff --git a/dss-linkis-node-execution/pom.xml b/dss-linkis-node-execution/pom.xml index b98c33987a..0e8eb4e823 100644 --- a/dss-linkis-node-execution/pom.xml +++ b/dss-linkis-node-execution/pom.xml @@ -24,7 +24,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 dss-linkis-node-execution @@ -33,13 +33,13 @@ com.webank.wedatasphere.linkis linkis-ujes-client - 0.9.3 + ${linkis.version} com.webank.wedatasphere.linkis linkis-workspace-httpclient - 0.9.3 + ${linkis.version} diff --git a/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/JobParamsParser.java b/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/JobParamsParser.java index 5c9e013c6b..27174fbcc8 100644 --- a/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/JobParamsParser.java +++ b/dss-linkis-node-execution/src/main/java/com/webank/wedatasphere/dss/linkis/node/execution/parser/JobParamsParser.java @@ -24,8 +24,10 @@ import com.webank.wedatasphere.dss.linkis.node.execution.job.Job; import com.webank.wedatasphere.dss.linkis.node.execution.job.SignalSharedJob; import com.webank.wedatasphere.dss.linkis.node.execution.utils.LinkisJobExecutionUtils; - +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -33,7 +35,7 @@ * Created by johnnwang on 2019/11/3. */ public class JobParamsParser implements JobParser { - + private static final Logger LOGGER = LoggerFactory.getLogger(JobParamsParser.class); private JobSignalKeyCreator signalKeyCreator; public JobSignalKeyCreator getSignalKeyCreator() { @@ -61,8 +63,12 @@ public void parseJob(Job job) throws Exception { if (sharedValue != null) { Collection values = sharedValue.values(); for(Object value : values){ - Map variableMap = LinkisJobExecutionUtils.gson.fromJson(value.toString(), new TypeToken>() {}.getType()); - putParamsMap(job.getParams(), "variable", variableMap); + List> list = LinkisJobExecutionUtils.gson.fromJson(value.toString(), List.class); + Map totalMap = new HashMap<>(); + for (Map kv : list) { + totalMap.putAll(kv); + } + putParamsMap(job.getParams(), "variable", totalMap); } } // put configuration diff --git a/dss-scheduler-appjoint-core/pom.xml b/dss-scheduler-appjoint-core/pom.xml index b09213fc83..979a90903e 100644 --- a/dss-scheduler-appjoint-core/pom.xml +++ b/dss-scheduler-appjoint-core/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/dss-server/pom.xml b/dss-server/pom.xml index b0627677bc..c3ec0ed388 100644 --- a/dss-server/pom.xml +++ b/dss-server/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 @@ -117,7 +117,7 @@ com.webank.wedatasphere.linkis - 0.9.3 + ${linkis.version} @@ -145,7 +145,7 @@ com.webank.wedatasphere.dss dss-scheduler-appjoint-core - 0.7.0 + 0.9.0 diff --git a/dss-server/src/main/assembly/distribution.xml b/dss-server/src/main/assembly/distribution.xml index 3defc761b6..0c7f789ee4 100644 --- a/dss-server/src/main/assembly/distribution.xml +++ b/dss-server/src/main/assembly/distribution.xml @@ -82,7 +82,6 @@ com.netflix.ribbon:ribbon-loadbalancer:jar com.netflix.ribbon:ribbon-transport:jar com.netflix.servo:servo-core:jar - com.ning:async-http-client:jar com.sun.jersey.contribs:jersey-apache-client4:jar com.sun.jersey:jersey-client:jar com.sun.jersey:jersey-core:jar @@ -137,8 +136,6 @@ joda-time:joda-time:jar log4j:log4j:jar mysql:mysql-connector-java:jar - net.databinder.dispatch:dispatch-core_2.11:jar - net.databinder.dispatch:dispatch-json4s-jackson_2.11:jar org.antlr:antlr-runtime:jar org.antlr:stringtemplate:jar org.apache.commons:commons-compress:jar diff --git a/eventchecker-appjoint/pom.xml b/eventchecker-appjoint/pom.xml index 8c38e47474..3ae6e8f70b 100644 --- a/eventchecker-appjoint/pom.xml +++ b/eventchecker-appjoint/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/eventchecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/service/AbstractEventCheckReceiver.java b/eventchecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/service/AbstractEventCheckReceiver.java index 4f0a85e07f..fbd1c887ae 100644 --- a/eventchecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/service/AbstractEventCheckReceiver.java +++ b/eventchecker-appjoint/src/main/java/com/webank/wedatasphere/dss/appjoint/schedulis/jobtype/service/AbstractEventCheckReceiver.java @@ -80,39 +80,62 @@ boolean updateMsgOffset(int jobId, Properties props, Logger log, String[] consum boolean result = false; String vNewMsgID = "-1"; PreparedStatement updatePstmt = null; + PreparedStatement pstmtForGetID = null; Connection msgConn = null; vNewMsgID = setConsumedMsg(props,log,consumedMsgInfo); try { if(StringUtils.isNotEmpty(vNewMsgID) && StringUtils.isNotBlank(vNewMsgID) && !"-1".equals(vNewMsgID)){ msgConn = getEventCheckerConnection(props,log); if(msgConn == null) return false; - int vProcessID = jobId; - String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");; - String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END"; - log.info("last message offset {} is:" + lastMsgId); - updatePstmt = msgConn.prepareCall(sqlForUpdateMsg); - updatePstmt.setString(1, receiver); - updatePstmt.setString(2, topic); - updatePstmt.setString(3, msgName); - updatePstmt.setString(4, vReceiveTime); - updatePstmt.setString(5, vNewMsgID); - int updaters = updatePstmt.executeUpdate(); - log.info("updateMsgOffset successful {} update result is:" + updaters); - if(updaters != 0){ - log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID); - //return true after update success - result = true; + msgConn.setAutoCommit(false); + String sqlForReadMsgID = "SELECT msg_id FROM event_status WHERE receiver=? AND topic=? AND msg_name=? for update"; + pstmtForGetID = msgConn.prepareCall(sqlForReadMsgID); + pstmtForGetID.setString(1, receiver); + pstmtForGetID.setString(2, topic); + pstmtForGetID.setString(3, msgName); + ResultSet rs = pstmtForGetID.executeQuery(); + String nowLastMsgId = rs.last()==true ? rs.getString("msg_id"):"0"; + log.info("receive message successfully , Now check to see if the latest offset has changed ,nowLastMsgId is {} " + nowLastMsgId); + if("0".equals(nowLastMsgId) || nowLastMsgId.equals(lastMsgId)){ + + int vProcessID = jobId; + String vReceiveTime = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss");; + String sqlForUpdateMsg = "INSERT INTO event_status(receiver,topic,msg_name,receive_time,msg_id) VALUES(?,?,?,?,?) ON DUPLICATE KEY UPDATE receive_time=VALUES(receive_time),msg_id= CASE WHEN msg_id= " + lastMsgId + " THEN VALUES(msg_id) ELSE msg_id END"; + log.info("last message offset {} is:" + lastMsgId); + updatePstmt = msgConn.prepareCall(sqlForUpdateMsg); + updatePstmt.setString(1, receiver); + updatePstmt.setString(2, topic); + updatePstmt.setString(3, msgName); + updatePstmt.setString(4, vReceiveTime); + updatePstmt.setString(5, vNewMsgID); + int updaters = updatePstmt.executeUpdate(); + log.info("updateMsgOffset successful {} update result is:" + updaters); + if(updaters != 0){ + log.info("Received message successfully , update message status succeeded, consumed flow execution ID: " + vProcessID); + //return true after update success + result = true; + }else{ + log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID); + result = false; + } }else{ - log.info("Received message successfully , update message status failed, consumed flow execution ID: " + vProcessID); + log.info("the latest offset has changed , Keep waiting for the signal"); result = false; } + msgConn.commit(); }else{ result = false; } }catch (SQLException e){ log.error("Error update Msg Offset" + e); + try { + msgConn.rollback(); + } catch (SQLException ex) { + log.error("transaction rollback failed " + e); + } return false; }finally { + closeQueryStmt(pstmtForGetID, log); closeQueryStmt(updatePstmt, log); closeConnection(msgConn, log); } diff --git a/plugins/azkaban/linkis-jobtype/pom.xml b/plugins/azkaban/linkis-jobtype/pom.xml index 56225d510a..c1c697d8ca 100644 --- a/plugins/azkaban/linkis-jobtype/pom.xml +++ b/plugins/azkaban/linkis-jobtype/pom.xml @@ -23,7 +23,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 com.webank.wedatasphere.dss linkis-jobtype diff --git a/plugins/linkis/linkis-appjoint-entrance/pom.xml b/plugins/linkis/linkis-appjoint-entrance/pom.xml index 0651c396ef..12a064427e 100644 --- a/plugins/linkis/linkis-appjoint-entrance/pom.xml +++ b/plugins/linkis/linkis-appjoint-entrance/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/plugins/linkis/linkis-appjoint-entrance/src/main/scala/com/webank/wedatasphere/dss/linkis/appjoint/entrance/execute/AppJointEntranceEngine.scala b/plugins/linkis/linkis-appjoint-entrance/src/main/scala/com/webank/wedatasphere/dss/linkis/appjoint/entrance/execute/AppJointEntranceEngine.scala index 26e4305d49..e16c9b93e5 100644 --- a/plugins/linkis/linkis-appjoint-entrance/src/main/scala/com/webank/wedatasphere/dss/linkis/appjoint/entrance/execute/AppJointEntranceEngine.scala +++ b/plugins/linkis/linkis-appjoint-entrance/src/main/scala/com/webank/wedatasphere/dss/linkis/appjoint/entrance/execute/AppJointEntranceEngine.scala @@ -31,6 +31,7 @@ import com.webank.wedatasphere.dss.linkis.appjoint.entrance.job.AppJointExecuteR import com.webank.wedatasphere.linkis.common.exception.ErrorException import com.webank.wedatasphere.linkis.common.utils.{Logging, Utils} import com.webank.wedatasphere.linkis.entrance.execute.{EngineExecuteAsynReturn, EntranceEngine, EntranceJob} +import com.webank.wedatasphere.linkis.entrance.interceptor.impl.CustomVariableUtils import com.webank.wedatasphere.linkis.protocol.engine.{JobProgressInfo, RequestTask} import com.webank.wedatasphere.linkis.protocol.query.RequestPersistTask import com.webank.wedatasphere.linkis.scheduler.executer._ @@ -156,6 +157,9 @@ class AppJointEntranceEngine(properties: util.Map[String, Any]) val nodeType = nodeContext.getAppJointNode.getNodeType val realAppJointType = if (nodeType.contains(".")) nodeType.substring(0, nodeType.indexOf(".")) else nodeType val appJoint = AppJointManager.getAppJoint(realAppJointType) + if((realAppJointType.toLowerCase()).contains("datacheck")){ + replaceCustomVariables(nodeContext.getRuntimeMap) + } val user = if (null != runTimeMap.get("user")) runTimeMap.get("user").toString else null val session = if (StringUtils.isNotEmpty(user)){ if (appJoint.getSecurityService != null) appJoint.getSecurityService.login(user) else null @@ -189,8 +193,19 @@ class AppJointEntranceEngine(properties: util.Map[String, Any]) ErrorExecuteResponse(s"cannot do this executeRequest $executeRequest", new ErrorException(80056, s"cannot do this executeRequest $executeRequest")) } + private def replaceCustomVariables(runTimeMap:java.util.Map[String, Object]):Unit = { + val key = "check.object" + val value:String = if (null != runTimeMap.get(key)) runTimeMap.get(key).toString else "" + val task = new RequestPersistTask + task.setExecutionCode(value) + task.setParams(new util.HashMap[String, Object]()) + val (result, code) = CustomVariableUtils.replaceCustomVar(task, "sql") + logger.info(s"after code replace code is $code") + if (result) runTimeMap(key) = code + } } + case class AppJointEntranceExecuteException(errMsg:String) extends ErrorException(70046, errMsg) class AppJointEntranceAsyncExecuteResponse extends AsynReturnExecuteResponse with Logging{ diff --git a/pom.xml b/pom.xml index 76737321d2..e80674603e 100644 --- a/pom.xml +++ b/pom.xml @@ -23,7 +23,7 @@ pom com.webank.wedatasphere.dss dss - 0.7.0 + 0.9.0 dss-common @@ -47,8 +47,8 @@ - 0.7.0 - 0.9.3 + 0.9.0 + 0.9.4 2.11.8 1.8 3.3.3 diff --git a/qualitis-appjoint/appjoint/pom.xml b/qualitis-appjoint/appjoint/pom.xml index d32ad4bb8a..280e567a85 100644 --- a/qualitis-appjoint/appjoint/pom.xml +++ b/qualitis-appjoint/appjoint/pom.xml @@ -5,7 +5,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/sendemail-appjoint/sendemail-core/pom.xml b/sendemail-appjoint/sendemail-core/pom.xml index 97c3c167dd..31440bd86c 100644 --- a/sendemail-appjoint/sendemail-core/pom.xml +++ b/sendemail-appjoint/sendemail-core/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 diff --git a/visualis-appjoint/appjoint/pom.xml b/visualis-appjoint/appjoint/pom.xml index ab48a6148b..11506add21 100644 --- a/visualis-appjoint/appjoint/pom.xml +++ b/visualis-appjoint/appjoint/pom.xml @@ -22,7 +22,7 @@ dss com.webank.wedatasphere.dss - 0.7.0 + 0.9.0 4.0.0 @@ -52,6 +52,12 @@ provided true + + + net.databinder.dispatch + dispatch-core_2.11 + 0.12.3 + diff --git a/web/config.sh b/web/config.sh index 5cebe73d0b..864e492715 100644 --- a/web/config.sh +++ b/web/config.sh @@ -5,4 +5,4 @@ dss_web_port="8088" linkis_gateway_url="http://localhost:9001" #dss nginx ip -dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}') +dss_nginx_ip=$(ip addr | awk '/^[0-9]+: / {}; /inet.*global/ {print gensub(/(.*)\/(.*)/, "\\1", "g", $2)}'|awk 'NR==1') diff --git a/web/src/js/component/editor/editor.vue b/web/src/js/component/editor/editor.vue index dad5484c92..79ae4e6387 100644 --- a/web/src/js/component/editor/editor.vue +++ b/web/src/js/component/editor/editor.vue @@ -91,7 +91,7 @@ export default { if (newValue == this.getValue()) { return; } - let readOnly = this.editor.getRawOptions().readOnly; + let readOnly = this.currentConfig.readOnly; if (readOnly) { // editor.setValue 和 model.setValue 都会丢失撤销栈 this.editor.setValue(newValue); diff --git a/web/src/js/view/login/index.vue b/web/src/js/view/login/index.vue index 52c50362fd..7becae2204 100644 --- a/web/src/js/view/login/index.vue +++ b/web/src/js/view/login/index.vue @@ -134,7 +134,7 @@ export default { const params = { userName: this.loginForm.user, password: this.loginForm.password, - captcha: this.loginForm.captcha, + captcha: this.loginForm.captcha.toLocaleLowerCase(), }; api .fetch(`/user/login`, params)