apache/airflow

`DataprocCreateBatchOperator` and `BigQueryInsertJobOperator` imports result in `DagBag timeout error`

Open

#62373 opened on Feb 23, 2026

Labels: area:providers, good first issue, kind:bug, provider:google

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

No response

Apache Airflow version

3.1.5

Operating System

Linux

Deployment

Astronomer

Deployment details

No response

What happened

Running the DAG below (shown in the "How to reproduce" section) threw a DagBag import timeout error on Astronomer A5 workers (1 vCPU, 2 GiB). Increasing the worker size resolved the error. However, imports for these two operators were still taking well over 30 seconds.

What you think should happen instead

Importing these operators should not cause a DagBag timeout error, even on small workers.

How to reproduce

Run this DAG and check the logging statements in the task logs.

import logging
import time

start_time = time.perf_counter()

import pendulum

from airflow.decorators import dag
from airflow.decorators import task


logger = logging.getLogger(__name__)

logger.info(f"DAG_PERF_CHECK 1: {time.perf_counter()-start_time}")

# executed on Astro  Worker Type = A5.  0-10 Workers, Concurrency: 5
# -------------------------------------------
# when following 2 imports are commented out:
# INFO - DAG_PERF_CHECK 1: 0.00933
# INFO - DAG_PERF_CHECK 2: 0.00963
# INFO - DAG_PERF_CHECK 3: 0.01018
# -------------------------------------------
# same dag with following 2 imports included:
# some tasks fail with AirflowTaskTimeout: DagBag import timeout for [.../test_dag.py] after 30.0s
# and the ones that succeed have values like those below:
# INFO - DAG_PERF_CHECK 1:  0.08950
# INFO - DAG_PERF_CHECK 2: 26.09278
# INFO - DAG_PERF_CHECK 3: 26.09359
# -------------------------------------------
# same as above, but  Worker Type = A10.  0-10 Workers, Concurrency: 5
# all tasks succeed.
# INFO - DAG_PERF_CHECK 1: 0.00706
# INFO - DAG_PERF_CHECK 2: 15.74727
# INFO - DAG_PERF_CHECK 3: 15.74815

from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

logger.info(f"DAG_PERF_CHECK 2: {time.perf_counter()-start_time}")

from airflow.operators.empty import EmptyOperator

logger.info(f"DAG_PERF_CHECK 3: {time.perf_counter()-start_time}")


@dag(
    schedule="*/10 * * * *",
    start_date=pendulum.datetime(2026, 2, 1),
    catchup=False,
    max_active_tasks=16,
)
def dag_performance_check_gcp():

    @task
    def dummy_task(num: int) -> int:
        return num


    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    # dummy_task requires `num`; pass a placeholder value so the DAG parses
    start >> dummy_task(1) >> end



dag_performance_check_gcp()
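
The checkpoint timing in the DAG above can also be reproduced outside a DAG run, which may help narrow down which of the two imports is slow. The following is a minimal sketch, assuming the Google provider is installed in the environment where it runs. `time_import`, `report`, and `MODULES` are hypothetical names introduced here for illustration; they are not part of Airflow.

```python
import importlib
import time

# Module paths taken from the imports in the DAG above.
MODULES = (
    "airflow.providers.google.cloud.operators.dataproc",
    "airflow.providers.google.cloud.operators.bigquery",
)


def time_import(module_name: str) -> float:
    """Return the seconds spent importing module_name in this process.

    A module that is already in sys.modules returns almost immediately,
    so use a fresh interpreter to measure a cold import.
    """
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


def report(module_names) -> None:
    """Print one timing line per module, in the order given."""
    for name in module_names:
        print(f"{name}: {time_import(name):.3f}s")
```

Calling `report(MODULES)` in a fresh Python process on the worker image prints one line per module. Because the second module may reuse packages already loaded by the first, timing each module in its own process gives a cleaner comparison. For a per-module breakdown, the interpreter's built-in `python -X importtime` flag is another option.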

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
