Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deferred tasks get kill during heartbeat callback in some rare cases #40435

Open
2 tasks done
0x26res opened this issue Jun 26, 2024 · 6 comments
Open
2 tasks done

deferred tasks get kill during heartbeat callback in some rare cases #40435

0x26res opened this issue Jun 26, 2024 · 6 comments
Assignees
Labels
affected_version:2.8 Issues Reported for 2.8 area:Scheduler including HA (high availability) scheduler good first issue kind:bug This is a clearly a bug

Comments

@0x26res
Copy link
Contributor

0x26res commented Jun 26, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1 (MWAA)

What happened?

My airflow deferred (AWS) BatchOperator occasionally fail. When this happens I can see this:

{{local_task_job_runner.py:302}} WARNING - State of this instance has been externally set to deferred. Terminating instance.

I actually think it's an oversight in the code for local_task_job_runner. This line should check for state being RUNNING or DEFERRED.

What you think should happen instead?

No response

How to reproduce

This is hard to reproduce. The issue is very transient.

Operating System

MWAA

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.16.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

MWAA

Anything else?

I would say it happens one out of 100 run.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@0x26res 0x26res added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jun 26, 2024
@shahar1 shahar1 added area:Scheduler including HA (high availability) scheduler and removed needs-triage label for new issues that we didn't triage yet area:core labels Jun 27, 2024
@wolfier
Copy link
Contributor

wolfier commented Aug 7, 2024

I don't think a task in the deferred state should be counted as running. Once a task is deferred, it should exit as tasks do for other terminal states like success and failed. For the LocalTaskJobRunner to produce the message external set to deferred, the task instance must have been in the deferred state for at least scheduler_zombie_task_threshold seconds (see source) and the corresponding StandardTaskRunner did not produced a return code during that time. The source code provides a helpful note that I think is what happened.

# potential race condition, the _run_raw_task commits success or other state
# but task_runner does not exit right away due to slow process shutdown or any other reasons

The reporter also mentioned in another comment that their scheduler may not well resourced. However, this statement cannot be confirmed without the actual resource utilization graph.

Also, I have a question related to the underlying issue. I've noticed this happens if a lot of deferred tasks are submitted at once. The server I'm running this on is very small (2 CPU, little memory). I think this is means the timing of the heart beat is messed up by a straved CPU.

Regardless, it would beneficial to see what happened to the task after being marked as deferred. The task log should tell us the exact timestamp. Compare that with when the LocalTaskJobRunner terminated the task, we can at least have a clearer view of the task/process lifecycle. I also think the full task log would help the investigation as the terminal code may provide additional context.

I suspect the issue is something to do with the StandardTaskRunner failing to exit for some reason. Considering deferred as a "running" state would probably let the LocalTaskJobRunner and StandardTaskRunner run indefinitely.

@0x26res
Copy link
Contributor Author

0x26res commented Aug 7, 2024

Logs of a failed task:

