Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Bugfix/nested non unit tasks #379

Merged
merged 3 commits into from
Jul 2, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions core/src/main/scala/dagr/core/execsystem/TaskManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
private[execsystem] def processCompletedTask(taskId: TaskId, doRetry: Boolean = true): Unit = {
val node = this(taskId)
val taskInfo = node.taskInfo
logger.debug("processCompletedTask")
nh13 marked this conversation as resolved.
Show resolved Hide resolved
logTaskMessage(taskInfo=taskInfo)
val updateNodeToCompleted: Boolean = if (TaskStatus.isTaskFailed(taskStatus = taskInfo.status) && doRetry) {
val retryTask = taskInfo.task match {
Expand Down Expand Up @@ -327,20 +328,21 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
* @return for each completed task, a map from a task identifier to a tuple of the exit code and the status of the `onComplete` method
*/
private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = {
val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks()
val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask])

emptyTasks.foreach { node =>
// Get the tasks that are not eligible to be scheduled/executed, and have no predecessors (can be completed).
val toCompleteTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filterNot(_.task.isInstanceOf[UnitTask])
nh13 marked this conversation as resolved.
Show resolved Hide resolved
toCompleteTasks.foreach { node =>
node.taskInfo.status = TaskStatus.SUCCEEDED
processCompletedTask(node.taskId)
logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed")
logger.debug("updateCompletedTasks: to-complete task [" + node.task.name + "] completed")
nh13 marked this conversation as resolved.
Show resolved Hide resolved
}

// Get the tasks that have been executed to completion.
val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks()
nh13 marked this conversation as resolved.
Show resolved Hide resolved
completedTasks.keysIterator.foreach { taskId =>
processCompletedTask(taskId)
val name = this(taskId).task.name
val status = this(taskId).taskInfo.status
logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]")
logger.debug("updateCompletedTasks: unit task [" + name + "] completed with task status [" + status + "]")
}

completedTasks
Expand Down Expand Up @@ -562,17 +564,22 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
// get the running tasks to estimate currently used resources
val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap

// get the tasks that are eligible for execution (tasks with no dependents)
val (emptyTasks: Seq[Task], readyTasks: Seq[Task]) = {
graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toSeq.partition(_.isInstanceOf[Task.EmptyTask])
// get the tasks that are eligible for execution (tasks with no dependents). UnitTasks should be scheduled for
// execution, while other tasks (such as EmptyTask) should be treated as though they are "RUNNING".
val (toScheduleTasks: Seq[UnitTask], readyToComplete: Seq[UnitTask]) = {
graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toSeq.partition {
nh13 marked this conversation as resolved.
Show resolved Hide resolved
case _: UnitTask => true
case _ => false
}
}
logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks")
logger.debug(s"stepExecution: found ${toScheduleTasks.size} toScheduleTasks tasks")
logger.debug(s"stepExecution: found ${readyToComplete.size} readyToComplete tasks")

// get the list of tasks to schedule
val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else {
val tasks = scheduler.schedule(
runningTasks = runningTasks,
readyTasks = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]),
readyTasks = toScheduleTasks,
systemCores = taskManagerResources.cores,
systemMemory = taskManagerResources.systemMemory,
jvmMemory = taskManagerResources.jvmMemory
Expand All @@ -599,9 +606,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de
}

(
readyTasks,
toScheduleTasks,
tasksToSchedule.keys,
runningTasks.keys ++ emptyTasks,
runningTasks.keys ++ readyToComplete,
completedTasks.keys.map(taskId => this(taskId).task)
)
}
Expand Down
29 changes: 29 additions & 0 deletions core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1060,4 +1060,33 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B
info.status shouldBe TaskStatus.FAILED_COMMAND
}
}

class TaskThatRunsAnotherTask(other: Option[Task] = None) extends Task {
override def getTasks: Iterable[_ <: Task] = other
}

it should "support deeply nested tasks" in {
// left branch ends with a shell command
val leftLeaf: Task = new ShellCommand("exit", "0") withName "left-leaf"
val leftMiddle: Task = new TaskThatRunsAnotherTask(other=Some(leftLeaf)) withName "left-middle"
val leftBranch: Task = new TaskThatRunsAnotherTask(other=Some(leftMiddle)) withName "left-branch"

// right branch ends with a no-op
val rightLeaf: Task = new TaskThatRunsAnotherTask(other=None) withName "right-leaf"
val rightMiddle: Task = new TaskThatRunsAnotherTask(other=Some(rightLeaf)) withName "right-middle"
val rightBranch: Task = new TaskThatRunsAnotherTask(other=Some(rightMiddle)) withName "right-branch"

val root: Task = Task.empty withName "root"
root ==> (leftBranch :: rightBranch)

val taskManager: TestTaskManager = getDefaultTaskManager()
taskManager.addTasks(root, leftBranch, rightBranch)
taskManager.runToCompletion(failFast=true)

val tasks = Seq(root, leftLeaf, leftMiddle, leftBranch, rightLeaf, rightMiddle, rightBranch)
tasks.foreach { task =>
val info : TaskExecutionInfo = taskManager.taskExecutionInfoFor(task).value
info.status shouldBe TaskStatus.SUCCEEDED
}
}
}