apache/airflow

BranchingOperator fails with NoCredentialsError during custom XCom backend deserialization

Open

#50491, created on May 12, 2025

Labels: area:providers, good first issue, kind:bug, provider:standard


Description

Apache Airflow version

2.10.5

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I'm encountering an issue when deserializing XComs with a custom XCom backend that loads data from S3 using `S3Hook(aws_conn_id='aws')`, where `aws` is the connection ID that holds the AWS credentials. The same code works on Airflow 2.10.4 but fails on 2.10.5 and later.

Although a valid AWS connection is configured in Airflow (it is visible, and it works in all other parts of the DAG), I get the following error during deserialization:

```
botocore.exceptions.NoCredentialsError: Unable to locate credentials
```

The error is raised by these calls:

```python
hook = S3Hook(aws_conn_id='aws')
head_object = hook.head_object(key=key, bucket_name='bucket')
```

The error does not appear in the Airflow UI; I found it in the scheduler container's logs. The branching task is marked as success, but the downstream branched task stays queued indefinitely. I assume the error comes from that downstream task, which pulls an XCom.
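Because the traceback appears only in the scheduler logs, it may help to check which AWS credential sources the scheduler process itself can see. The sketch below is a hypothetical helper (`visible_aws_credential_sources` is not an Airflow or botocore API) that checks only two sources from botocore's default chain: static keys in environment variables, and the shared credentials file (honoring `AWS_SHARED_CREDENTIALS_FILE`). Container, instance-metadata, SSO, and other providers are not covered, so a `False` result is a hint, not proof.

```python
import os
from pathlib import Path


def visible_aws_credential_sources(env=None, home=None):
    """Report which of two common AWS credential sources this process can see.

    Hypothetical diagnostic: only environment-variable keys and the shared
    credentials file are checked; other botocore providers are ignored.
    """
    env = os.environ if env is None else env
    home = Path.home() if home is None else Path(home)
    sources = {}
    # Static keys exported as environment variables.
    sources["env_keys"] = bool(
        env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY")
    )
    # Shared credentials file; AWS_SHARED_CREDENTIALS_FILE overrides the default path.
    cred_file = env.get("AWS_SHARED_CREDENTIALS_FILE") or str(home / ".aws" / "credentials")
    sources["shared_credentials_file"] = Path(cred_file).expanduser().is_file()
    return sources


if __name__ == "__main__":
    # Run inside the scheduler container and inside a worker, then compare.
    print(visible_aws_credential_sources())
```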

Switching to manual session initialization with `boto3.Session(...)` still produced the same error. It does work, however, when I set the AWS credentials directly in the code instead of resolving them from the `aws` connection ID. This suggests that, specifically with the BranchingOperator, the context of the following task no longer has access to the `aws` connection.
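The behavior described above is consistent with an unconfirmed hypothesis: in the process that deserializes the XCom, the lookup of the `aws` connection fails, the hook falls back to a default credential chain, and that chain finds nothing. Credentials passed explicitly bypass the lookup, which would explain why the workaround succeeds. The toy model below uses plain dictionaries only; `resolve_credentials` and `NoCredentials` are hypothetical stand-ins, not Airflow, Amazon provider, or botocore code, and the fallback is simplified to environment variables.

```python
from typing import Mapping, Optional


class NoCredentials(Exception):
    """Hypothetical stand-in for botocore's NoCredentialsError."""


def resolve_credentials(
    conn_id: Optional[str],
    connections: Mapping[str, dict],
    env: Mapping[str, str],
    explicit: Optional[dict] = None,
) -> dict:
    """Toy model of credential resolution (hypothetical, not Airflow's code)."""
    # Credentials given directly in code win and skip any connection lookup.
    if explicit:
        return explicit
    # Otherwise use the Airflow connection, if this process can see it.
    if conn_id is not None and conn_id in connections:
        return connections[conn_id]
    # Fallback: a default chain, modelled here as environment variables only.
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return {"key": env["AWS_ACCESS_KEY_ID"], "secret": env["AWS_SECRET_ACCESS_KEY"]}
    raise NoCredentials("Unable to locate credentials")


# Process where the connection is visible: resolution succeeds.
visible = resolve_credentials("aws", {"aws": {"key": "k", "secret": "s"}}, env={})

# Process where the connection is not visible and there is no fallback: raises.
try:
    resolve_credentials("aws", {}, env={})
except NoCredentials as exc:
    print(exc)

# Explicit credentials succeed regardless of connection visibility.
explicit = resolve_credentials("aws", {}, env={}, explicit={"key": "k", "secret": "s"})
```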

What you think should happen instead?

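The `aws` connection should be resolved during XCom deserialization, as it was on 2.10.4. Instead, the scheduler no longer resolves it correctly, even though the connection is defined in Airflow. Perhaps the context in which deserialization runs has changed?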
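One way to narrow this down is to check whether the `aws` connection is visible to the scheduler process at all, and not only to the workers. When a connection is supplied through an environment variable, Airflow reads it from `AIRFLOW_CONN_<CONN_ID>` with the ID upper-cased. Connections stored in the metadata database or in a secrets backend will not show up in this check. The helper names below are illustrative, not part of Airflow.

```python
import os
from typing import Mapping, Optional


def airflow_conn_env_var(conn_id: str) -> str:
    """Environment variable name Airflow checks for a given connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def conn_defined_in_env(conn_id: str, env: Optional[Mapping[str, str]] = None) -> bool:
    """True if the connection is provided via an environment variable in this process."""
    env = os.environ if env is None else env
    return bool(env.get(airflow_conn_env_var(conn_id)))


if __name__ == "__main__":
    # Run in the scheduler container and in a worker to compare the results.
    print(airflow_conn_env_var("aws"), conn_defined_in_env("aws"))
```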

How to reproduce

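```python
from typing import Any

from airflow.models.xcom import BaseXCom
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class CustomXComBackendPandas(BaseXCom):
    XCOM_PREFIX = "xcom_s3://"
    S3_PATH_PREFIX = "tmp"
    _BUCKET_NAME = ""
    DATAFRAME_EXTENSION = ".parquet"

    @staticmethod
    def deserialize_value(result: BaseXCom) -> Any:
        # Resolving the 'aws' connection here raises NoCredentialsError on 2.10.5+
        hook = S3Hook(aws_conn_id='aws')
        head_object = hook.head_object(key='key', bucket_name='bucket')
        # ... rest of the deserialization omitted in the original report
```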
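Backends of this kind usually store the payload in S3 and keep only a reference string in the metadata database; the `XCOM_PREFIX`, `S3_PATH_PREFIX`, and `DATAFRAME_EXTENSION` attributes above suggest a reference such as `xcom_s3://tmp/<name>.parquet`. The sketch below covers only that string handling, using the standard library, so the reference logic can be tested without S3 or credentials. The key layout and the names `make_reference` and `parse_reference` are assumptions, not taken from the report.

```python
from typing import Optional

XCOM_PREFIX = "xcom_s3://"
S3_PATH_PREFIX = "tmp"
DATAFRAME_EXTENSION = ".parquet"


def make_reference(dag_id: str, run_id: str, task_id: str) -> str:
    """Build the reference string stored in the metadata DB (hypothetical layout)."""
    key = f"{S3_PATH_PREFIX}/{dag_id}/{run_id}/{task_id}{DATAFRAME_EXTENSION}"
    return XCOM_PREFIX + key


def parse_reference(value: object) -> Optional[str]:
    """Return the S3 key if `value` is a backend reference, else None."""
    if isinstance(value, str) and value.startswith(XCOM_PREFIX):
        return value[len(XCOM_PREFIX):]
    return None


ref = make_reference("my_dag", "manual__2025-05-12", "branch")
print(ref)  # xcom_s3://tmp/my_dag/manual__2025-05-12/branch.parquet
print(parse_reference(ref))  # tmp/my_dag/manual__2025-05-12/branch.parquet
```

Keeping the parsing separate from the S3 calls makes it easier to tell whether a failure comes from reference handling or from credential resolution in `S3Hook`.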

Operating System

macOS 15.4.1 (Sequoia)

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
