apache/airflow

BranchingOperator fails when using custom backend xcom deserialization with NoCredentialsError

Open

#50,491 opened on May 12, 2025

View on GitHub
 (4 comments) (1 reaction) (0 assignees)Python (44,809 stars) (16,781 forks)batch import
area:providersgood first issuekind:bugprovider:standard

Description

Apache Airflow version

2.10.5

If "Other Airflow 2 version" selected, which one?

No response

What happened?

I'm encountering an issue when deserializing XComs using a custom XCom backend that loads data from S3 using S3Hook(aws_conn_id='aws'). aws is the connection Id defined with the aws credentials. The same code is working for Airflow 2.10.4 but not anymore from 2.10.5 and further.

Despite having a valid AWS connection configured in Airflow (visible and working well in ALL other parts of the DAG), I get the following error during deserialization steps:

```botocore.exceptions.NoCredentialsError: Unable to locate credentials````

With the function:

hook = S3Hook(aws_conn_id='aws')
head_object = hook.head_object(key=key, bucket_name='bucket')

This error is not visible in the Airflow UI but I found it in the scheduler container's logs. The branching task is set as success but the following branched task is queued 'forever'. I assume this error is coming from the following task which is pulling a xcom.

After switching to manual session initialization using boto3.Session(...), I still have the same error... BUT it is working and I am settings the aws credentials directly inside the code, without trying to get the aws Connection Id. This is telling me that in the specific case of BranchingOperator, something is happening and the follwing task' context doesn't have the connection Id anymore.

What you think should happen instead?

The 'aws' connection is not resolved correctly anymore by the scheduler, even though it is defined in Airflow. The context might have changed?

How to reproduce

class CustomXComBackendPandas(BaseXCom):
    XCOM_PREFIX = "xcom_s3://"
    S3_PATH_PREFIX = "tmp"
    _BUCKET_NAME = ""
    DATAFRAME_EXTENSION = ".parquet"

@staticmethod
    def deserialize_value(result: BaseXCom) -> Any:
            hook = S3Hook(aws_conn_id='aws')
            head_object = hook.head_object(key='key', bucket_name='bucket')

Operating System

macOS 15.4.1 (Sonoma)

Versions of Apache Airflow Providers

No response

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Contributor guide