
Find a solution to handle the execution timeout in Triggers #32638

Open
hussein-awala opened this issue Jul 16, 2023 · 12 comments · May be fixed by #32990 or #33718
Labels: area:Triggerer, kind:meta (High-level information important to the community)

Comments

@hussein-awala
Member

Body

Currently, when we run tasks/sensors in deferrable mode, the execution timeout is ignored: it is a TaskInstance property, and the Trigger neither uses nor handles it.

IMO we should update the Trigger execution logic to take this parameter into account, and stop the execution once the timeout is reached.

To achieve that, we need:

  • create an async TimeoutPosix class (a rough, hypothetical sketch follows this list)
  • pass the execution timeout parameter to the Trigger class (probably by updating the table model)
  • update the Triggerer method that executes Triggers so that it handles the execution timeout
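
As a rough, hypothetical illustration of the first item (the class name and the cancellation strategy here are only a sketch, not the actual implementation), an asyncio-based timeout could look something like:

    import asyncio


    class AsyncExecutionTimeout:
        """Illustrative async context manager that cancels the current task
        once the given number of seconds has elapsed."""

        def __init__(self, seconds: float):
            self.seconds = seconds
            self._handle = None

        async def __aenter__(self):
            task = asyncio.current_task()
            loop = asyncio.get_running_loop()
            # Schedule a cancellation of the wrapping coroutine at the deadline.
            self._handle = loop.call_later(self.seconds, task.cancel)
            return self

        async def __aexit__(self, exc_type, exc, tb):
            self._handle.cancel()
            if exc_type is asyncio.CancelledError:
                # Surface the cancellation as a timeout to the caller.
                raise TimeoutError(f"Execution timeout of {self.seconds}s reached") from exc
            return False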

related: #32580

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@hussein-awala hussein-awala added kind:meta High-level information important to the community area:Triggerer labels Jul 16, 2023
@hussein-awala hussein-awala self-assigned this Jul 16, 2023
@shubham22

@hussein-awala - are you working on this? If not, @vandonr-amz could look into it during this week.

@hussein-awala
Member Author

@shubham22 yes, I'm working on this. I already tried different approaches such as asyncio.wait_for, but it didn't work. Currently I'm trying to register the triggers' TTLs in a list/dict, and create a loop which checks this list every x seconds to cancel the expired triggers.

@vandonr-amz
Contributor

But triggers already have a timeout mechanism, couldn't we reuse that logic to enforce the task timeout as well?

@hussein-awala
Member Author

But triggers already have a timeout mechanism.

The current mechanism is not a time-based timeout implementation; it's based on an extra parameter called max_attempts, which tells the trigger how many attempts it should make before failing. For example, when we configure it with max_attempts=300, the maximum execution time for this trigger will be 300 * (sleep time + execution time). The execution time is not negligible, as in most cases it involves a call to an external API, and we could experience some latency during this call.

Additionally, the trigger could be adopted by another triggerer if we lose the one that runs it. Since the consumed attempts are not synchronized with the metadata database, the count restarts from the beginning with each run.

What I'm trying to do is:

  • Run the task until deferring it.
  • Calculate the time when we should stop the task because of a timeout -> start_date + timeout duration.
  • Create the trigger and provide the timeout timestamp.
  • Run the trigger in a triggerer.
  • In the triggerer, there will be a loop that checks whether any of the timeouts has been reached and cancels the corresponding asyncio task (a rough sketch of this loop follows the list).
  • The cancellation exception will be caught and the task will be marked as failed because of a timeout.
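
A very rough sketch of that watcher loop (the names deadlines and watch_timeouts are made up for illustration; the real code would live in the trigger runner and read the deadlines from the triggers' metadata):

    import asyncio
    from datetime import datetime, timezone

    # trigger_id -> (deadline, asyncio task running the trigger); illustrative only.
    deadlines: dict[int, tuple[datetime, asyncio.Task]] = {}


    async def watch_timeouts(check_interval: float = 5.0) -> None:
        """Periodically cancel trigger tasks whose execution timeout has passed."""
        while True:
            now = datetime.now(timezone.utc)
            for trigger_id, (deadline, task) in list(deadlines.items()):
                if now >= deadline and not task.done():
                    # The CancelledError is caught upstream, and the task
                    # instance is then failed with a timeout error.
                    task.cancel()
                    deadlines.pop(trigger_id, None)
            await asyncio.sleep(check_interval)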

What are the benefits of this mechanism?

  • No need to implement it explicitly in each Trigger.
  • The task timeout is respected just as it is for a normal (non-deferred) task.
  • No need to handle the timeout in the task, as it will be caught by Airflow jobs.

What do you think about this solution?

cc: @potiuk @pankajastro @syedahsn

@vandonr-amz
Contributor

vandonr-amz commented Jul 31, 2023

Ha yes indeed, I was mistaken, I was thinking about the timeout parameter on defer

timeout: timedelta | None = None,

Which kind of is a timeout for triggers, except it's defined when we defer and not on the object. Not too sure how that's handled underneath.

@hussein-awala
Member Author

Yes, even with this mechanism we need to set the timeout explicitly, and it's different from the task's execution timeout in normal mode.

Currently we have this method:

@provide_session
def check_trigger_timeouts(self, session: Session = NEW_SESSION) -> None:
    """Mark any "deferred" task as failed if the trigger or execution timeout has passed."""
    num_timed_out_tasks = session.execute(
        update(TI)
        .where(
            TI.state == TaskInstanceState.DEFERRED,
            TI.trigger_timeout < timezone.utcnow(),
        )
        .values(
            state=TaskInstanceState.SCHEDULED,
            next_method="__fail__",
            next_kwargs={"error": "Trigger/execution timeout"},
            trigger_id=None,
        )
    ).rowcount
    if num_timed_out_tasks:
        self.log.info("Timed out %i deferred tasks without fired triggers", num_timed_out_tasks)
which changes the state of the tasks to scheduled and sets the next method to __fail__.

The value __fail__ is handled at runtime:

if self.next_method:
    # __fail__ is a special signal value for next_method that indicates
    # this task was scheduled specifically to fail.
    if self.next_method == "__fail__":
        next_kwargs = self.next_kwargs or {}
        traceback = self.next_kwargs.get("traceback")
        if traceback is not None:
            self.log.error("Trigger failed:\n%s", "\n".join(traceback))
        raise TaskDeferralError(next_kwargs.get("error", "Unknown"))

And the Trigger is canceled by the triggerer when it detects that the task is no longer in the deferred state.

I'm trying to reuse this mechanism and improve it by implementing what I explained before.
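
For reference, the timeout mentioned above is the timeout argument of BaseOperator.defer(); a minimal toy deferrable operator (made up purely for illustration) showing where it is set could look like this:

    from datetime import timedelta

    from airflow.models.baseoperator import BaseOperator
    from airflow.triggers.temporal import TimeDeltaTrigger


    class WaitABitOperator(BaseOperator):
        """Toy deferrable operator, only to show where defer()'s timeout goes."""

        def execute(self, context):
            self.defer(
                trigger=TimeDeltaTrigger(timedelta(hours=1)),
                method_name="execute_complete",
                # Deadline for the deferral; this populates TaskInstance.trigger_timeout,
                # which check_trigger_timeouts() above compares against.
                timeout=timedelta(minutes=10),
            )

        def execute_complete(self, context, event=None):
            return None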

@robg-eb

robg-eb commented Aug 16, 2023

@hussein-awala - This may be somewhat related to what you're working on, but there are a few async sensors that DO respect timeout, like BigQueryTableExistenceAsyncSensor. They have a different problem, though: the async sensor behaves differently from the sync sensors with respect to how long it waits before timing out.

Take these two:

    check_for_data = BigQueryTableExistenceSensor(
        task_id="check_for_data",
        project_id=GCP_PROJECT,
        dataset_id="searchconsole",
        table_id="searchdata_url_impression_robtest",
        timeout=60,
        retries=3,
    )

    check_for_data_async = BigQueryTableExistenceAsyncSensor(
        task_id="check_for_data_async",
        project_id=GCP_PROJECT,
        dataset_id="searchconsole",
        table_id="searchdata_url_impression_robtest",
        timeout=60,
        retries=3,
    )

These are two sensors with the exact same configuration params; the only difference is that one is the async sensor that is supposed to be a drop-in replacement for the sync sensor. When you let this run against a table that it doesn't find, the Sync sensor times out as it should after 60 seconds. The Async sensor retries 3 times and ends up waiting a total of (60 seconds * 4 attempts) + (retry wait interval) * 3. That is not expected. We'd want retries not to occur past the total timeout interval for the async sensor, the same way it works for the sync one. Is that something you're incorporating into your update?
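
To put numbers on it (assuming Airflow's default retry_delay of 5 minutes): 4 * 60 s + 3 * 300 s = 1140 s, roughly 19 minutes of waiting instead of the expected 60 seconds.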

@hussein-awala
Member Author

the Sync sensor times out as it should after 60 seconds. The Async sensor retries 3 times and ends up waiting a total of (60 seconds * 4 attempts) + (retry wait interval) * 3. That is not expected.

This is the main issue I'm trying to fix in my PR, which is almost ready. However, I'm still trying to find a solution for providers running on an older version of Airflow.

The issue is that currently we raise TaskDeferralError when the trigger times out, and this exception is not processed by Airflow as a timeout exception. There is another issue: we need to define the trigger timeout explicitly, whereas Airflow could calculate it from the task start_date as it does for sync sensors.

@robg-eb

robg-eb commented Aug 16, 2023

Thank you so much for working on this @hussein-awala! I agree with the problem statement - that TaskDeferralError is not treated as a timeout exception, so it still gets retries applied. I had even raised something similar in Airflow Slack here, but did not want to raise an issue here until we got upgraded to Airflow 2.6.2 first. Seems you beat me to it!

@dstandish
Contributor

Currently, when we run tasks/sensors in defferable mode, the execution timeout is ignored where it is a TaskInstance property, and the Trigger doesn't use and handle it.

Is this actually true? I tried this locally and it seems that execution_timeout is respected even when the task is in a state of deferral. Is the actual problem just that, with sensor tasks, there are retries when there shouldn't be?

by the way, if the user doesn't want retries, why doesn't the user just set retries to 0?

@dstandish
Contributor

by the way, if the user doesn't want retries, why doesn't the user just set retries to 0?

I guess the issue is you want retries if it fails for some reason other than the sensor running out of time...

@hussein-awala
Member Author

Is this actually true? I tried this locally and it seems that execution_timeout is respected even when the task is in a state of deferral. Is the actual problem just that, with sensor tasks, there are retries when there shouldn't be?

No, after diving into the code, I found that the problem is with the callback of the execution timeout and with the sensor timeout, which is completely ignored.

by the way, if the user doesn't want retries, why doesn't the user just set retries to 0?
I guess the issue is you want retries if it fails for some reason other than the sensor running out of time...

Exactly, the sensor timeout is applied to the overall run time and not only to the duration of a single try.
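
As a simplified illustration of that behaviour (not Airflow's actual sensor code), the idea is that the timeout is measured against the overall start time rather than being reset for each attempt:

    import time

    from airflow.exceptions import AirflowSensorTimeout


    def poke_until_deadline(poke, start_time: float, timeout: float, poke_interval: float = 60.0):
        """Simplified poke loop: timeout is measured from start_time, i.e. against
        the overall run time rather than the duration of the current try."""
        while not poke():
            if time.monotonic() - start_time > timeout:
                raise AirflowSensorTimeout("Sensor timed out")
            time.sleep(poke_interval)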

@dstandish dstandish linked a pull request Aug 25, 2023 that will close this issue