Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some adjustments #348

Merged
merged 4 commits into from
Jun 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,14 @@ public Response shareUDF(@Context HttpServletRequest req, JsonNode json)throws
//Verify that the udf function has been shared(校验udf函数是否已经被分享)
UDFInfo udfInfo = mapper.readValue(json.get("udfInfo"), UDFInfo.class);

UDFTree sharedTree;
Long shareParentId = json.get("shareParentId").getLongValue();
String category = udfInfo.getUdfType() == 3 || udfInfo.getUdfType() == 4 ? ConstantVar.FUNCTION : ConstantVar.UDF;
UDFTree sharedTree = udfTreeService.getSharedTree(category);
if (shareParentId > 0) { // 目录
sharedTree = udfTreeService.getTreeById(shareParentId, userName, "share", category);
} else {
sharedTree = udfTreeService.getSharedTree(category);
}
if (sharedTree == null){
throw new UDFException("No shared directories!(没有共享目录!)");
}
Expand All @@ -360,7 +366,7 @@ public Response shareUDF(@Context HttpServletRequest req, JsonNode json)throws
throw new UDFException("This file is being shared!(该文件正在分享中!)");
}

String sharedPath = fileName +"";
String sharedPath = udfInfo.getPath();
//Verify sharing path---plus timestamp, it should not be repeated(校验分享路径---加上时间戳,应该不会重复)
//Copy the file to a shared directory(将文件拷贝到共享目录下)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,11 @@ object YarnUtil extends Logging{
case JNull | JNothing => None
}
def getChildQueues(resp:JValue):JValue = {
val queues = resp \ "childQueues" \ "queue"

if(queues != null && queues != JNull && queues != JNothing ) {
info(s"test queue:$queues")
queues
} else resp \ "childQueues"
val queues = resp \ "childQueues" match {
case child: JObject => child \ "queue"
case children: JArray => children
}
queues
}

def getQueueOfCapacity(queues: JValue): Option[JValue] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.file.Paths

