Commit 9851c2f

Merge branch 'dev-1.10.0-webank' of github.com:WeDataSphere/linkis into dev-1.10.0-webank

2 parents 7d0815c + 554713b
mayinrain committed Jan 13, 2025

Showing 26 changed files with 180 additions and 60 deletions.
@@ -65,7 +65,7 @@ object HadoopConf {
     CommonVars("linkis.hadoop.hdfs.cache.close.enable", true).getValue

   val HDFS_ENABLE_NOT_CLOSE_USERS =
-    CommonVars("linkis.hadoop.hdfs.cache.not.close.users", "").getValue
+    CommonVars("linkis.hadoop.hdfs.cache.not.close.users", "hadoop").getValue

   val HDFS_ENABLE_CACHE_IDLE_TIME =
     CommonVars("wds.linkis.hadoop.hdfs.cache.idle.time", 3 * 60 * 1000).getValue
@@ -42,6 +42,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,6 +58,12 @@ public class HDFSFileSystem extends FileSystem {

   private static final Logger logger = LoggerFactory.getLogger(HDFSFileSystem.class);

+  private static final String LOCKER_SUFFIX = "refresh";
+
+  private static final int REFRESH_INTERVAL = LinkisStorageConf.HDFS_FILE_SYSTEM_REFRESHE_INTERVAL() * 1000 * 60;
+
+  private static final ConcurrentHashMap<String, Long> lastCallTimes = new ConcurrentHashMap<>();
+
   /** File System abstract method start */
   @Override
   public String listRoot() throws IOException {
@@ -328,9 +335,21 @@ public boolean exists(FsPath dest) throws IOException {

   private void resetRootHdfs() {
     if (fs != null) {
-      synchronized (this) {
+      String locker = user + LOCKER_SUFFIX;
+      synchronized (locker.intern()) {
         if (fs != null) {
           if (HadoopConf.HDFS_ENABLE_CACHE()) {
+            long currentTime = System.currentTimeMillis();
+            Long lastCallTime = lastCallTimes.get(user);
+
+            if (lastCallTime != null && (currentTime - lastCallTime) < REFRESH_INTERVAL) {
+              logger.warn(
+                  "Method call denied for username: {} Please wait for {} minutes.",
+                  user,
+                  REFRESH_INTERVAL / 60000);
+              return;
+            }
+            lastCallTimes.put(user, currentTime);
             HDFSUtils.closeHDFSFIleSystem(fs, user, label, true);
           } else {
             HDFSUtils.closeHDFSFIleSystem(fs, user, label);
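The resetRootHdfs() change above swaps the instance-level lock for a per-user lock and adds a refresh throttle: a ConcurrentHashMap records each user's last refresh time, and calls inside the interval are rejected. A minimal, self-contained sketch of that pattern — the class and method names here are illustrative, not Linkis APIs, and the 10-minute window mirrors the config default:

```java
import java.util.concurrent.ConcurrentHashMap;

public class PerUserRefreshThrottle {

  private static final long REFRESH_INTERVAL_MS = 10 * 60 * 1000L; // assumed 10-minute window
  private static final ConcurrentHashMap<String, Long> lastCallTimes = new ConcurrentHashMap<>();

  /** Runs refreshAction at most once per interval per user; returns false when throttled. */
  public static boolean tryRefresh(String user, Runnable refreshAction) {
    String locker = user + "refresh";
    // intern() yields one shared lock object per distinct string, so refreshes for the
    // same user serialize while different users proceed in parallel.
    synchronized (locker.intern()) {
      long now = System.currentTimeMillis();
      Long last = lastCallTimes.get(user);
      if (last != null && (now - last) < REFRESH_INTERVAL_MS) {
        return false; // still inside the window: deny, as the hunk's logger.warn branch does
      }
      lastCallTimes.put(user, now);
      refreshAction.run();
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(tryRefresh("alice", () -> System.out.println("refreshing for alice"))); // true
    System.out.println(tryRefresh("alice", () -> System.out.println("never runs"))); // false
  }
}
```

Locking on interned strings is compact but shares the JVM-wide string pool; a ConcurrentHashMap of dedicated lock objects would isolate the locks at the cost of a little bookkeeping.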
@@ -37,6 +37,11 @@ object LinkisStorageConf {
     )
     .getValue

+  val HDFS_FILE_SYSTEM_REFRESHE_INTERVAL: Int =
+    CommonVars
+      .apply("wds.linkis.hdfs.rest.interval", 10)
+      .getValue
+
   val ROW_BYTE_MAX_LEN_STR = CommonVars("wds.linkis.resultset.row.max.str", "2m").getValue

   val ROW_BYTE_MAX_LEN = ByteTimeUtils.byteStringAsBytes(ROW_BYTE_MAX_LEN_STR)
@@ -62,7 +62,7 @@ object DataType extends Logging {
     case "date" => DateType
     case "timestamp" => TimestampType
     case "binary" => BinaryType
-    case "decimal" | DECIMAL_REGEX() => DecimalType(dataType, 3)
+    case "decimal" | DECIMAL_REGEX() => DecimalType
     case ARRAY_REGEX() => ArrayType
     case MAP_REGEX() => MapType
     case LIST_REGEX() => ListType
@@ -89,8 +89,7 @@ object DataType extends Logging {
     case LongType | BigIntType => if (isNumberNull(newValue)) null else newValue.toLong
     case FloatType => if (isNumberNull(newValue)) null else newValue.toFloat
     case DoubleType => if (isNumberNull(newValue)) null else newValue.toDouble
-    case DecimalType(_, _) =>
-      if (isNumberNull(newValue)) null else new JavaBigDecimal(newValue)
+    case DecimalType => if (isNumberNull(newValue)) null else new JavaBigDecimal(newValue)
     case DateType => if (isNumberNull(newValue)) null else Date.valueOf(newValue)
     case TimestampType =>
       if (isNumberNull(newValue)) null else Timestamp.valueOf(newValue).toString.stripSuffix(".0")
@@ -146,16 +145,12 @@ case object VarcharType extends DataType("varchar", 12)
 case object DateType extends DataType("date", 91)
 case object TimestampType extends DataType("timestamp", 93)
 case object BinaryType extends DataType("binary", -2)
+case object DecimalType extends DataType("decimal", 3)
 case object ArrayType extends DataType("array", 2003)
 case object MapType extends DataType("map", 2000)
 case object ListType extends DataType("list", 2001)
 case object StructType extends DataType("struct", 2002)

-case class DecimalType(override val typeName: String, override val javaSQLType: Int)
-    extends DataType(typeName, javaSQLType)
-
-case class BigDecimalType(override val typeName: String, override val javaSQLType: Int)
-    extends DataType(typeName, javaSQLType)
+case object BigDecimalType extends DataType("bigdecimal", 3)

 case class Column(columnName: String, dataType: DataType, comment: String) {

@@ -18,8 +18,7 @@
 package org.apache.linkis.storage.domain

 import java.lang.reflect.Type
-
-import com.google.gson.GsonBuilder
+import com.google.gson.{GsonBuilder, ToNumberPolicy}

 /**
  * @param id
@@ -56,7 +55,10 @@ case class MethodEntity(

 object MethodEntitySerializer {

-  val gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create
+  val gson = new GsonBuilder()
+    .setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
+    .setObjectToNumberStrategy(ToNumberPolicy.LAZILY_PARSED_NUMBER)
+    .create

   /**
    * Serialized to code as a MethodEntity object
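Context for the ToNumberPolicy line, since the hunk itself does not explain it: by default Gson deserializes any JSON number bound to Object as a Double, so an integral argument such as 100 round-trips as 100.0; LAZILY_PARSED_NUMBER keeps the original literal. A small sketch, separate from the Linkis code (requires Gson 2.8.9 or later):

```java
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.ToNumberPolicy;
import java.util.Map;

public class NumberPolicyDemo {
  public static void main(String[] args) {
    String json = "{\"id\":100}"; // hypothetical serialized method argument

    // Default strategy: numbers bound to Object become Double.
    Map<?, ?> byDefault = new Gson().fromJson(json, Map.class);
    System.out.println(byDefault.get("id")); // 100.0

    // Lazily parsed: the literal is preserved until a concrete numeric type is requested.
    Gson lazy = new GsonBuilder()
        .setObjectToNumberStrategy(ToNumberPolicy.LAZILY_PARSED_NUMBER)
        .create();
    Map<?, ?> preserved = lazy.fromJson(json, Map.class);
    System.out.println(preserved.get("id")); // 100
  }
}
```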
@@ -97,8 +97,8 @@ class StorageExcelWriter(
       case VarcharType => style.setDataFormat(format.getFormat("@"))
       case DateType => style.setDataFormat(format.getFormat("m/d/yy h:mm"))
       case TimestampType => style.setDataFormat(format.getFormat("m/d/yy h:mm"))
-      case DecimalType(_, _) => style.setDataFormat(format.getFormat("#.000000000"))
-      case BigDecimalType(_, _) => style.setDataFormat(format.getFormat("#.000000000"))
+      case DecimalType => style.setDataFormat(format.getFormat("#.000000000"))
+      case BigDecimalType => style.setDataFormat(format.getFormat("#.000000000"))
       case _ => style.setDataFormat(format.getFormat("@"))
     }
   }
@@ -171,10 +171,10 @@ class StorageExcelWriter(
       case VarcharType => cell.setCellValue(DataType.valueToString(elem))
       case DateType => cell.setCellValue(getDate(elem))
       case TimestampType => cell.setCellValue(getDate(elem))
-      case DecimalType(_, _) =>
+      case DecimalType =>
         doubleCheck(DataType.valueToString(elem))
         cell.setCellValue(DataType.valueToString(elem).toDouble)
-      case BigDecimalType(_, _) =>
+      case BigDecimalType =>
         doubleCheck(DataType.valueToString(elem))
         cell.setCellValue(DataType.valueToString(elem).toDouble)
       case _ =>
@@ -21,7 +21,6 @@ import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
 import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType, RunType}
 import org.apache.linkis.ujes.client.response.ResultSetResult

-import java.math.{BigDecimal => JavaBigDecimal}
 import java.util
 import java.util.Locale

@@ -30,7 +29,6 @@ import com.google.gson.{Gson, JsonObject}
 object UJESClientUtils {

   val gson: Gson = new Gson()
-  val DECIMAL_REGEX = "^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)".r.unanchored

   def toEngineType(engineType: String): EngineType = engineType match {
     case "spark" => EngineType.SPARK
@@ -77,7 +75,7 @@ object UJESClientUtils {
       case "boolean" => value.toBoolean
       case "byte" => value.toByte
       case "bigint" => value.toLong
-      case "decimal" | DECIMAL_REGEX() => new JavaBigDecimal(value)
+      case "decimal" => value.toDouble
       case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]])
       case "map" => gson.fromJson(value, classOf[util.HashMap[Object, Object]])
       case "struct" => gson.fromJson(value, classOf[JsonObject])
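With this hunk the client parses decimal columns via value.toDouble instead of new JavaBigDecimal(value), which is why the now-unused regex and import are dropped above. A standalone sketch of the trade-off — not Linkis code — since a double holds only about 15-17 significant digits:

```java
import java.math.BigDecimal;

public class DecimalVsDouble {
  public static void main(String[] args) {
    String value = "12345678901234567.89"; // hypothetical result-set cell
    System.out.println(new BigDecimal(value));     // 12345678901234567.89 (exact)
    System.out.println(Double.parseDouble(value)); // 1.2345678901234568E16 (rounded)
  }
}
```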
@@ -540,7 +540,7 @@ class TaskExecutionServiceImpl
     } else {
       val msg =
         "Task null! requestTaskStatus: " + ComputationEngineUtils.GSON.toJson(requestTaskStatus)
-      logger.error(msg)
+      logger.info(msg)
       ResponseTaskStatus(requestTaskStatus.execId, ExecutionNodeStatus.Cancelled)
     }
   }
@@ -587,7 +587,7 @@ class TaskExecutionServiceImpl
       if (null != task) {
         sendToEntrance(task, ResponseTaskLog(logUpdateEvent.taskId, logUpdateEvent.log))
       } else {
-        logger.error("Task cannot null! logupdateEvent: " + logUpdateEvent.taskId)
+        logger.info("Task cannot null! logupdateEvent: " + logUpdateEvent.taskId)
       }
     } else if (null != lastTask) {
       val executor = executorManager.getReportExecutor
@@ -629,7 +629,7 @@ class TaskExecutionServiceImpl
         logger.info(s"task ${task.getTaskId} status $toStatus will not be send to entrance")
       }
     } else {
-      logger.error(
+      logger.info(
        "Task cannot null! taskStatusChangedEvent: " + ComputationEngineUtils.GSON
           .toJson(taskStatusChangedEvent)
       )
@@ -656,7 +656,7 @@ class TaskExecutionServiceImpl

       sendToEntrance(task, respRunningInfo)
     } else {
-      logger.error(
+      logger.info(
         "Task cannot null! taskProgressUpdateEvent : " + ComputationEngineUtils.GSON
           .toJson(taskProgressUpdateEvent)
       )
@@ -677,7 +677,7 @@ class TaskExecutionServiceImpl
         )
       )
     } else {
-      logger.error(s"Task cannot null! taskResultCreateEvent: ${taskResultCreateEvent.taskId}")
+      logger.info(s"Task cannot null! taskResultCreateEvent: ${taskResultCreateEvent.taskId}")
     }
     logger.info(s"Finished to deal result event ${taskResultCreateEvent.taskId}")
   }
@@ -704,7 +704,7 @@ class TaskExecutionServiceImpl
         )
       )
     } else {
-      logger.error(
+      logger.info(
         "Task cannot null! taskResultSizeCreatedEvent: " + ComputationEngineUtils.GSON
           .toJson(taskResultSizeCreatedEvent)
       )
@@ -101,7 +101,7 @@ public class AMConfiguration {
       CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python");

   public static final CommonVars<String> UNALLOW_BATCH_KILL_ENGINE_TYPES =
-      CommonVars.apply("wds.linkis.unallow.batch.kill.engine.types", "trino,appconn,io_file,jdbc");
+      CommonVars.apply("wds.linkis.unallow.batch.kill.engine.types", "trino,appconn,io_file,jdbc,nebula");
   public static final CommonVars<String> MULTI_USER_ENGINE_USER =
       CommonVars.apply("wds.linkis.multi.user.engine.user", getDefaultMultiEngineUser());
   public static final String UDF_KILL_ENGINE_TYPE =
@@ -205,7 +205,7 @@ public static boolean isUnAllowKilledEngineType(String engineType) {
         AMConfiguration.UNALLOW_BATCH_KILL_ENGINE_TYPES.getValue().split(",");
     Optional<String> findResult =
         Arrays.stream(unAllowBatchKillEngine)
-            .filter(e -> e.equalsIgnoreCase(engineType))
+            .filter(e -> engineType.toLowerCase().contains(e))
            .findFirst();
     return findResult.isPresent();
   }
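The filter now substring-matches the lowercased engine type instead of requiring case-insensitive equality, so suffixed engine-type strings also hit the deny list. A standalone sketch of the difference; the "nebula-1.0" input is hypothetical:

```java
import java.util.Arrays;

public class KillFilterDemo {
  public static void main(String[] args) {
    String[] unAllow = "trino,appconn,io_file,jdbc,nebula".split(",");
    String engineType = "nebula-1.0";

    // Old behavior: exact (case-insensitive) match only.
    boolean oldMatch = Arrays.stream(unAllow).anyMatch(e -> e.equalsIgnoreCase(engineType));
    // New behavior: any listed token contained in the lowercased engine type matches.
    boolean newMatch = Arrays.stream(unAllow).anyMatch(e -> engineType.toLowerCase().contains(e));

    System.out.println(oldMatch); // false: "nebula" != "nebula-1.0"
    System.out.println(newMatch); // true: "nebula-1.0" contains "nebula"
  }
}
```

Note the broadened match also catches any engine type that merely contains a listed token (a hypothetical "jdbc2", for instance), which appears to be the intent here.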
@@ -308,7 +308,7 @@ class DefaultEngineStopService extends AbstractEngineService with EngineStopService
       .filter(node =>
         LabelUtil.getUserCreatorLabel(node.getLabels).getUser.equals(userName)
           && LabelUtil.getUserCreatorLabel(node.getLabels).getCreator.equals(creator)
-          && LabelUtil.getEngineTypeLabel(node.getLabels).getEngineType.equals(engineType)
+          && LabelUtil.getEngineTypeLabel(node.getLabels).getStringValue.equals(engineType)
       )
       .toList
   }
@@ -26,42 +26,51 @@ public class LoadInstanceResource extends Resource {
   private final long memory;
   private final int cores;
   private final int instances;
+  // compatibility adaptation
+  private final int instance;

   private LoadInstanceResource(Resource r) {
     if (r instanceof LoadInstanceResource) {
       LoadInstanceResource t = (LoadInstanceResource) r;
       this.memory = t.memory;
       this.cores = t.cores;
       this.instances = t.instances;
+      this.instance = t.instances;
     } else if (r instanceof LoadResource) {
       LoadResource l = (LoadResource) r;
       this.memory = l.getMemory();
       this.cores = l.getCores();
       this.instances = 0;
+      this.instance = 0;
     } else if (r instanceof MemoryResource) {
       MemoryResource m = (MemoryResource) r;
       this.memory = m.getMemory();
       this.cores = 0;
       this.instances = 0;
+      this.instance = 0;
     } else if (r instanceof CPUResource) {
       CPUResource c = (CPUResource) r;
       this.memory = 0;
       this.cores = c.getCores();
       this.instances = 0;
+      this.instance = 0;
     } else if (r instanceof DriverAndYarnResource) {
       DriverAndYarnResource d = (DriverAndYarnResource) r;
       this.memory = d.getLoadInstanceResource().getMemory();
       this.cores = d.getLoadInstanceResource().getCores();
       this.instances = d.getLoadInstanceResource().getInstances();
+      this.instance = d.getLoadInstanceResource().getInstances();
     } else if (r instanceof DriverAndKubernetesResource) {
       DriverAndKubernetesResource d = (DriverAndKubernetesResource) r;
       this.memory = d.getLoadInstanceResource().getMemory();
       this.cores = d.getLoadInstanceResource().getCores();
       this.instances = d.getLoadInstanceResource().getInstances();
+      this.instance = d.getLoadInstanceResource().getInstances();
     } else {
       this.memory = Long.MAX_VALUE;
       this.cores = Integer.MAX_VALUE;
       this.instances = Integer.MAX_VALUE;
+      this.instance = Integer.MAX_VALUE;
     }
   }

@@ -73,6 +82,7 @@ public LoadInstanceResource(long memory, int cores, int instances) {
     this.memory = memory;
     this.cores = cores;
     this.instances = instances;
+    this.instance = instances;
   }

   public LoadInstanceResource add(Resource r) {
@@ -184,4 +194,8 @@ public int getCores() {
   public int getInstances() {
     return instances;
   }
+
+  public int getInstance() {
+    return instances;
+  }
 }
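The diff mirrors instances into a second instance field but explains it only with the "compatibility adaptation" comment. A plausible reading — an assumption, not stated in the commit — is wire compatibility with consumers that still read the old instance key; a bean mapper such as Jackson would then emit both keys, because it sees both getInstances() and getInstance():

```java
import com.fasterxml.jackson.databind.ObjectMapper;

public class InstanceCompatSketch {
  public static void main(String[] args) throws Exception {
    // Assumes LoadInstanceResource(long memory, int cores, int instances) is on the classpath.
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(new LoadInstanceResource(1024L, 4, 2));
    System.out.println(json);
    // e.g. {"memory":1024,"cores":4,"instances":2,"instance":2} — key order may vary
  }
}
```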
@@ -146,7 +146,7 @@ public static String getTypeStr(int type) {
         retVal = BinaryType.typeName();
         break;
       case Types.DECIMAL:
-        retVal = new DecimalType("decimal", 3).typeName();
+        retVal = DecimalType.typeName();
         break;
       case Types.ARRAY:
         retVal = ArrayType.typeName();
@@ -77,7 +77,7 @@ object DolphinToSpark {
       case wds.BigIntType => LongType
       case wds.FloatType => FloatType
       case wds.DoubleType => DoubleType
-      case wds.DecimalType(_, _) => DecimalType(bigDecimalPrecision, bigDecimalScale)
+      case wds.DecimalType => DecimalType(bigDecimalPrecision, bigDecimalScale)
       case wds.DateType => DateType
       // case wds.TimestampType => TimestampType
       case wds.BinaryType => BinaryType
@@ -44,7 +44,7 @@ public class ThreadUtils extends ApplicationContextEvent {
       Utils.newCachedExecutionContext(5, "alert-pool-thread-", false);

   public static ExecutionContextExecutorService executors_analyze =
-          Utils.newCachedExecutionContext(50, "analyze-pool-thread-", false);
+      Utils.newCachedExecutionContext(50, "analyze-pool-thread-", false);

   public ThreadUtils(ApplicationContext source) {
     super(source);
@@ -68,7 +68,8 @@ public static String run(List<String> cmdList, String shellName) {
     }
     return msg;
   }
-  public static void analyzeRun(JobHistory jobHistory){
+
+  public static void analyzeRun(JobHistory jobHistory) {
     FutureTask future = new FutureTask(() -> HttpsUntils.analyzeJob(jobHistory), -1);
     executors_analyze.submit(future);
   }
@@ -131,6 +131,22 @@ public Message queryDbsWithTables(HttpServletRequest req) {
     }
   }

+  @ApiOperation(
+      value = "queryDbsWithTablesgetByAccessTime",
+      notes = "query dbs with tables order by getByAccessTime",
+      response = Message.class)
+  @RequestMapping(path = "getByAccessTime", method = RequestMethod.GET)
+  public Message queryDbsWithTablesgetByAccessTime(HttpServletRequest req) {
+    String userName = ModuleUserUtils.getOperationUser(req, "get all db and tables");
+    try {
+      JsonNode dbs = dataSourceService.getDbsWithTablesAndLastAccessAt(userName);
+      return Message.ok("").data("dbs", dbs);
+    } catch (Exception e) {
+      logger.error("Failed to queryDbsWithTables", e);
+      return Message.error("Failed to queryDbsWithTables", e);
+    }
+  }
+
   @ApiOperation(value = "queryTables", notes = "query tables", response = Message.class)
   @ApiImplicitParams({
     @ApiImplicitParam(name = "database", dataType = "String", value = "database")
@@ -35,6 +35,9 @@ public interface DataSourceRestfulRemote {
   @GetMapping("/api/datasource/all")
   public Message queryDbsWithTables(HttpServletRequest req);

+  @GetMapping("/api/datasource/getByAccessTime")
+  public Message queryDbsWithTablesgetByAccessTime(HttpServletRequest req);
+
   @GetMapping("/api/datasource/tables")
   public Message queryTables(@RequestParam("database") String database, HttpServletRequest req);
