From 00e2a85c85a3a82d5e791cd34892216ceb384bf5 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 11 Oct 2024 09:20:46 +0100 Subject: [PATCH 1/3] Use different sessions in writing and deletion of RTIF Previously, this was how it was done, but now, a session was used for both the writing and deletion of RTIF, which we suspect caused StaleDataError. The related PR: https://github.com/apache/airflow/pull/38565 This PR brings back the old behaviour of using different sessions for writing/deleting RTIFs --- airflow/models/taskinstance.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 5b51bb0d24d..65972e6f6e4 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1631,13 +1631,12 @@ def _get_previous_ti( @internal_api_call -@provide_session -def _update_rtif(ti, rendered_fields, session: Session | None = None): +def _update_rtif(ti, rendered_fields): from airflow.models.renderedtifields import RenderedTaskInstanceFields rtif = RenderedTaskInstanceFields(ti=ti, render_templates=False, rendered_fields=rendered_fields) - RenderedTaskInstanceFields.write(rtif, session=session) - RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session) + RenderedTaskInstanceFields.write(rtif) + RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id) def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Session): From f9218ce957154e43a86025037fd5719e6f3a5700 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Fri, 11 Oct 2024 21:27:37 +0100 Subject: [PATCH 2/3] fixup! Use different sessions in writing and deletion of RTIF --- airflow/models/taskinstance.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 65972e6f6e4..27171662c59 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1631,12 +1631,14 @@ def _get_previous_ti( @internal_api_call -def _update_rtif(ti, rendered_fields): +@provide_session +def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION): from airflow.models.renderedtifields import RenderedTaskInstanceFields rtif = RenderedTaskInstanceFields(ti=ti, render_templates=False, rendered_fields=rendered_fields) - RenderedTaskInstanceFields.write(rtif) - RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id) + RenderedTaskInstanceFields.write(rtif, session=session) + session.commit() + RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session) def _coalesce_to_orm_ti(*, ti: TaskInstancePydantic | TaskInstance, session: Session): From b3b7c747803562ef89e6d0052810ddd18c454676 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 14 Oct 2024 08:42:50 +0100 Subject: [PATCH 3/3] add test and use flush --- airflow/models/taskinstance.py | 2 +- tests/models/test_renderedtifields.py | 50 ++++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py index 27171662c59..c1373e5d6a1 100644 --- a/airflow/models/taskinstance.py +++ b/airflow/models/taskinstance.py @@ -1637,7 +1637,7 @@ def _update_rtif(ti, rendered_fields, session: Session = NEW_SESSION): rtif = RenderedTaskInstanceFields(ti=ti, render_templates=False, rendered_fields=rendered_fields) RenderedTaskInstanceFields.write(rtif, session=session) - session.commit() + session.flush() RenderedTaskInstanceFields.delete_old_records(ti.task_id, ti.dag_id, session=session) diff --git a/tests/models/test_renderedtifields.py b/tests/models/test_renderedtifields.py index ea22d31871d..6ff87b28a89 100644 --- a/tests/models/test_renderedtifields.py +++ b/tests/models/test_renderedtifields.py @@ -24,13 +24,16 @@ from datetime import date, timedelta from unittest import mock +import pendulum import pytest +from sqlalchemy import select from airflow import settings from airflow.configuration import conf from airflow.decorators import task as task_decorator -from airflow.models import Variable +from airflow.models import DagRun, Variable from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF +from airflow.operators.python import PythonOperator from airflow.providers.standard.operators.bash import BashOperator from airflow.utils.task_instance_session import set_current_task_instance_session from airflow.utils.timezone import datetime @@ -386,3 +389,48 @@ def test_redact(self, redact, dag_maker): "env": "val 2", "cwd": "val 3", } + + @pytest.mark.skip_if_database_isolation_mode + def test_rtif_deletion_stale_data_error(self, dag_maker, session): + """ + Here we verify bad behavior. When we rerun a task whose RTIF + will get removed, we get a stale data error. + """ + with dag_maker(dag_id="test_retry_handling"): + task = PythonOperator( + task_id="test_retry_handling_op", + python_callable=lambda a, b: print(f"{a}\n{b}\n"), + op_args=[ + "dag {{dag.dag_id}};", + "try_number {{ti.try_number}};yo", + ], + ) + + def run_task(date): + run_id = f"abc_{date.to_date_string()}" + dr = session.scalar(select(DagRun).where(DagRun.execution_date == date, DagRun.run_id == run_id)) + if not dr: + dr = dag_maker.create_dagrun(execution_date=date, run_id=run_id) + ti = dr.task_instances[0] + ti.state = None + ti.try_number += 1 + session.commit() + ti.task = task + ti.run() + return dr + + base_date = pendulum.datetime(2021, 1, 1) + exec_dates = [base_date.add(days=x) for x in range(40)] + for date_ in exec_dates: + run_task(date=date_) + + session.commit() + session.expunge_all() + + # find oldest date + date = session.scalar( + select(DagRun.execution_date).join(RTIF.dag_run).order_by(DagRun.execution_date).limit(1) + ) + date = pendulum.instance(date) + # rerun the old date. this will fail + run_task(date=date)