apache/airflow
`DataprocCreateBatchOperator` and `BigQueryInsertJobOperator` imports result in `DagBag timeout error`
Open
#62373 opened on Feb 23, 2026
area:providers, good first issue, kind:bug, provider:google
Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
No response
Apache Airflow version
3.1.5
Operating System
Linux
Deployment
Astronomer
Deployment details
No response
What happened
Running the DAG below (shown in the "How to reproduce" section) threw a DagBag timeout error on Astronomer A5 (1 vCPU and 2 GiB) workers. Increasing the worker size resolved the timeout. However, importing these two operators was still slow: about 16 seconds on the larger worker, per the timings recorded in the reproduction below.
What you think should happen instead
Importing these operators should not cause a DagBag timeout error, even on small workers.
How to reproduce
Run this DAG and check the logging statements in the task logs.
import logging
import time
start_time = time.perf_counter()
import pendulum
from airflow.decorators import dag
from airflow.decorators import task
logger = logging.getLogger(__name__)
logger.info(f"DAG_PERF_CHECK 1: {time.perf_counter()-start_time}")
# executed on Astro Worker Type = A5. 0-10 Workers, Concurrency: 5
# -------------------------------------------
# when following 2 imports are commented out:
# INFO - DAG_PERF_CHECK 1: 0.00933
# INFO - DAG_PERF_CHECK 2: 0.00963
# INFO - DAG_PERF_CHECK 3: 0.01018
# -------------------------------------------
# same dag with following 2 imports included:
# some tasks fail with AirflowTaskTimeout: DagBag import timeout for [.../test_dag.py] after 30.0s
# and the ones that succeed have values like those below:
# INFO - DAG_PERF_CHECK 1: 0.08950
# INFO - DAG_PERF_CHECK 2: 26.09278
# INFO - DAG_PERF_CHECK 3: 26.09359
# -------------------------------------------
# same as above, but Worker Type = A10. 0-10 Workers, Concurrency: 5
# all tasks succeed.
# INFO - DAG_PERF_CHECK 1: 0.00706
# INFO - DAG_PERF_CHECK 2: 15.74727
# INFO - DAG_PERF_CHECK 3: 15.74815
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
logger.info(f"DAG_PERF_CHECK 2: {time.perf_counter()-start_time}")
from airflow.operators.empty import EmptyOperator
logger.info(f"DAG_PERF_CHECK 3: {time.perf_counter()-start_time}")
@dag(
    schedule="*/10 * * * *",
    start_date=pendulum.datetime(2026, 2, 1),
    catchup=False,
    max_active_tasks=16,
)
def dag_performance_check_gcp():
    @task
    def dummy_task(num: int) -> int:
        return num

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
    # pass a placeholder value for the required `num` argument
    start >> dummy_task(1) >> end


dag_performance_check_gcp()
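The checkpoints in the DAG file report cumulative time, so the two Google imports are measured together. To see how much each import contributes on its own, outside of DAG parsing, a small helper like the one below may be useful. This is a minimal sketch, not part of the original report: `time_imports` is a hypothetical name, and the standard-library modules in the example are stand-ins so the snippet runs without Airflow installed.

```python
import importlib
import time


def time_imports(module_names):
    """Import each module in order and return {module name: seconds taken}.

    A module that is already in sys.modules returns almost instantly, so run
    this in a fresh interpreter to measure cold-import cost. Modules that
    share dependencies also affect each other: whichever is imported first
    pays for the shared part.
    """
    timings = {}
    for name in module_names:
        start = time.perf_counter()
        importlib.import_module(name)
        timings[name] = time.perf_counter() - start
    return timings


if __name__ == "__main__":
    # Stand-in modules (assumption for illustration). To investigate this
    # issue, replace them with the modules imported in the DAG file:
    #   "airflow.providers.google.cloud.operators.dataproc"
    #   "airflow.providers.google.cloud.operators.bigquery"
    for name, seconds in time_imports(["json", "decimal"]).items():
        print(f"{name}: {seconds:.5f}s")
```

For a breakdown by transitive dependency, CPython's `-X importtime` flag prints the import time of every module loaded, which may help identify which dependency dominates.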
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct