diff --git a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala index 588346e2..4241b15e 100644 --- a/core/src/main/scala/dagr/core/execsystem/TaskManager.scala +++ b/core/src/main/scala/dagr/core/execsystem/TaskManager.scala @@ -316,7 +316,11 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de if (updateNodeToCompleted) { logger.debug("processCompletedTask: Task [" + taskInfo.task.name + "] updating node to completed") if (TaskStatus.isTaskNotDone(taskInfo.status, failedIsDone = true)) { - throw new RuntimeException(s"Processing a completed task but it was not done! status: ${taskInfo.status}") + if (taskInfo.task.isInstanceOf[UnitTask]) { + throw new RuntimeException(s"Processing a completed UnitTask but it was not done! status: ${taskInfo.status}") + } + // Set it to succeeded as it no longer has predecessors + taskInfo.status = TaskStatus.SUCCEEDED } completeGraphNode(node, Some(taskInfo)) } @@ -326,21 +330,27 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de * * @return for each completed task, a map from a task identifier to a tuple of the exit code and the status of the `onComplete` method */ - private def updateCompletedTasks(): Map[TaskId, (Int, Boolean)] = { - val completedTasks: Map[TaskId, (Int, Boolean)] = taskExecutionRunner.completedTasks() - val emptyTasks = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS).filter(_.task.isInstanceOf[Task.EmptyTask]) - - emptyTasks.foreach { node => - node.taskInfo.status = TaskStatus.SUCCEEDED - processCompletedTask(node.taskId) - logger.debug("updateCompletedTasks: empty task [" + node.task.name + "] completed") - } - - completedTasks.keysIterator.foreach { taskId => - processCompletedTask(taskId) - val name = this(taskId).task.name - val status = this(taskId).taskInfo.status - logger.debug("updateCompletedTasks: task [" + name + "] completed with task status [" + status + "]") + private def updateCompletedTasks(): Seq[Task] = { + // Get the tasks that are not eligible to be scheduled/executed, and have no predecessors (can be completed). + // Developer note: + // A task that returns one or more "other" tasks in getTasks, and so are not executed by the `taskExecutionRunner`, + // are included only if the tasks returned are themselves completed. This is handed in `getTasks`, where we updated + // the predecessors of a task to include child tasks returned by the parent's `getTasks` call. The child will be + // removed as a predecessor when the child completes successfully. Therefore, the parent task will only have state + // `NO_PREDECESSORS` if all child tasks have `SUCCEEDED`. See the following methods: + // - `updatePredecessors`: removes the child as a predecessor of the parent when the child succeeds + // - `updateParentStartAndEndDates`: updates the parent start/end execution date based on the children's start/end date + val toCompleteTasks: Seq[Task] = graphNodesInStateFor(GraphNodeState.NO_PREDECESSORS) + .filterNot(_.task.isInstanceOf[UnitTask]) + .map(_.task) + .toIndexedSeq + // Get the tasks completed by the task executor + val completedTasks: Seq[Task] = taskExecutionRunner.completedTasks().keys.map(this.taskFor(_).get).toIndexedSeq + + // Set them to complete + (toCompleteTasks ++ completedTasks).foreach { task => + processCompletedTask(task.taskInfo.taskId) + logger.debug(f"updateCompletedTasks: unit-task [${task.name}] completed with task status [${task.taskInfo.status}]") } completedTasks @@ -378,7 +388,9 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de private def updatePredecessors(): Unit = { var hasMore = false graphNodesWithPredecessors.foreach { node => - node.predecessors.filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)).foreach(p => node.removePredecessor(p)) + node.predecessors + .filter(p => p.state == GraphNodeState.COMPLETED && TaskStatus.isTaskDone(p.taskInfo.status, failedIsDone=false)) + .foreach(node.removePredecessor) //logger.debug("updatePredecessors: examining task [" + node.task.name + "] for predecessors: " + node.hasPredecessor) // - if this node has already been expanded and now has no predecessors, then move it to the next state. // - if it hasn't been expanded and now has no predecessors, it should get expanded later @@ -562,17 +574,18 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de // get the running tasks to estimate currently used resources val runningTasks: Map[UnitTask, ResourceSet] = runningTasksMap - // get the tasks that are eligible for execution (tasks with no dependents) - val (emptyTasks: Seq[Task], readyTasks: Seq[Task]) = { - graphNodesInStateFor(NO_PREDECESSORS).map(_.task).toSeq.partition(_.isInstanceOf[Task.EmptyTask]) - } - logger.debug(s"stepExecution: found ${readyTasks.size} readyTasks tasks and ${emptyTasks.size} empty tasks") + // get the tasks that are eligible for execution (tasks with no dependents). UnitTasks should be scheduled for + // execution, while other tasks (such as EmptyTask) should be treated as though they are "RUNNING". + val (toScheduleTasks: Seq[UnitTask], readyToComplete: Seq[UnitTask]) = graphNodesInStateFor(NO_PREDECESSORS) + .map(_.task).toSeq.partition(_.isInstanceOf[UnitTask]) + logger.debug(s"stepExecution: found ${toScheduleTasks.size} toScheduleTasks tasks") + logger.debug(s"stepExecution: found ${readyToComplete.size} readyToComplete tasks") // get the list of tasks to schedule val tasksToSchedule: Map[UnitTask, ResourceSet] = if (!canDoAnything) Map.empty else { val tasks = scheduler.schedule( runningTasks = runningTasks, - readyTasks = readyTasks.filter(_.isInstanceOf[UnitTask]).map(_.asInstanceOf[UnitTask]), + readyTasks = toScheduleTasks, systemCores = taskManagerResources.cores, systemMemory = taskManagerResources.systemMemory, jvmMemory = taskManagerResources.jvmMemory @@ -599,10 +612,10 @@ class TaskManager(taskManagerResources: SystemResources = TaskManagerDefaults.de } ( - readyTasks, + toScheduleTasks, tasksToSchedule.keys, - runningTasks.keys ++ emptyTasks, - completedTasks.keys.map(taskId => this(taskId).task) + runningTasks.keys ++ readyToComplete, + completedTasks ) } diff --git a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala index 9b4c5419..85af9952 100644 --- a/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala +++ b/core/src/test/scala/dagr/core/execsystem/TaskManagerTest.scala @@ -1060,4 +1060,47 @@ class TaskManagerTest extends UnitSpec with OptionValues with LazyLogging with B info.status shouldBe TaskStatus.FAILED_COMMAND } } + + class TaskThatRunsAnotherTask(other: Option[Task] = None) extends Task { + override def getTasks: Iterable[_ <: Task] = other + } + + it should "support deeply nested tasks" in { + Seq(0, 1, 2).foreach { state => + // left branch ends with a shell command + val leftLeaf: Task = new ShellCommand("exit", "0") withName "left-leaf" + val leftMiddle: Task = new TaskThatRunsAnotherTask(other=Some(leftLeaf)) withName "left-middle" + val leftBranch: Task = new TaskThatRunsAnotherTask(other=Some(leftMiddle)) withName "left-branch" + + // right branch ends with a no-op + val rightLeaf: Task = new TaskThatRunsAnotherTask(other=None) withName "right-leaf" + val rightMiddle: Task = new TaskThatRunsAnotherTask(other=Some(rightLeaf)) withName "right-middle" + val rightBranch: Task = new TaskThatRunsAnotherTask(other=Some(rightMiddle)) withName "right-branch" + + val root: Task = Task.empty withName "root" + val taskManager: TestTaskManager = getDefaultTaskManager() + + val tasks: Seq[Task] = state match { + case 0 => // just the left branch + root ==> leftBranch + taskManager.addTasks(root, leftBranch) + Seq(root, leftLeaf, leftMiddle, leftBranch) + case 1 => // just the right branch + root ==> rightBranch + taskManager.addTasks(root, rightBranch) + Seq(root, rightLeaf, rightMiddle, rightBranch) + case 2 => // both branches + root ==> (leftBranch :: rightBranch) + taskManager.addTasks(root, leftBranch, rightBranch) + Seq(root, leftLeaf, leftMiddle, leftBranch, rightLeaf, rightMiddle, rightBranch) + case _ => throw new IllegalArgumentException("Bug") + } + taskManager.runToCompletion(failFast = true) + + tasks.foreach { task => + val info: TaskExecutionInfo = taskManager.taskExecutionInfoFor(task).value + info.status shouldBe TaskStatus.SUCCEEDED + } + } + } }