BeamRunPythonPipelineOperator doesn't work with Google Application Default Credentials ADC #42396

Open
1 of 2 tasks
fpopic opened this issue Sep 21, 2024 · 3 comments

@fpopic
Contributor

fpopic commented Sep 21, 2024

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.6.3 (problem occurs in latest version as well, will try to download latest and post log as well)

What happened?

When manually submitting an Apache Beam Python job to the Google Dataflow runner using BeamRunPythonPipelineOperator

  • without specifying BeamRunPythonPipelineOperator.pipeline_options.service_account_name as an operator parameter
  • without the explicit Google Auth environment variable GOOGLE_APPLICATION_CREDENTIALS set
  • without the environment variable GCP_PROJECT set
  • after running gcloud auth application-default login --disable-quota-project
  • after running gcloud auth login
  • after running gcloud config set project <project>
  • using the default Airflow GCP connection google_cloud_default with the following content
    {
      "conn_type": "google_cloud_platform", 
      "extra": {
        "scope": "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/drive,https://www.googleapis.com/auth/bigquery", 
        "project": "<project>", 
        "num_retries": 5
      }
    }
  • with BeamRunPythonPipelineOperator.DataflowConfiguration.project_id specified

The task gets stuck in apitools, which uses oauth2client and tries to start an interactive browser-based Google sign-in, which is bound to fail. I don't understand why the authentication flow ends up in that execution branch, since a credential of type authorized_user exists at the well-known path ~/.config/gcloud/application_default_credentials.json.
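As a quick sanity check of the claim above, here is a minimal sketch that reads the ADC file at the well-known path and reports its "type" field. The helper name adc_credential_type is hypothetical (it is not part of Airflow, Beam, or google-auth), and it only inspects the file; it does not reproduce the full lookup order of google.auth.default().

```python
import json
import os
from pathlib import Path
from typing import Optional


def adc_credential_type(path: Optional[str] = None) -> Optional[str]:
    """Return the "type" field of the ADC file, or None if the file is missing.

    Hypothetical helper for illustration only.
    """
    if path is None:
        # Well-known location written by `gcloud auth application-default login`
        # on Linux and macOS.
        path = os.path.join(
            os.path.expanduser("~"),
            ".config",
            "gcloud",
            "application_default_credentials.json",
        )
    adc_file = Path(path)
    if not adc_file.is_file():
        return None
    with adc_file.open() as f:
        return json.load(f).get("type")
```

Calling adc_credential_type() inside the Airflow worker (or container) should show whether the same file the host sees is visible to the task process.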

[2024-09-21, 21:05:58 UTC] {taskinstance.py:1328} INFO - Executing <Task(BeamRunPythonPipelineOperator): submit_beam_job> on 2022-01-01 00:00:00+00:00
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:57} INFO - Started process 2105 to run task
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:84} INFO - Running: ['airflow', 'tasks', 'run', 'dag-example-beam-dataflow-python', 'submit_beam_job', 'scheduled__2022-01-01T00:00:00+00:00', '--job-id', '716', '--raw', '--subdir', 'DAGS_FOLDER/dag_example_beam_dataflow_python/dag_example_beam_dataflow_python.py', '--cfg-path', '/tmp/tmprcjz83eb']
[2024-09-21, 21:05:58 UTC] {standard_task_runner.py:85} INFO - Job 716: Subtask submit_beam_job
[2024-09-21, 21:05:58 UTC] {task_command.py:414} INFO - Running <TaskInstance: dag-example-beam-dataflow-python.submit_beam_job scheduled__2022-01-01T00:00:00+00:00 [running]> on host 9743bdb39e14
[2024-09-21, 21:05:58 UTC] {taskinstance.py:1547} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='dag-example-beam-dataflow-python' AIRFLOW_CTX_TASK_ID='submit_beam_job' AIRFLOW_CTX_EXECUTION_DATE='2022-01-01T00:00:00+00:00' AIRFLOW_CTX_TRY_NUMBER='3' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2022-01-01T00:00:00+00:00'
[2024-09-21, 21:05:58 UTC] {crypto.py:83} WARNING - empty cryptography key - values will not be stored encrypted.

[2024-09-21, 21:05:58 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:58 UTC] {beam.py:198} INFO - {'job_name': 'simple-beam-job-7d346a20', 'project': 'XXX', 'region': 'europe-west1', 'labels': {'airflow-version': 'v2-6-3-composer'}}
[2024-09-21, 21:05:59 UTC] {base.py:73} INFO - Using connection ID 'google_cloud_default' for task execution.
[2024-09-21, 21:05:59 UTC] {credentials_provider.py:353} INFO - Getting connection using `google.auth.default()` since no explicit credentials are provided.
[2024-09-21, 21:05:59 UTC] {logging_mixin.py:150} WARNING - /opt/python3.8/lib/python3.8/site-packages/google/auth/_default.py:78 UserWarning: Your application has authenticated using end user credentials from Google Cloud SDK without a quota project. You might receive a "quota exceeded" or "API not enabled" error. See the following page for troubleshooting: https://cloud.google.com/docs/authentication/adc-troubleshooting/user-creds.

[2024-09-21, 21:06:05 UTC] {beam.py:271} INFO - Beam version: 2.50.0
[2024-09-21, 21:06:05 UTC] {beam.py:131} INFO - Running command: python3 /home/airflow/gcs/dags/dag_example_beam_dataflow_python/src/beam/job_example_beam_dataflow_python.py --runner=DataflowRunner --job_name=simple-beam-job-7d346a20 --project=XXX --region=europe-west1 --labels=airflow-version=v2-6-3-composer --worker_machine_type=n1-standard-1 --disk_size_gb=10 --num_workers=1
[2024-09-21, 21:06:05 UTC] {beam.py:142} INFO - Start waiting for Apache Beam process to complete.
[2024-09-21, 21:06:07 UTC] {beam.py:113} INFO - 0

[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Generating new OAuth credentials ...
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - Your browser has been opened to visit:
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -     https://accounts.google.com/o/oauth2/v2/auth?client_id=XXXX.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8090%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute.readonly+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email&access_type=offline&response_type=code
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - If your browser is on a different machine then exit and re-run this
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO - application with the command-line parameter
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -   --noauth_local_webserver
[2024-09-21, 21:06:10 UTC] {beam.py:113} INFO -
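The log above shows the Beam subprocess falling back to an interactive browser flow. Below is a rough sketch of how such output could be scanned to detect that prompt and list the scopes it requests. The marker strings are copied from the log; the function name find_interactive_oauth_scopes is hypothetical and nothing like it is claimed to exist in Airflow or Beam.

```python
from typing import Iterable, List, Optional
from urllib.parse import parse_qs, urlparse

# Marker strings copied from the log output above.
BROWSER_PROMPT = "Your browser has been opened to visit:"
AUTH_URL_PREFIX = "https://accounts.google.com/o/oauth2/"


def find_interactive_oauth_scopes(lines: Iterable[str]) -> Optional[List[str]]:
    """Return the scopes requested by an interactive sign-in prompt.

    Returns None if the prompt never appears, and an empty list if the
    prompt appears but no authorization URL follows it.
    """
    prompt_seen = False
    for line in lines:
        if BROWSER_PROMPT in line:
            prompt_seen = True
            continue
        if prompt_seen and AUTH_URL_PREFIX in line:
            url = line[line.index(AUTH_URL_PREFIX):].strip()
            query = parse_qs(urlparse(url).query)
            # parse_qs decodes "+" as a space, so scopes come out space-separated.
            return query.get("scope", [""])[0].split()
    return [] if prompt_seen else None
```

A wrapper around the subprocess output could use a non-None result to fail the task immediately instead of waiting on a sign-in that can never complete.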

What you think should happen instead?

Airflow should submit the job to Dataflow using Application Default Credentials, the same way standalone Apache Beam Python (without Airflow) does.

I see that Apache Beam already solved that problem in apache/beam#15004, so running Apache Beam Python without Airflow using ADC works.

How to reproduce

Prepare env. variables:

unset GOOGLE_APPLICATION_CREDENTIALS
unset GCP_PROJECT
gcloud auth application-default login
gcloud config set project <project>
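To confirm the preparation step took effect in the process that actually runs the task, a small check like the following may help. It is only a sketch: the function name explicit_auth_overrides is hypothetical, and the variable list is limited to the two names unset above.

```python
import os
from typing import List, Mapping, Optional

# The two variables unset in the preparation step above.
EXPLICIT_AUTH_VARS = ("GOOGLE_APPLICATION_CREDENTIALS", "GCP_PROJECT")


def explicit_auth_overrides(env: Optional[Mapping[str, str]] = None) -> List[str]:
    """Return the names of explicit auth variables that are still present."""
    if env is None:
        env = os.environ
    return [name for name in EXPLICIT_AUTH_VARS if name in env]
```

An empty list means neither variable is set, so credential lookup should fall through to the ADC file.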

and execute the following DAG

# -*- coding: utf-8 -*-
import os
from datetime import datetime
from airflow.models import DAG

from airflow.providers.apache.beam.operators.beam import (
    BeamRunPythonPipelineOperator,
)
from airflow.operators.empty import EmptyOperator
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowConfiguration,
)

current_path = os.path.dirname(__file__)

with DAG(
    dag_id="dag-example-beam-dataflow-python-adc",
    default_args={"owner": "airflow"},
    start_date=datetime(2024, 1, 1),
    schedule_interval="@once",
    catchup=True,
) as dag:
    start_dag = EmptyOperator(task_id="start_dag")
    end_dag = EmptyOperator(task_id="end_dag")

    submit_beam_job = BeamRunPythonPipelineOperator(
        task_id="submit_beam_job_with_dataflow_using_adc",
        # Resolve the job file relative to this DAG file; current_path is defined above.
        py_file=os.path.join(current_path, "job_example_beam_dataflow_python_adc.py"),
        runner="DataflowRunner",
        pipeline_options={
            "temp_location": "<bucket>",
            "staging_location":  "<bucket>",
        },
        dataflow_config=DataflowConfiguration(
            job_name="submit_beam_job_with_dataflow_using_adc",
            project_id="<project_id>",
            location="<location>",
            wait_until_finished=True,
        ),
        do_xcom_push=True,
    )

    start_dag >> submit_beam_job >> end_dag

and Apache Beam job source code job_example_beam_dataflow_python_adc.py

# -*- coding: utf-8 -*-
import argparse
import logging
import apache_beam as beam
from apache_beam import Create
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


class PrintElementDoFn(beam.DoFn):
    def process(self, element, *args, **kwargs):
        print(f"Processing element {element}.")


def run(argv=None):
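    parser = argparse.ArgumentParser()
    # Assumption: the --sleep argument was dropped from this snippet; without it,
    # the print below raises AttributeError.
    parser.add_argument("--sleep", type=int, default=0)

    known_args, pipeline_args = parser.parse_known_args(argv)
    print(known_args.sleep)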

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    p | "create dummy events" >> Create([1]) | "print dummy elements" >> beam.ParDo(
        PrintElementDoFn()
    )

    p.run()


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.DEBUG)
    run()

Operating System

macOS 14.7

Versions of Apache Airflow Providers

apache-airflow-providers-apache-beam==5.3.0

Deployment

Docker Airflow / Composer Airflow (doesn't matter, problem occurs in latest version as well).

Deployment details

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@fpopic added the area:core, kind:bug, and needs-triage labels on Sep 21, 2024
@fpopic changed the title from "BeamRunPythonPipelineOperator doesn't work with Application Default Credentials" to "BeamRunPythonPipelineOperator doesn't work with Google Application Default Credentials ADC" on Sep 22, 2024
@Lee-W removed the needs-triage label on Oct 4, 2024
@Lee-W
Member

Lee-W commented Oct 4, 2024

Hi @fpopic, as you checked "Yes I am willing to submit a PR!", I'll assign this to you. But please let us know if you are no longer interested in it. Thanks!

@fpopic
Contributor Author

fpopic commented Oct 4, 2024

I removed the checkmark, would appreciate help.

@fpopic
Contributor Author

fpopic commented Oct 20, 2024

Adding a minimal change to the latest setup to reproduce.

In docker-compose.yml, pass the host ADC config with write permission (to refresh the host token from within the container):

x-airflow-common:
  &airflow-common
  #...
  build: .
  #image: ...
  environment:
    &airflow-common-env
    # ...
    AIRFLOW__LOGGING__LOGGING_LEVEL: 'WARNING'
    AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT: '{ "conn_type": "google_cloud_platform", "extra": { "scope": "https://www.googleapis.com/auth/cloud-platform,https://www.googleapis.com/auth/drive,https://www.googleapis.com/auth/bigquery",  "num_retries": 0 } }'
    GOOGLE_CLOUD_PROJECT: <my-project>

  volumes: &airflow-common-volumes
    # ...
    # Use ADC
    - ~/.config/gcloud/:/home/airflow/.config/gcloud:rw
...
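The AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT value above is a JSON-serialized connection. As a sketch, the same string can be generated in Python rather than hand-written, which avoids quoting mistakes. The function name google_cloud_default_conn_json is hypothetical; the field names mirror the value shown above.

```python
import json
from typing import Sequence


def google_cloud_default_conn_json(scopes: Sequence[str], num_retries: int = 0) -> str:
    """Build a JSON connection string in the same shape as the value above."""
    extra = {
        # The connection expects scopes as one comma-separated string.
        "scope": ",".join(scopes),
        "num_retries": num_retries,
    }
    return json.dumps({"conn_type": "google_cloud_platform", "extra": extra})


if __name__ == "__main__":
    print(
        google_cloud_default_conn_json(
            [
                "https://www.googleapis.com/auth/cloud-platform",
                "https://www.googleapis.com/auth/drive",
                "https://www.googleapis.com/auth/bigquery",
            ]
        )
    )
```

Note that no key path or keyfile dict is set in "extra", so the Google provider falls back to google.auth.default(), as the credentials_provider.py log line in the original report indicates.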

In the Dockerfile, install the Google Cloud SDK (gcloud command), since the Google base hook requires the gcloud CLI:
https://github.com/apache/airflow/blob/2.10.2/airflow/providers/google/common/hooks/base_google.py#L638-L669

# specifying explicit python 3.9 otherwise gcloud sdk installation complains
FROM apache/airflow:2.10.2-python3.9
USER root

## COPY FROM https://github.com/apache/airflow/blob/main/docs/docker-stack/docker-images-recipes/gcloud.Dockerfile
ARG CLOUD_SDK_VERSION=322.0.0
ENV GCLOUD_HOME=/home/airflow/google-cloud-sdk
ENV PATH="${GCLOUD_HOME}/bin/:${PATH}"

USER airflow

RUN DOWNLOAD_URL="https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${CLOUD_SDK_VERSION}-linux-x86_64.tar.gz" \
    && TMP_DIR="$(mktemp -d)" \
    && curl -fL "${DOWNLOAD_URL}" --output "${TMP_DIR}/google-cloud-sdk.tar.gz" \
    && mkdir -p "${GCLOUD_HOME}" \
    && tar xzf "${TMP_DIR}/google-cloud-sdk.tar.gz" -C "${GCLOUD_HOME}" --strip-components=1 \
    && "${GCLOUD_HOME}/install.sh" \
       --bash-completion=false \
       --path-update=false \
       --usage-reporting=false \
       --additional-components alpha beta kubectl \
       --quiet \
    && rm -rf "${TMP_DIR}" \
    && rm -rf "${GCLOUD_HOME}/.install/.backup/" \
    && gcloud --version
# END OF COPY

# pip install airflow has to be run with USER airflow
RUN pip install 'apache-airflow==2.10.2' \
    apache-airflow[google_auth] \
    apache-airflow-providers-google \
    apache-airflow[google] \
    apache-airflow-providers-apache-beam
    #apache-airflow-providers-hashicorp \

RUN mkdir -p /home/airflow/.config/gcloud/