import com.webank.wedatasphere.linkis.common.conf.Configuration
import com.webank.wedatasphere.linkis.enginemanager.{AbstractEngineCreator, EngineResource}
import com.webank.wedatasphere.linkis.enginemanager.conf.EnvConfiguration.{DEFAULT_JAVA_OPTS, ENGINE_CLIENT_MEMORY, JAVA_HOME, engineGCLogPath}
import com.webank.wedatasphere.linkis.enginemanager.conf.EnvConfiguration.{DEFAULT_JAVA_OPTS, ENGINE_CLIENT_MEMORY, HADOOP_LIB_NATIVE, JAVA_HOME, engineGCLogPath}
import com.webank.wedatasphere.linkis.enginemanager.hive.conf.HiveEngineConfiguration
import com.webank.wedatasphere.linkis.enginemanager.impl.UserEngineResource
import com.webank.wedatasphere.linkis.enginemanager.process.JavaProcessEngineBuilder
Expand Down Expand Up @@ -117,7 +117,7 @@ class HiveQLProcessBuilder extends JavaProcessEngineBuilder{
var classpath = getClasspath(request.properties, getExtractClasspath)
classpath = classpath ++ request.properties.get("jars").split(",")
classpathCheck(classpath)
commandLine += "-Djava.library.path=/appcom/Install/hadoop/lib/native"
commandLine += "-Djava.library.path=" + HADOOP_LIB_NATIVE.getValue
commandLine += "-cp"
commandLine += classpath.mkString(":")
commandLine += "com.webank.wedatasphere.linkis.engine.DataWorkCloudEngineApplication"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,16 @@ class SparkSubmitProcessBuilder extends ProcessEngineBuilder with Logging {
addOpt("--deploy-mode", _deployMode)
addOpt("--name", _name)
//addOpt("--jars",Some(ENGINEMANAGER_JAR.getValue))
info("No need to add jars for "+_jars.map(fromPath).exists(x => x.equals("hdfs:///")).toString())
if(_jars.map(fromPath).exists(x => x.equals("hdfs:///")) != true) {
_jars = _jars.filter(_.isNotBlankPath())
if(!_jars.isEmpty) {
addList("--jars", _jars.map(fromPath))
}
if(_pyFiles.map(fromPath).exists(x => x.equals("hdfs:///")) != true) {
_pyFiles = _pyFiles.filter(_.isNotBlankPath())
if(!_pyFiles.isEmpty) {
addList("--py-files", _pyFiles.map(fromPath))
}
if(_files.map(fromPath).exists(x => x.equals("hdfs:///")) != true) {
_files = _files.filter(_.isNotBlankPath())
if(!_files.isEmpty) {
addList("--files", _files.map(fromPath))
}
_conf.foreach { case (key, value) => if (key.startsWith("spark.")) addOpt("--conf", Option(f"""$key=$value"""))
Expand Down Expand Up @@ -368,10 +370,21 @@ object SparkSubmitProcessBuilder {
new SparkSubmitProcessBuilder
}

sealed trait Path
sealed trait Path {

case class AbsolutePath(path: String) extends Path
def isNotBlankPath(): Boolean;

case class RelativePath(path: String) extends Path
protected def isNotBlankPath(path: String): Boolean = {
StringUtils.isNotBlank(path) && !"/".equals(path.trim) && !"hdfs:///".equals(path.trim) && !"file:///".equals(path.trim)
}
}

case class AbsolutePath(path: String) extends Path {
override def isNotBlankPath(): Boolean = isNotBlankPath(path)
}

case class RelativePath(path: String) extends Path {
override def isNotBlankPath(): Boolean = isNotBlankPath(path)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -215,53 +215,57 @@ object CustomVariableUtils extends Logging {

val codeReg = "\\$\\{\\s*[A-Za-z][A-Za-z0-9_]*\\s*[\\+\\-\\*/]?\\s*[A-Za-z0-9_\\.]*\\s*\\}".r
val calReg = "(\\s*[A-Za-z][A-Za-z0-9_]*\\s*)([\\+\\-\\*/]?)(\\s*[A-Za-z0-9_\\.]*\\s*)".r
val parseCode = new StringBuilder
val codes = codeReg.split(code)
val expressionCache = mutable.HashSet[String]()

var i = 0

codeReg.findAllIn(code).foreach(str => {
calReg.findFirstMatchIn(str).foreach(ma => {
i = i + 1
val name = ma.group(1)
val signal = ma.group(2)
val bValue = ma.group(3)

if (name == null || name.trim.isEmpty) {
throw VarSubstitutionException(20041,s"[$str] replaced var is null")
} else {
var expression = name.trim
val varType = nameAndType.get(name.trim).orNull
if (varType == null) {
warn(s"Use undefined variables or use the set method: [$str](使用了未定义的变量或者使用了set方式:[$str])")
parseCode ++= codes(i - 1) ++ str
var targetCode = code
var noChange = false
while (!noChange && codeReg.findFirstIn(targetCode).isDefined) {
var i = 0
val parseCode = new StringBuilder
val codes = codeReg.split(targetCode)
codeReg.findAllIn(targetCode).foreach(str => {
calReg.findFirstMatchIn(str).foreach(ma => {
i = i + 1
val name = ma.group(1)
val signal = ma.group(2)
val bValue = ma.group(3)

if (name == null || name.trim.isEmpty) {
throw VarSubstitutionException(20041,s"[$str] replaced var is null")
} else {
var res: String = varType.getValue
if (signal != null && !signal.trim.isEmpty) {
if (bValue == null || bValue.trim.isEmpty) {
throw VarSubstitutionException(20042, s"[$str] expression is not right, please check")
} else {
expression = expression + "_" + signal.trim + "_" + bValue.trim
res = varType.calculator(signal.trim, bValue.trim)
var expression = name.trim
val varType = nameAndType.get(name.trim).orNull
if (varType == null) {
warn(s"Use undefined variables or use the set method: [$str](使用了未定义的变量或者使用了set方式:[$str])")
parseCode ++= codes(i - 1) ++ str
} else {
var res: String = varType.getValue
if (signal != null && !signal.trim.isEmpty) {
if (bValue == null || bValue.trim.isEmpty) {
throw VarSubstitutionException(20042, s"[$str] expression is not right, please check")
} else {
expression = expression + "_" + signal.trim + "_" + bValue.trim
res = varType.calculator(signal.trim, bValue.trim)
}
}
if (!expressionCache.contains(expression)) {
info(s"Variable expression [$str] = $res(变量表达式[$str] = $res)")
//println(s"变量表达式[$str] = $res")
expressionCache += expression
}
//println(s"变量表达式序号:$i\t[$str] = $res")
parseCode ++= codes(i - 1) ++ res
}
if (!expressionCache.contains(expression)) {
info(s"Variable expression [$str] = $res(变量表达式[$str] = $res)")
//println(s"变量表达式[$str] = $res")
expressionCache += expression
}
//println(s"变量表达式序号:$i\t[$str] = $res")
parseCode ++= codes(i - 1) ++ res
}
}
})
})
})
if (i == codes.length - 1) {
parseCode ++= codes(i)
if (i == codes.length - 1) {
parseCode ++= codes(i)
}
val parsedCode = org.apache.commons.lang.StringUtils.strip(deleteUselessSemicolon(parseCode))
if (targetCode.equals(parsedCode)) noChange = true
targetCode = parsedCode
}
val parsedCode = deleteUselessSemicolon(parseCode)
org.apache.commons.lang.StringUtils.strip(parsedCode)
targetCode
// Utils.trimBlank()
}

Expand Down