diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java index 370d2be602..5d855c8dc1 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java @@ -283,10 +283,6 @@ public Message killEngine(HttpServletRequest req, @RequestBody Map engineParam : param) { String moduleName = engineParam.get("applicationName"); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java index 45a45434b3..7cc9273c9f 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/service/impl/ECResourceInfoServiceImpl.java @@ -19,11 +19,17 @@ import org.apache.linkis.manager.am.restful.EMRestfulApi; import org.apache.linkis.manager.am.service.ECResourceInfoService; +import org.apache.linkis.manager.am.util.ECResourceInfoUtils; import org.apache.linkis.manager.common.entity.enumeration.NodeStatus; import org.apache.linkis.manager.common.entity.persistence.ECResourceInfoRecord; import org.apache.linkis.manager.common.entity.persistence.PersistencerEcNodeInfo; import org.apache.linkis.manager.dao.ECResourceRecordMapper; +import org.apache.linkis.manager.dao.LabelManagerMapper; import org.apache.linkis.manager.dao.NodeManagerMapper; +import org.apache.linkis.manager.label.entity.Label; +import org.apache.linkis.manager.label.entity.engine.EngineTypeLabel; +import org.apache.linkis.manager.label.service.NodeLabelService; +import org.apache.linkis.manager.persistence.LabelManagerPersistence; import org.apache.commons.lang3.StringUtils; @@ -52,6 +58,12 @@ public class ECResourceInfoServiceImpl implements ECResourceInfoService { @Autowired private NodeManagerMapper nodeManagerMapper; + @Autowired private LabelManagerMapper labelManagerMapper; + + @Autowired private LabelManagerPersistence labelManagerPersistence; + + @Autowired private NodeLabelService nodeLabelService; + @Override public ECResourceInfoRecord getECResourceInfoRecord(String ticketId) { if (StringUtils.isNotBlank(ticketId)) { @@ -101,7 +113,7 @@ public List> getECResourceInfoList( statusIntList.add(NodeStatus.valueOf(status).ordinal()); } - // get engine conn info list filter by creator user list /engineType/instance status list + // get engine conn info list filter by creator user list /instance status list List ecNodesInfo = nodeManagerMapper.getEMNodeInfoList(creatorUserList, statusIntList); @@ -112,6 +124,28 @@ public List> getECResourceInfoList( return resultList; } + // filter by engineTypes + List ecNodesFilterInfo = new ArrayList<>(); + HashMap>> labelsMap = + nodeLabelService.getNodeLabelsByInstanceList2(instanceList); + for (PersistencerEcNodeInfo node : ecNodesInfo) { + List> labels = + labelsMap.get(node.getInstance()).stream() + .filter( + label -> { + if (label instanceof EngineTypeLabel) { + String engineType = ((EngineTypeLabel) label).getEngineType(); + return engineTypeList.contains(engineType); + } else { + return false; + } + }) + .collect(Collectors.toList()); + if (labels.size() > 0) { + ecNodesFilterInfo.add(node); + } + } + // filter by engineType and get latest resource record info List ecResourceInfoRecords = ecResourceRecordMapper.getECResourceInfoList(instanceList, engineTypeList); @@ -120,7 +154,7 @@ public List> getECResourceInfoList( ecResourceInfoRecords.stream() .collect(Collectors.toMap(ECResourceInfoRecord::getServiceInstance, item -> item)); - ecNodesInfo.forEach( + ecNodesFilterInfo.forEach( info -> { try { Map item = @@ -132,7 +166,14 @@ public List> getECResourceInfoList( if (latestEcInfo == null) { logger.info("Can not get any resource record info of ec:{}", info.getInstance()); } else { - item.put("useResource", latestEcInfo.getUsedResource()); + String usedResourceStr = latestEcInfo.getUsedResource(); + /* + {"instance":1,"memory":"2.0 GB","cpu":1} + -> + {"driver":{"instance":1,"memory":"2.0 GB","cpu":1} } + */ + + item.put("useResource", ECResourceInfoUtils.getStringToMap(usedResourceStr)); item.put("ecmInstance", latestEcInfo.getEcmInstance()); String engineType = latestEcInfo.getLabelValue().split(",")[1].split("-")[0]; item.put("engineType", engineType); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java index 80320b694a..ab926df64e 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/util/ECResourceInfoUtils.java @@ -87,6 +87,26 @@ public static ResourceVo getStringToMap(String str, ECResourceInfoRecord info) { return resourceVo; } + /* + {"instance":1,"memory":"2.0 GB","cpu":1} + -> + {"driver":{"instance":1,"memory":"2.0 GB","cpu":1} } + */ + public static Map getStringToMap(String usedResourceStr) { + Map resourceMap = new HashMap<>(); + Map map = + BDPJettyServerHelper.gson().fromJson(usedResourceStr, new HashMap<>().getClass()); + if (MapUtils.isNotEmpty(map)) { + + if (!usedResourceStr.contains("driver")) { + resourceMap.put("driver", map); + } else { + resourceMap = map; + } + } + return resourceMap; + } + public static AMEngineNode convertECInfoTOECNode(ECResourceInfoRecord ecInfo) { AMEngineNode engineNode = new AMEngineNode(); AMEMNode ecmNode = new AMEMNode(); diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala index c47adb89e3..4fda27a8ce 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineStopService.scala @@ -134,10 +134,7 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopServ val unlockEngineNodes = engineNodes.filter(node => NodeStatus.Unlock.equals(node.getNodeStatus)) logger.info( - "get ec node total num:{} and unlock node num:{} of ecm:{} ", - engineNodes.size, - unlockEngineNodes.size, - ecmInstance + s"get ec node total num:${engineNodes.size} and unlock node num:${unlockEngineNodes.size} of ecm:${ecmInstance} " ) var loadInstanceResourceTotal = new LoadInstanceResource(0, 0, 0) diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala index e6fe947a45..cd881cdbae 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/NodeLabelService.scala @@ -90,4 +90,16 @@ trait NodeLabelService { instanceList: util.List[ServiceInstance] ): util.HashMap[String, util.List[Label[_]]] + /** + * Get instance labels by instances name list + * + * @param instanceList + * instance name list + * @return + * Map of:instance name -> label list + */ + def getNodeLabelsByInstanceList2( + instanceList: util.List[String] + ): util.HashMap[String, util.List[Label[_]]] + } diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala index 644c875726..35f3c68fe3 100644 --- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala +++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/label/service/impl/DefaultNodeLabelService.scala @@ -37,6 +37,8 @@ import org.springframework.transaction.annotation.Transactional import org.springframework.util.CollectionUtils import java.util +import java.util.List +import java.util.stream.Collectors import scala.collection.JavaConverters._ import scala.collection.mutable @@ -406,4 +408,35 @@ class DefaultNodeLabelService extends NodeLabelService with Logging { resultMap } + override def getNodeLabelsByInstanceList2( + instanceList: util.List[String] + ): util.HashMap[String, util.List[Label[_]]] = { + val resultMap = new util.HashMap[String, util.List[Label[_]]]() + val serviceInstanceList = new util.ArrayList[ServiceInstance] + + instanceList.asScala.foreach(instance => { + val serviceInstance = new ServiceInstance(); + serviceInstance.setInstance(instance); + serviceInstanceList.add(serviceInstance) + }) + + val map = labelManagerPersistence.getLabelRelationsByServiceInstance(serviceInstanceList) + serviceInstanceList.asScala.foreach(serviceInstance => { + val LabelList = map + .get(serviceInstance) + .asScala + .map { label => + val realyLabel: Label[_] = labelFactory.createLabel( + label.getLabelKey, + if (!CollectionUtils.isEmpty(label.getValue)) label.getValue else label.getStringValue + ) + realyLabel + } + .toList + .asJava + resultMap.put(serviceInstance.getInstance, LabelList) + }) + resultMap + } + } diff --git a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala index 586773f03c..dde1d2074c 100644 --- a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala +++ b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/executor/SQLSession.scala @@ -194,11 +194,11 @@ object SQLSession extends Logging { .toSeq .sorted .mkString("{", ",", "}") - case (null, _) => "null" - case (s: String, StringType) => "\"" + s + "\"" - case (decimal, DecimalType()) => decimal.toString - case (interval, CalendarIntervalType) => interval.toString - case (other, tpe) if primitiveTypes contains tpe => other.toString + case (str: String, StringType) => str.replaceAll("\n|\t", " ") + case (double: Double, DoubleType) => nf.format(double) + case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal) + case other: Any => other.toString + case _ => null } a match { @@ -219,23 +219,13 @@ object SQLSession extends Logging { .toSeq .sorted .mkString("{", ",", "}") - case (null, _) => "NULL" - case (d: Date, DateType) => - DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d)) - case (t: Timestamp, TimestampType) => - DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t)) - case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8) + + case (str: String, StringType) => str.replaceAll("\n|\t", " ") + case (double: Double, DoubleType) => nf.format(double) case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal) - case (interval, CalendarIntervalType) => interval.toString - case (other, tpe) if primitiveTypes.contains(tpe) => other.toString - } - } + case other: Any => other.toString + case _ => null - private def formatDecimal(d: java.math.BigDecimal): String = { - if (null == d || d.compareTo(java.math.BigDecimal.ZERO) == 0) { - java.math.BigDecimal.ZERO.toPlainString - } else { - d.stripTrailingZeros().toPlainString } }