BeamRunPythonPipelineOperator doesn't push xcom until Pipeline completes, leaving DataflowSensors worthless #30007

Closed
CYarros10 opened this issue Mar 9, 2023 · 12 comments

@CYarros10 (Contributor) commented Mar 9, 2023

Apache Airflow version

2.5.1

What happened

BeamRunPythonPipelineOperator does not push any values to XCom when the pipeline starts, yet Dataflow sensors depend on pulling the job ID from XCom, like this:

        discover_cancelling_jobs = DataflowJobStatusSensor(
            task_id="discover_cancelling_jobs",
            job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
            expected_statuses={DataflowJobStatus.JOB_STATE_CANCELLING},
            location="{{region}}",
            mode="poke"
        )

Since the only way to retrieve the Dataflow job ID from a BeamRunPythonPipelineOperator is through XCom, and BeamRunPythonPipelineOperator does not push this XCom until the pipeline ends, the sensor can't "sense": it can only observe jobs that are already done.

Error Message:

jinja2.exceptions.UndefinedError: 'None' has no attribute 'dataflow_job_config'
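
This can be reproduced outside Airflow (a minimal sketch, assuming only that xcom_pull returned None because the upstream task had not finished):

    import jinja2

    # xcom_pull returns None while the upstream task is still running, so the
    # template effectively subscripts None, which Jinja reports as an
    # undefined attribute.
    template = jinja2.Environment().from_string(
        "{{ result['dataflow_job_config']['job_id'] }}"
    )
    template.render(result=None)  # raises jinja2.exceptions.UndefinedError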

BeamRunPythonPipelineOperator XCom (only populated after the pipeline completes):

Key | Value
-- | --
dataflow_job_config | {'project_id': '&lt;project&gt;', 'region': '&lt;region&gt;', 'job_id': '2023-03-09_13_59_37-825481577571358868'}
return_value | {'dataflow_job_id': '2023-03-09_13_59_37-825481577571358868'}

What you think should happen instead

The Dataflow job ID should be pushed to XCom when, or before, the pipeline starts.
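
One possible shape of the fix (a rough sketch only, not the provider's actual implementation; _submit_pipeline and _wait_for_done are hypothetical helpers standing in for the Beam hook calls):

    from airflow.models import BaseOperator


    class EarlyJobIdBeamOperator(BaseOperator):
        """Hypothetical operator that pushes the Dataflow job ID before blocking."""

        def execute(self, context):
            # Hypothetical helper: start the Beam process and parse the job ID
            # from its log output.
            job_id = self._submit_pipeline()
            # Push immediately so sensors can poll the job while it still runs.
            context["ti"].xcom_push(key="dataflow_job_id", value=job_id)
            # Hypothetical helper: block until the job reaches a terminal state.
            self._wait_for_done(job_id)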

How to reproduce

Sample Code

    # Imports required by this sample (the surrounding DAG definition is omitted):
    from typing import Callable

    from airflow.exceptions import AirflowException
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.operators.dataflow import (
        DataflowStopJobOperator,
        DataflowTemplatedJobStartOperator,
    )
    from airflow.providers.google.cloud.sensors.dataflow import (
        DataflowJobAutoScalingEventsSensor,
        DataflowJobMetricsSensor,
        DataflowJobStatusSensor,
    )
    from airflow.utils.task_group import TaskGroup

    # -------------------------------------------------------------------------
    # Dataflow
    # -------------------------------------------------------------------------

    with TaskGroup(group_id="dataflow_tg1") as dataflow_tg1:
        start_python_job = BeamRunPythonPipelineOperator(
            task_id="start_python_job",
            runner="DataflowRunner",
            py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
            py_options=[],
            pipeline_options={
                "output": "gs://{{gcs_download_bucket}}/dataflow_output",
            },
            py_requirements=["apache-beam[gcp]==2.36.0"],
            py_interpreter="python3",
            py_system_site_packages=False,
            dataflow_config={
                "job_name": "{{df_job}}-python",
                "wait_until_finished": False,
            },
        )

        start_python_job_async = BeamRunPythonPipelineOperator(
            task_id="start_python_job_async",
            runner="DataflowRunner",
            py_file="gs://{{gcs_download_bucket}}/{{df_python_script}}",
            py_options=[],
            pipeline_options={
                "output": "gs://{{gcs_download_bucket}}/dataflow_output",
            },
            py_requirements=["apache-beam[gcp]==2.36.0"],
            py_interpreter="python3",
            py_system_site_packages=False,
            dataflow_config={
                "job_name": "{{df_job}}-aysnc",
                "wait_until_finished": False,
            },
        )

        start_template_job = DataflowTemplatedJobStartOperator(
            task_id="start_template_job",
            job_name="{{df_job}}-template",
            project_id="{{ project_id }}",
            template="gs://dataflow-templates/latest/Word_Count",
            parameters={"inputFile": "gs://{{gcs_download_bucket}}/{{gcs_download_obj}}", "output": "gs://{{gcs_download_bucket}}/dataflow_output"},
            location="{{region}}",
        )


        wait_for_python_job_async_done = DataflowJobStatusSensor(
            task_id="wait_for_python_job_async_done",
            job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
            expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
            location="{{region}}",
            mode="reschedule",
            poke_interval=60
        )

        def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
            """Check whether the named metric is greater than or equal to the given value."""

            def callback(metrics) -> bool:
                dag.log.info("Looking for '%s' >= %d", metric_name, value)
                for metric in metrics:
                    context = metric.get("name", {}).get("context", {})
                    original_name = context.get("original_name", "")
                    tentative = context.get("tentative", "")
                    # Compare against the metric_name argument rather than a
                    # hard-coded metric name.
                    if original_name == metric_name and not tentative:
                        return metric["scalar"] >= value
                raise AirflowException(f"Metric '{metric_name}' not found in metrics")

            return callback

        wait_for_python_job_async_metric = DataflowJobMetricsSensor(
            task_id="wait_for_python_job_async_metric",
            job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}", # this doesnt work
            location="{{region}}",
            callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
            fail_on_terminal_state=False,
            mode="reschedule",
            poke_interval=60
        )


        def check_autoscaling_event(autoscaling_events) -> bool:
            """Check autoscaling event"""
            for autoscaling_event in autoscaling_events:
                if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
                    return True
            return False

        wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
            task_id="wait_for_python_job_async_autoscaling_event",
            job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}", # this doesnt work
            location="{{region}}",
            callback=check_autoscaling_event,
            fail_on_terminal_state=False,
            mode="reschedule",
            poke_interval=60
        )

        stop_python_job = DataflowStopJobOperator(
            task_id="stop_python_dataflow_job",
            location="{{region}}",
            job_name_prefix="{{task_instance.xcom_pull('start_python_job')['dataflow_job_config']['job_id']}}",
        )

        stop_template_job = DataflowStopJobOperator(
            task_id="stop_dataflow_job",
            location="{{region}}",
            job_name_prefix="{{df_job}}-template",
        )

        stop_async_job = DataflowStopJobOperator(
            task_id="stop_async_job",
            location="{{region}}",
            job_name_prefix="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",
        )

        start_python_job >> stop_python_job
        start_template_job >> stop_template_job
        start_python_job_async >> stop_async_job

        wait_for_python_job_async_metric
        wait_for_python_job_async_autoscaling_event
        wait_for_python_job_async_done

Operating System

composer-2.1.5-airflow-2.4.3

Versions of Apache Airflow Providers

2.4.3

Deployment

Google Cloud Composer

Deployment details

No response

Anything else

Occurs every time

Are you willing to submit PR?

  • Yes I am willing to submit a PR!


@CYarros10 CYarros10 added area:core kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Mar 9, 2023
@boring-cyborg bot commented Mar 9, 2023

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@josh-fell josh-fell added area:providers good first issue and removed area:core needs-triage label for new issues that we didn't triage yet labels Mar 17, 2023
@josh-fell (Contributor) commented

Agreed. dataflow_job_id should be pushed to XCom as early as it is known, not only in BeamRunPythonPipelineOperator but also in the Java and Go versions of the operator.

@hubert-pietron (Contributor) commented

Can I take care of this issue?

@josh-fell (Contributor) commented

@hubert-pietron Sure thing, all yours!

@hubert-pietron (Contributor) commented

I need to unassign myself; due to a job change I currently do not have time to look into the problem :/

@hubert-pietron hubert-pietron removed their assignment May 14, 2023
@githubwua commented May 16, 2023

job_id is stored in:
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",

It is not stored in:
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_config']['job_id']}}",

Code Reference:

self.dataflow_job_id = job_id

If you modify your code to reference the key where the job ID is actually stored, you will be able to retrieve it.

To illustrate how this is done, here is a sample code on how to retrieve Dataflow job id:

https://airflow.apache.org/docs/apache-airflow-providers-apache-beam/1.0.0/_modules/airflow/providers/apache/beam/example_dags/example_beam.html
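
A corrected version of the sensor from the issue description might look like this (a sketch; as the thread goes on to establish, this value is still only populated once the upstream task completes, which is the core of this issue):

    from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
    from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor

    # Pull from the operator's return_value, which stores the ID under
    # 'dataflow_job_id' rather than under 'dataflow_job_config'.
    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait_for_python_job_async_done",
        job_id="{{ task_instance.xcom_pull('start_python_job_async')['dataflow_job_id'] }}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location="{{ region }}",
    )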

@CYarros10 (Contributor, Author) commented May 16, 2023

Have you tested this? The documentation is inconsistent and not reliable to go off of alone. For example, the documentation you referenced states:

    wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
        task_id="wait-for-python-job-async-done",
        job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        project_id=GCP_PROJECT_ID,
        location='us-central1',
    )

and dataflow_job_id is not actually in the XCom.

@githubwua commented

You are right. I was asked to take a look at this issue, and didn't have a chance to read the issue description in detail.
I was only checking successful runs, so I was able to get the Dataflow job IDs.

The dataflow job id is indeed only available after a Dataflow job finishes successfully.
It is not available when a Dataflow job starts or while it is running.

In a perfect world where no issue occurs, this is fine, but in the real world, when a Dataflow job gets cancelled, there is no job id to track the cancelled Dataflow job.

@zeotuan (Contributor) commented Sep 21, 2023

I can take on this issue

@josh-fell (Contributor) commented

@zeotuan All yours!

github-actions bot commented Oct 7, 2024

This issue has been automatically marked as stale because it has been open for 365 days without any activity. There have been several Airflow releases since the last activity on this issue. Kindly recheck the report against the latest Airflow version and let us know if the issue is still reproducible. The issue will be closed in the next 30 days if no further activity occurs from the issue author.

potiuk pushed a commit that referenced this issue Oct 14, 2024 (#42982)

- To let GCP Beam sensor operators 'sense' pipeline changes,
by having the dataflow job_id pushed to XCom as soon as it is available.

Related issue: #30007.

Co-authored-by: Oleg Kachur <[email protected]>
@olegkachur-e (Contributor) commented

Fixed in #42982. The Dataflow job ID now gets pushed to XCom as soon as it is available and can be retrieved like job_id="{{ task_instance.xcom_pull('start_python_job_async', key='dataflow_job_id') }}".
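
For example, a status sensor can now poll the job while it is still running (a sketch reusing the parameter values from this issue's reproduction code):

    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait_for_python_job_async_done",
        job_id="{{ task_instance.xcom_pull('start_python_job_async', key='dataflow_job_id') }}",
        expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
        location="{{ region }}",
        mode="reschedule",
        poke_interval=60,
    )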

P.S. Can you close the issue please? @CYarros10 @josh-fell

@potiuk potiuk closed this as completed Oct 17, 2024