[2024-08-03, 00:46:55 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 00:46:56 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 00:46:56 UTC] {{taskinstance.py:2170}} INFO - Starting attempt 1 of 1
[2024-08-03, 00:46:56 UTC] {{taskinstance.py:2191}} INFO - Executing <Task(BatchOperator): task_123> on 2024-08-02 00:15:00+00:00
[2024-08-03, 00:46:56 UTC] {{standard_task_runner.py:60}} INFO - Started process 26709 to run task
[2024-08-03, 00:46:57 UTC] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='eod_jobs' AIRFLOW_CTX_TASK_ID='task_123' AIRFLOW_CTX_EXECUTION_DATE='2024-08-02T00:15:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-08-02T00:15:00+00:00'
[2024-08-03, 00:46:57 UTC] {{batch.py:274}} INFO - Running AWS Batch job - job definition: some_job_definition - on queue some-queue
[2024-08-03, 00:46:57 UTC] {{batch.py:281}} INFO - AWS Batch job - container overrides: {'command': ['bash', '-c', 'python -m some_module.batch.task_123  --date=2024-08-02'], 'environment': []}
[2024-08-03, 00:46:57 UTC] {{base_aws.py:161}} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-08-03, 00:46:59 UTC] {{batch.py:312}} INFO - AWS Batch job (JOB_UUID) started: REDACTED
[2024-08-03, 00:47:01 UTC] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T004656
[2024-08-03, 00:47:02 UTC] {{base_aws.py:161}} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-08-03, 00:47:07 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['STARTING']
[2024-08-03, 00:47:22 UTC] {{local_task_job_runner.py:302}} WARNING - State of this instance has been externally set to deferred. Terminating instance.
[2024-08-03, 00:47:23 UTC] {{process_utils.py:131}} INFO - Sending 15 to group 26709. PIDs of all processes in the group: [26709]
[2024-08-03, 00:47:24 UTC] {{process_utils.py:86}} INFO - Sending the signal 15 to group 26709
[2024-08-03, 00:47:37 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['STARTING']
[2024-08-03, 00:47:44 UTC] {{process_utils.py:79}} INFO - Process psutil.Process(pid=26709, status='terminated', exitcode=100, started='00:46:56') (26709) terminated with exit code 100
[2024-08-03, 00:48:07 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['STARTING']
[2024-08-03, 00:48:37 UTC] {{waiter_with_logging.py:126}} ERROR - Failure while running batch job JOB_UUID: ['FAILED']
[2024-08-03, 00:56:04 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 00:56:04 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 00:56:04 UTC] {{taskinstance.py:2168}} INFO - Resuming after deferral
[2024-08-03, 00:56:04 UTC] {{taskinstance.py:2191}} INFO - Executing <Task(BatchOperator): task_123> on 2024-08-02 00:15:00+00:00
[2024-08-03, 00:56:04 UTC] {{standard_task_runner.py:60}} INFO - Started process 27106 to run task
[2024-08-03, 00:56:04 UTC] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'eod_jobs', 'task_123', 'scheduled__2024-08-02T00:15:00+00:00', '--job-id', '36971', '--raw', '--subdir', 'DAGS_FOLDER/eod_jobs.py', '--cfg-path', '/tmp/tmpf1bgxn6c']
[2024-08-03, 00:56:04 UTC] {{standard_task_runner.py:88}} INFO - Job 36971: Subtask task_123
[2024-08-03, 00:56:05 UTC] {{baseoperator.py:1598}} ERROR - Trigger failed:
Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py", line 123, in async_wait
    await waiter.wait(**args, WaiterConfig={"MaxAttempts": 1})

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/waiter.py", line 49, in wait
    return await AIOWaiter.wait(self, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/waiter.py", line 126, in wait
    raise WaiterError(

botocore.exceptions.WaiterError: Waiter batch_job_complete failed: Waiter encountered a terminal failure state: For expression "jobs[].status" all members matched excepted path: "FAILED"


During handling of the above exception, another exception occurred:


Traceback (most recent call last):

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
    result = details["task"].result()
             ^^^^^^^^^^^^^^^^^^^^^^^^

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 601, in run_trigger
    async for event in trigger.run():

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/base.py", line 143, in run
    await async_wait(

  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/utils/waiter_with_logging.py", line 127, in async_wait
    raise AirflowException(f"{failure_message}: {error}")

airflow.exceptions.AirflowException: Failure while running batch job JOB_UUID: Waiter batch_job_complete failed: Waiter encountered a terminal failure state: For expression "jobs[].status" all members matched excepted path: "FAILED"

[2024-08-03, 00:56:05 UTC] {{taskinstance.py:2698}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1599, in resume_execution
    raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure
[2024-08-03, 00:56:05 UTC] {{taskinstance.py:1138}} INFO - Marking task as FAILED. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T004656, end_date=20240803T005605
[2024-08-03, 00:56:05 UTC] {{logging_mixin.py:188}} WARNING - /usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/slack/hooks/slack_webhook.py:42 UserWarning: You cannot override the default channel (chosen by the user who installed your app), username, or icon when you're using Incoming Webhooks to post messages. Instead, these values will always inherit from the associated Slack app configuration. See: https://api.slack.com/messaging/webhooks#advanced_message_formatting. It is possible to change this values only in Legacy Slack Integration Incoming Webhook: https://api.slack.com/legacy/custom-integrations/messaging/webhooks#legacy-customizations
[2024-08-03, 00:56:06 UTC] {{standard_task_runner.py:107}} ERROR - Failed to execute job 36971 for task task_123 (Trigger failure; 27106)
[2024-08-03, 00:56:07 UTC] {{local_task_job_runner.py:234}} INFO - Task exited with return code 1
[2024-08-03, 00:56:07 UTC] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check

Logs of a good task (same task after retry):

[2024-08-03, 10:35:19 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 10:35:20 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 10:35:20 UTC] {{taskinstance.py:2170}} INFO - Starting attempt 2 of 2
[2024-08-03, 10:35:20 UTC] {{taskinstance.py:2191}} INFO - Executing <Task(BatchOperator): task_123> on 2024-08-02 00:15:00+00:00
[2024-08-03, 10:35:20 UTC] {{standard_task_runner.py:60}} INFO - Started process 3875 to run task
[2024-08-03, 10:35:20 UTC] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'eod_jobs', 'task_123', 'scheduled__2024-08-02T00:15:00+00:00', '--job-id', '37770', '--raw', '--subdir', 'DAGS_FOLDER/eod_jobs.py', '--cfg-path', '/tmp/tmpev95ge4j']
[2024-08-03, 10:35:20 UTC] {{standard_task_runner.py:88}} INFO - Job 37770: Subtask task_123
[2024-08-03, 10:35:20 UTC] {{taskinstance.py:2480}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='eod_jobs' AIRFLOW_CTX_TASK_ID='task_123' AIRFLOW_CTX_EXECUTION_DATE='2024-08-02T00:15:00+00:00' AIRFLOW_CTX_TRY_NUMBER='2' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-08-02T00:15:00+00:00'
[2024-08-03, 10:35:20 UTC] {{batch.py:274}} INFO - Running AWS Batch job - job definition: some_job_definition - on queue some-queue
[2024-08-03, 10:35:20 UTC] {{batch.py:281}} INFO - AWS Batch job - container overrides: {'command': ['bash', '-c', 'python -m some_module.batch.task_123  --date=2024-08-02'], 'environment': []}
[2024-08-03, 10:35:20 UTC] {{base_aws.py:161}} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-08-03, 10:35:21 UTC] {{batch.py:312}} INFO - AWS Batch job (JOB_UUID) started: REDACTED
[2024-08-03, 10:35:21 UTC] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T103519
[2024-08-03, 10:35:21 UTC] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
[2024-08-03, 10:35:22 UTC] {{base_aws.py:161}} INFO - No connection ID provided. Fallback on boto3 credential strategy (region_name=None). See: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/configuration.html
[2024-08-03, 10:35:22 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['RUNNABLE']
[2024-08-03, 10:35:53 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['STARTING']
[2024-08-03, 10:36:23 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['STARTING']
[2024-08-03, 10:36:53 UTC] {{waiter_with_logging.py:129}} INFO - Batch job JOB_UUID not ready yet: ['RUNNING']
[2024-08-03, 10:37:23 UTC] {{triggerer_job_runner.py:602}} INFO - Trigger eod_jobs/scheduled__2024-08-02T00:15:00+00:00/task_123/-1/2 (ID 5992) fired: TriggerEvent<{'status': 'success', 'job_id': 'JOB_UUID'}>
[2024-08-03, 10:37:28 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 10:37:28 UTC] {{taskinstance.py:1956}} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: eod_jobs.task_123 scheduled__2024-08-02T00:15:00+00:00 [queued]>
[2024-08-03, 10:37:28 UTC] {{taskinstance.py:2168}} INFO - Resuming after deferral
[2024-08-03, 10:37:28 UTC] {{taskinstance.py:2191}} INFO - Executing <Task(BatchOperator): task_123> on 2024-08-02 00:15:00+00:00
[2024-08-03, 10:37:28 UTC] {{standard_task_runner.py:60}} INFO - Started process 3913 to run task
[2024-08-03, 10:37:28 UTC] {{standard_task_runner.py:87}} INFO - Running: ['airflow', 'tasks', 'run', 'eod_jobs', 'task_123', 'scheduled__2024-08-02T00:15:00+00:00', '--job-id', '37775', '--raw', '--subdir', 'DAGS_FOLDER/eod_jobs.py', '--cfg-path', '/tmp/tmpzry41zjk']
[2024-08-03, 10:37:28 UTC] {{standard_task_runner.py:88}} INFO - Job 37775: Subtask task_123
[2024-08-03, 10:37:29 UTC] {{batch.py:262}} INFO - Job completed.
[2024-08-03, 10:37:29 UTC] {{taskinstance.py:1138}} INFO - Marking task as SUCCESS. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T103519, end_date=20240803T103729
[2024-08-03, 10:37:29 UTC] {{local_task_job_runner.py:234}} INFO - Task exited with return code 0
[2024-08-03, 10:37:29 UTC] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check

@0x26res
Copy link
Contributor Author

0x26res commented Aug 7, 2024

@wolfier thanks for the detailed answer. I've put the logs, I hopes it helps.

For context our deferred task submit jobs to AWS batch and wait for completion. I don't suspect the submission would block for more than 300 seconds / scheduler_zombie_task_threshold

But I know for fact this issue happens when a lot of these deferred tasks start at the same time.

@wolfier
Copy link
Contributor

wolfier commented Aug 7, 2024

I'll start with the successful task / trigger execution. The task exited right away with the return code. The trigger fired as expected after the conditions are met.

[2024-08-03, 10:35:20 UTC] {{standard_task_runner.py:60}} INFO - Started process 3875 to run task
...
[2024-08-03, 10:35:21 UTC] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T103519
[2024-08-03, 10:35:21 UTC] {{local_task_job_runner.py:231}} INFO - Task exited with return code 100 (task deferral)
...
[2024-08-03, 10:37:23 UTC] {{triggerer_job_runner.py:602}} INFO - Trigger eod_jobs/scheduled__2024-08-02T00:15:00+00:00/task_123/-1/2 (ID 5992) fired: TriggerEvent<{'status': 'success', 'job_id': 'JOB_UUID'}>

The failed task instance did not exit therefore does not have a return code. Given the logs, the reported behaviour is then expected.

[2024-08-03, 00:46:56 UTC] {{standard_task_runner.py:60}} INFO - Started process 26709 to run task
...
[2024-08-03, 00:47:01 UTC] {{taskinstance.py:2344}} INFO - Pausing task as DEFERRED. dag_id=eod_jobs, task_id=task_123, execution_date=20240802T001500, start_date=20240803T004656
...
[2024-08-03, 00:47:23 UTC] {{process_utils.py:131}} INFO - Sending 15 to group 26709. PIDs of all processes in the group: [26709]
[2024-08-03, 00:47:24 UTC] {{process_utils.py:86}} INFO - Sending the signal 15 to group 26709
[2024-08-03, 00:47:44 UTC] {{process_utils.py:79}} INFO - Process psutil.Process(pid=26709, status='terminated', exitcode=100, started='00:46:56') (26709) terminated with exit code 100

Surprisingly, the exit code was 100.

We know that two consecutive calls of heartbeat_callback saw the task_runner return code is None and the task instance state was in deferred starting at 00:47:01. We do not know what happened for 22 seconds.

I suspect one of the following happened:

  • The execute function did complete and return_code is set to 100. However, the process has not exited. It may have gotten stuck at os._exit(return_code). When the process was forced to exit via reaping which calls os.killpg(process_group_id, sig) , the returncode for the process was 100.
  • The execute function did not complete. Not sure where it could be stuck.

@0x26res
Copy link
Contributor Author

0x26res commented Aug 7, 2024

@wolfier does each deferred task start a new process? It wouldn't surprised me, if there are a lot of deferred tasks starting/running at the same time that the overhead of each process causes some of them to be killed because OMM / lack of memory.

Does the scheduler throttle / limit the number of deferred tasks that are running at the same time?

@wolfier
Copy link
Contributor

wolfier commented Aug 7, 2024

does each deferred task start a new process?

That's correct. Each task execution will spawn a new process. A deferrable operator task will be executed twice. Once to submit the trigger to the triggerer and another time to process the trigger event.

The process should be fairly short running though as you can see with the successful attempt.

I am not sure how MWAA is setup but the triggerer and worker should be running in two different execution spaces. This means the number of concurrent triggers should not affect the worker.

Does the scheduler throttle / limit the number of deferred tasks that are running at the same time?

The scheduler does not but the triggerer does have a limit of triggers per triggerer.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
affected_version:2.8 Issues Reported for 2.8 area:Scheduler including HA (high availability) scheduler good first issue kind:bug This is a clearly a bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants