Airflow assumes task context is serialized with Pydantic models #42485

Open
1 of 2 tasks
wolfier opened this issue Sep 25, 2024 · 3 comments
Labels
area:core, area:Triggerer, good first issue, pending-response, stale

Comments

@wolfier
Contributor

wolfier commented Sep 25, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.10.2

What happened?

The triggerer terminates when attempting to deserialize the task context dictionary.

[2024-09-25T13:25:57.492-0500] {triggerer_job_runner.py:338} INFO - Starting the triggerer
[2024-09-25T13:25:57.629-0500] {triggerer_job_runner.py:348} ERROR - Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 346, in _execute
    self._run_trigger_loop()
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 374, in _run_trigger_loop
    self.load_triggers()
  File "/usr/local/lib/python3.12/site-packages/airflow/traces/tracer.py", line 58, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 400, in load_triggers
    self.trigger_runner.update_triggers(set(ids))
  File "/usr/local/lib/python3.12/site-packages/airflow/jobs/triggerer_job_runner.py", line 719, in update_triggers
    new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
                                           ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 94, in kwargs
    return self._decrypt_kwargs(self.encrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/models/trigger.py", line 130, in _decrypt_kwargs
    return BaseSerialization.deserialize(decrypted_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 831, in deserialize
    return {k: cls.deserialize(v, use_pydantic_models) for k, v in var.items()}
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 821, in deserialize
    d[k] = cls.deserialize(v, use_pydantic_models=True)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 803, in deserialize
    raise RuntimeError(
RuntimeError: Setting use_pydantic_models = True requires AIP-44 (in progress) feature flag to be true. This parameter will be removed eventually when new serialization is used by AIP-44

The issue started appearing when I upgraded from Airflow 2.9.3 to Airflow 2.10.2.

Do note that I have a custom trigger where I am serializing the task context.

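from typing import Any

from airflow.triggers.base import BaseTrigger


class CustomTrigger(BaseTrigger):
    ...  # remaining methods elided in the original report

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serializes the trigger's arguments and classpath.

        :return: A tuple containing the classpath and a dictionary of arguments.
        """
        return (
            "CustomTrigger",
            {
                # The whole task context is passed as a trigger kwarg.
                "context": self.context,
            },
        )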

What you think should happen instead?

I believe the deserialize operation should not be forcing use_pydantic_models to be true.

Instead, it should be using the value passed as a parameter.

                d[k] = cls.deserialize(v, use_pydantic_models)

Also, when the task context is serialized, the serialize function respects the value passed to it.
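The difference between forcing the flag and passing it through can be illustrated with a toy model. This is only a sketch, not Airflow's `BaseSerialization`: the names `FEATURE_FLAG_ENABLED`, `deserialize_forced`, and `deserialize_passthrough` are hypothetical, and the recursion is simplified to plain dictionaries.

```python
# Toy model of a recursive deserializer guarded by a feature flag.
# All names are hypothetical stand-ins; this is not Airflow's code.

FEATURE_FLAG_ENABLED = False  # stands in for the AIP-44 feature flag


def _check_flag(use_pydantic_models: bool) -> None:
    # Mirrors the guard seen in the traceback: True is only allowed with the flag on.
    if use_pydantic_models and not FEATURE_FLAG_ENABLED:
        raise RuntimeError("use_pydantic_models=True requires the feature flag")


def deserialize_forced(var, use_pydantic_models: bool = False):
    """Nested values are always deserialized with the flag forced to True."""
    _check_flag(use_pydantic_models)
    if isinstance(var, dict):
        return {k: deserialize_forced(v, use_pydantic_models=True) for k, v in var.items()}
    return var


def deserialize_passthrough(var, use_pydantic_models: bool = False):
    """Nested values inherit whatever the caller passed."""
    _check_flag(use_pydantic_models)
    if isinstance(var, dict):
        return {k: deserialize_passthrough(v, use_pydantic_models) for k, v in var.items()}
    return var


kwargs = {"context": {"run_id": "manual__2024-09-25"}}

try:
    deserialize_forced(kwargs)
    forced_error = None
except RuntimeError as exc:
    forced_error = str(exc)

result = deserialize_passthrough(kwargs)
```

In this model the forced variant fails as soon as it reaches the nested value, even though the caller never asked for Pydantic models, while the pass-through variant returns the dictionary unchanged.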

How to reproduce

  1. Create a custom Trigger that serializes the task context
  2. Create a deferrable operator that uses the custom trigger
  3. Run the task

Operating System

n/a

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

wolfier added the area:core, kind:bug, and needs-triage labels Sep 25, 2024
@gopidesupavan
Collaborator

Currently, the context is not supported in triggers. If you need any values from the context object, extract them and pass them as key-value pairs.
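A minimal sketch of that approach, assuming the trigger only needs a few identifiers. The helper `extract_trigger_kwargs` is hypothetical, and the `SimpleNamespace` object merely stands in for a real task instance so the example runs without Airflow installed.

```python
import json
from types import SimpleNamespace
from typing import Any


def extract_trigger_kwargs(context: dict[str, Any]) -> dict[str, Any]:
    """Pick plain, JSON-friendly values out of the task context (hypothetical helper)."""
    ti = context["ti"]
    return {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": context["run_id"],
        "try_number": ti.try_number,
    }


# Stand-in for the context an operator receives in execute(); not a real Airflow object.
fake_context = {
    "ti": SimpleNamespace(dag_id="example_dag", task_id="wait_for_event", try_number=1),
    "run_id": "manual__2024-09-25",
}

trigger_kwargs = extract_trigger_kwargs(fake_context)

# Plain strings and integers survive a JSON round trip unchanged.
round_tripped = json.loads(json.dumps(trigger_kwargs))
```

The resulting dictionary can then be returned from the trigger's `serialize` method in place of `{"context": self.context}`.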

gopidesupavan added the pending-response label and removed the kind:bug label Oct 3, 2024
@potiuk
Member

potiuk commented Oct 4, 2024

Yeah. We might want to handle it better, so that a better error message is printed in this case. Marked it as good-first-issue.
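One possible shape for such a check, offered only as a sketch: the function `validate_trigger_kwargs` is hypothetical and not part of Airflow, and JSON-serializability is an assumed stand-in for whatever the real serializer supports.

```python
import json
from typing import Any


def validate_trigger_kwargs(kwargs: dict[str, Any]) -> None:
    """Fail early with a message that names the offending kwarg (hypothetical check)."""
    for key, value in kwargs.items():
        try:
            # Assumption: JSON-serializable is used here as a simple proxy for "supported".
            json.dumps(value)
        except (TypeError, ValueError) as exc:
            raise TypeError(
                f"Trigger kwarg {key!r} of type {type(value).__name__} is not supported; "
                "pass plain key-value pairs instead of objects such as the task context."
            ) from exc


# Plain values pass silently.
validate_trigger_kwargs({"run_id": "manual__2024-09-25", "try_number": 1})

# An arbitrary object triggers the descriptive error.
try:
    validate_trigger_kwargs({"context": object()})
    message = None
except TypeError as exc:
    message = str(exc)
```

Running a check like this when the trigger is created would surface the problem in the task log, rather than as a crash of the triggerer at deserialization time.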

potiuk added the good first issue label and removed the needs-triage label Oct 4, 2024

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions bot added the stale label Oct 19, 2024