Skip to content

Commit

Permalink
merge 1.1.19 webank to 1.1.20-webank (apache#434)
Browse files Browse the repository at this point in the history
* fix slow sql (apache#412)

Co-authored-by: “v_kkhuang” <“[email protected]”>

* opt code (apache#413)

Co-authored-by: aiceflower <[email protected]>

* chore: 1.1.19 web update (apache#410)

* optimize concurrent ec execute logic

* should throw exception

* Dev 1.1.19 webank fix acrosscluster (apache#432)

* fix cross cluster resource bug

* code format

* fix cross cluster

* add cross cluster notes

---------

Co-authored-by: v-kkhuang <[email protected]>
Co-authored-by: “v_kkhuang” <“[email protected]”>
Co-authored-by: aiceflower <[email protected]>
Co-authored-by: Yonghao Mei <[email protected]>
Co-authored-by: peacewong <[email protected]>
Co-authored-by: lemonjuice <[email protected]>
  • Loading branch information
7 people authored Feb 29, 2024
1 parent 7deb591 commit e304f93
Show file tree
Hide file tree
Showing 25 changed files with 1,297 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class DriverAndYarnReqResourceService(
logger.info(
s"user: ${labelContainer.getUserCreatorLabel.getUser} request queue resource $requestedYarnResource > left resource $queueLeftResource"
)

val notEnoughMessage =
generateQueueNotEnoughMessage(requestedYarnResource, queueLeftResource, maxCapacity)
canCreateECRes.setCanCreateEC(false);
Expand Down Expand Up @@ -156,6 +157,7 @@ class DriverAndYarnReqResourceService(
val notEnoughMessage =
generateQueueNotEnoughMessage(requestedYarnResource, queueLeftResource, maxCapacity)
throw new RMWarnException(notEnoughMessage._1, notEnoughMessage._2)

}

if (engineCreateRequest.getProperties != null) {
Expand All @@ -165,14 +167,15 @@ class DriverAndYarnReqResourceService(
val acrossClusterTask = properties.getOrDefault(AMConfiguration.ACROSS_CLUSTER_TASK, "false")
val priorityCluster = properties.get(AMConfiguration.PRIORITY_CLUSTER)

// judge if is cross cluster task and priority cluster
if (
StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils
.isNotBlank(priorityCluster) && priorityCluster.equals(
AMConfiguration.PRIORITY_CLUSTER_TARGET
)
) {

// cross cluster task and bdp priority
// priority cluster is target, get target threshold
val targetCPUThreshold = properties.get(AMConfiguration.TARGET_CPU_THRESHOLD)
val targetMemoryThreshold = properties.get(AMConfiguration.TARGET_MEMORY_THRESHOLD)
val targetCPUPercentageThreshold =
Expand All @@ -188,7 +191,7 @@ class DriverAndYarnReqResourceService(
)
) {

// judge total cluster resources
// judge total target cluster resources between target threshold
val clusterYarnResource =
externalResourceService.getResource(
ResourceType.Yarn,
Expand All @@ -210,7 +213,7 @@ class DriverAndYarnReqResourceService(
s"clusterCPUPercentageThreshold: $clusterCPUPercentageThreshold, clusterMemoryPercentageThreshold: $clusterMemoryPercentageThreshold"
)

// judge bdp cluster queue resources
// judge target cluster resource between target threshold
try {
AcrossClusterRulesJudgeUtils.targetClusterRuleCheck(
queueLeftResource.asInstanceOf[YarnResource],
Expand All @@ -226,15 +229,18 @@ class DriverAndYarnReqResourceService(
clusterMemoryPercentageThreshold
)
} catch {
// if target cluster resource gt threshold, throw target retry exception and change to normal task next retry;
case ex: Exception =>
throw new RMWarnException(
RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode,
ex.getMessage
)
}
logger.info(s"user: $user, creator: $creator task meet the threshold rule")
logger.info(s"user: $user, creator: $creator task meet the target threshold rule")
} else {
logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment")
logger.info(
s"user: $user, creator: $creator task skip the target threshold rule judgment"
)
}
} else if (
StringUtils.isNotBlank(acrossClusterTask) && acrossClusterTask.toBoolean && StringUtils
Expand All @@ -243,7 +249,7 @@ class DriverAndYarnReqResourceService(
)
) {

// cross cluster task and bdap priority
// priority cluster is origin, get origin threshold
val originCPUPercentageThreshold =
properties.get(AMConfiguration.ORIGIN_CPU_PERCENTAGE_THRESHOLD)
val originMemoryPercentageThreshold =
Expand All @@ -260,7 +266,7 @@ class DriverAndYarnReqResourceService(
s"originCPUPercentageThreshold: $originCPUPercentageThreshold, originMemoryPercentageThreshold: $originMemoryPercentageThreshold"
)

// judge bdap cluster queue resources
// judge origin cluster resource between origin threshold
try {
AcrossClusterRulesJudgeUtils.originClusterRuleCheck(
usedCapacity.asInstanceOf[YarnResource],
Expand All @@ -269,15 +275,19 @@ class DriverAndYarnReqResourceService(
originMemoryPercentageThreshold.toDouble
)
} catch {
// if origin cluster resource gt threshold, throw origin retry exception and change to target cluster next retry;
case ex: Exception =>
throw new RMWarnException(
RMErrorCode.ACROSS_CLUSTER_RULE_FAILED.getErrorCode,
ex.getMessage
)
}
logger.info(s"user: $user, creator: $creator task meet the threshold rule")
// if origin cluster resource lt threshold, continue as normal task
logger.info(s"user: $user, creator: $creator task meet the origin threshold rule")
} else {
logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment")
logger.info(
s"user: $user, creator: $creator task skip the origin threshold rule judgment"
)
}
} else {
logger.info(s"user: $user, creator: $creator task skip cross cluster resource judgment")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,18 @@ class IoEngineConnExecutor(val id: Int, val outputLimit: Int = 10)
AliasOutputExecuteResponse(method.id.toString, StorageUtils.serializerStringToResult(res))
}

override def getConcurrentLimit(): Int = {
  // Reserve 5 threads from the configured pool for engine-internal work;
  // the remainder is how many tasks may execute concurrently.
  var maxTaskNum = ComputationExecutorConf.ENGINE_CONCURRENT_THREAD_NUM.getValue - 5
  if (maxTaskNum <= 0) {
    // Fixed message: the guard fires when maxTaskNum <= 0 (configured value <= 5),
    // so the remedy is to configure the thread num to at least 6. The original
    // text ("cannot ${maxTaskNum} < 0 ... > 6") was ungrammatical and stated
    // both the condition and the remedy incorrectly.
    logger.error(
      s"max task num $maxTaskNum must be > 0, please set linkis.engineconn.concurrent.thread.num >= 6"
    )
    // Fall back to a single concurrent task rather than an unusable limit.
    maxTaskNum = 1
  }
  logger.info(s"max task num $maxTaskNum")
  maxTaskNum
}

override def killTask(taskID: String): Unit = {
logger.warn(s"Kill job : ${taskID}")
super.killTask(taskID)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ class ShellEngineConnFactory extends ComputationSingleExecutorEngineConnFactory
maxTaskNum = 1
}
logger.info(s"max task num $maxTaskNum")
new ShellEngineConnConcurrentExecutor(id)
new ShellEngineConnConcurrentExecutor(id, maxTaskNum)

} else {
new ShellEngineConnExecutor(id)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ List<String> getDbsByUserAndRoles(

List<Map<String, Object>> getTablesByDbName(MetadataQueryParam queryParam);

/**
 * Looks up a single table's metadata by exact table name within the given database.
 *
 * @param tableName exact table name to match
 * @param dbName name of the database the table belongs to
 * @return a column-name to value map for the matching table; may be null when no
 *     table matches (callers are expected to null-check the result)
 */
Map<String, Object> getTableInfoByTableNameAndDbName(
    @Param("tableName") String tableName, @Param("dbName") String dbName);

/**
* get the table partition's size
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import org.apache.linkis.metadata.util.DWSConfig;
import org.apache.linkis.metadata.utils.MdqConstants;

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -72,7 +75,21 @@ public List<Map<String, Object>> getTablesByDbNameAndOptionalUserName(
return null;
}
String userName = queryParam.getUserName();
String dbName = queryParam.getDbName();
if (adminUser.equals(userName)) {
String tableName = queryParam.getTableName();
// if both tableName and dbName are provided, query by exact table name
if (StringUtils.isNotEmpty(tableName) && StringUtils.isNotEmpty(dbName)) {
log.info("admin {} to get table with tableName:{} ", userName, tableName);
Map<String, Object> queryRes =
hiveMetaDao.getTableInfoByTableNameAndDbName(tableName, dbName);
List<Map<String, Object>> result = new ArrayList<>();
if (queryRes != null) {
result.add(queryRes);
}
return result;
}

log.info("admin {} to get all tables ", userName);
return hiveMetaDao.getTablesByDbName(queryParam);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,10 @@ public MdqTableBaseInfoVO getTableBaseInfoFromHive(MetadataQueryParam queryParam
.parallelStream()
.filter(f -> queryParam.getTableName().equals(f.get("NAME")))
.findFirst();
Map<String, Object> talbe =
Map<String, Object> table =
tableOptional.orElseThrow(() -> new IllegalArgumentException("table不存在"));
MdqTableBaseInfoVO mdqTableBaseInfoVO =
DomainCoversionUtils.mapToMdqTableBaseInfoVO(talbe, queryParam.getDbName());
DomainCoversionUtils.mapToMdqTableBaseInfoVO(table, queryParam.getDbName());
String tableComment =
hiveMetaDao.getTableComment(queryParam.getDbName(), queryParam.getTableName());
mdqTableBaseInfoVO.getBase().setComment(tableComment);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,23 @@
</select>

<select id="getTablesByDbName" resultType="map" parameterType="map">
select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME, t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME,
t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
from TBLS t2 inner join DBS t3 on t2.DB_ID = t3.DB_ID
where t3.NAME = #{dbName,jdbcType=VARCHAR}
order by t2.TBL_NAME;
</select>


<!-- Fetch one table's metadata (name, type, create time, last access time, owner)
     by exact table name; the owning database is resolved via a DBS subquery on dbName.
     NOTE(review): assumes (NAME) is unique in DBS so the subquery yields one DB_ID - confirm. -->
<select id="getTableInfoByTableNameAndDbName" resultType="map">
select t2.TBL_NAME as NAME, t2.TBL_TYPE as TYPE, t2.CREATE_TIME as CREATE_TIME,
t2.LAST_ACCESS_TIME as LAST_ACCESS_TIME, t2.OWNER as OWNER
from TBLS t2
where t2.TBL_NAME=#{tableName,jdbcType=VARCHAR}
and t2.DB_ID=(select DB_ID from DBS t3 where t3.NAME = #{dbName,jdbcType=VARCHAR})
</select>


<select id="getPartitionSize" resultType="java.lang.Long" parameterType="map">
select PARTITION_PARAMS.PARAM_VALUE from PARTITION_PARAMS
inner join `PARTITIONS` ON `PARTITIONS`.PART_ID=PARTITION_PARAMS.PART_ID
Expand Down
32 changes: 24 additions & 8 deletions linkis-web/src/apps/linkis/i18n/common/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
"instanceNum": "Instance Number",
"keyTip": "The key cannot be empty",
"instanceName": "Instance Name",
"serviceName": "Service Name",
"IP": "IP",
"hostname": "Hostname",
"resources": "Resources",
"reset": "Reset",
"clearSearch": "Clear",
Expand Down Expand Up @@ -119,7 +122,7 @@
"initiator": "Initiator",
"engineInstance": "Engine Instance",
"engineType": "Engine Type",
"serveType": "Serve Type",
"serveType": "Serve Name",
"appType": "App Type",
"taskID": "JobID",
"fileName": "Source",
Expand Down Expand Up @@ -152,7 +155,11 @@
"engineUsed": "Engine Used",
"engineTop": "Engine Max",
"versioTips": "If no engine version is available, check whether the engine materials of the corresponding version are installed",
"applicationRole": "Application role"
"applicationRole": "Application role",
"serviceAddress": "IP:Port",
"hostname": "Hostname",
"registryTime": "Registration Time",
"versionInfo": "Version Info"
},
"logLoading": "Requesting logs, please hold on",
"title": "Linkis Control Panel",
Expand All @@ -175,12 +182,14 @@
"dateReport": "Global Variables",
"globalValiable": "Frequently Asked",
"microserviceManage": "Microservice Management",
"eurekaService": "Eureka Service",
"ECMManage": "ECM Management",
"udfFunctionTitle": "UDF Function",
"udfFunctionManage": "UDF Management",
"dataSourceManage": "DataSource Management",
"userResourceManagement": "User Resource Management",
"tenantTagManagement": "Tenant Tag Management",
"departmentTagManagement": "Department Tag Management",
"ipListManagement": "White List Management",
"acrossClusterRule": "Across Cluster Rule Management",
"errorCode": "Error Manage",
Expand Down Expand Up @@ -219,6 +228,7 @@
"update": "Update Engine Plugin",
"updateFileOnly": "Update",
"resourceVersion": "Engine Material BML Version",
"resourceAddress": "Material Address",
"user": "Affiliated Person",
"deleteCurrentbml": "Delete",
"versionList": "Version List",
Expand Down Expand Up @@ -416,13 +426,15 @@
"inputTenant": "Please Input Tenant Tag",
"inputDesc": "Please Input Description",
"inputCreateUser": "Please Input Creare User",
"selectDepartment": "Please Select Department",
"yourTagMapping": "Your Tag Mapping",
"notEmpty": "Cannot be empty",
"maxLen": "A maximum of 100 characters is allowed",
"contentError": "English, numbers, asterisk and underline only",
"contentError1": "English, numbers and underline only",
"contentError2": "English, numbers, underscores and dashes only",
"check": "Check",
"department": "Department",
"OK": "OK",
"Cancel": "Cancel",
"action": "Action",
Expand Down Expand Up @@ -473,11 +485,14 @@
"rules": "Rules",
"startTime": "Start Time",
"endTime": "End Time",
"CPUThreshold": "CPU Threshold(Core)",
"MemoryThreshold": "Memory Threshold(G)",
"CPUPercentageThreshold": "CPU Percentage Threshold",
"priorityCluster": "Priority Cluster",
"targetCPUThreshold": "Target CPU Threshold(Core)",
"targetMemoryThreshold": "Target Memory Threshold(G)",
"targetCPUPercentageThreshold": "Target CPU Percentage Threshold",
"targetMemoryPercentageThreshold": "Target Memory Percentage Threshold",
"originCPUPercentageThreshold": "Origin CPU Percentage Threshold",
"originMemoryPercentageThreshold": "Origin Memory Percentage Threshold",
"acrossClusterQueue": "Across Cluster Queue",
"MemoryPercentageThreshold": "Memory Percentage Threshold",
"timeError": "The time format should be XX:XX, please enter the correct time",
"thresholdError": "Please enter an integer between 0 and 10000",
"percentageThresholdError": "Please enter an integer between 0 and 1",
Expand Down Expand Up @@ -536,8 +551,9 @@
"status": "Status",
"submitUser": "Submit User",
"createdTime": "Created Time",
"searchRange": "Only T-1 history code can be queried"

"searchRange": "Only T-1 history code can be queried",
"fullMatch": "Full Match",
"keyword": "Keyword Match"
},
"basedataManagement": {
"add": "Add",
Expand Down
Loading

0 comments on commit e304f93

Please sign in to comment.