apache/airflow

KubernetesJobOperator in deferrable mode has race-condition problem

Open: #47780, opened on Mar 14, 2025

Labels: area:providers, good first issue, kind:bug, pending-response, provider:cncf-kubernetes, stale

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

See Cloud Composer 2 dependencies for composer-2.11.3-airflow-2.10.2

cncf-kubernetes provider version: apache-airflow-providers-cncf-kubernetes==10.1.0

Apache Airflow version

2.10.2

Operating System

Linux

Deployment

Google Cloud Composer

Deployment details

We access a Google Kubernetes Engine (GKE) cluster from Cloud Composer; the two are in different VPC networks.

What happened

Starting KubernetesJobOperator in deferrable mode often causes a race condition in execute when it calls self.get_or_create_pod:

def execute(self, context: Context):
    ...

    if self.pod is None:
        self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
            pod_request_obj=self.pod_request_obj,
            context=context,
        )

This method is implemented in KubernetesPodOperator, from which KubernetesJobOperator inherits it:

def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
    if self.reattach_on_restart:
        pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)
        if pod:
            return pod
    self.log.debug("Starting pod:\n%s", yaml.safe_dump(pod_request_obj.to_dict()))
    self.pod_manager.create_pod(pod=pod_request_obj)
    return pod_request_obj

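The find_pod call finds no matching Pod (the Pod list it queries is empty), because at the time of the call the Job has not yet created its Pod. get_or_create_pod then falls through to self.pod_manager.create_pod, which creates a second Pod directly from pod_request_obj. This second Pod does not have the correct template spec.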
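The check-then-create pattern described above can be reproduced without a cluster. The sketch below is a hypothetical simulation: FakeCluster, list_task_pods, and the fixed delay are illustrative stand-ins, not Airflow or Kubernetes APIs, and it assumes the Job's Pod carries the same labels the operator searches for.

```python
import time


class FakeCluster:
    """Hypothetical in-memory stand-in for the Kubernetes API; not a real client."""

    def __init__(self, job_pod_delay: float) -> None:
        # Assumption: the Job controller creates its Pod asynchronously,
        # `job_pod_delay` seconds after the Job object is submitted.
        self._job_pod_ready_at = time.monotonic() + job_pod_delay
        self.operator_pods: list[str] = []

    def list_task_pods(self) -> list[str]:
        # Pods carrying the task's labels: the Job's Pod (once it exists)
        # plus any Pod the operator created directly.
        pods = list(self.operator_pods)
        if time.monotonic() >= self._job_pod_ready_at:
            pods.insert(0, "job-pod")
        return pods

    def create_pod(self, name: str) -> str:
        self.operator_pods.append(name)
        return name


def get_or_create_pod(cluster: FakeCluster) -> str:
    # Same shape as KubernetesPodOperator.get_or_create_pod: reuse a
    # matching Pod if one is found, otherwise create a new one.
    pods = cluster.list_task_pods()
    if pods:
        return pods[0]
    return cluster.create_pod("operator-pod")


cluster = FakeCluster(job_pod_delay=0.2)
pod = get_or_create_pod(cluster)  # runs before the Job controller has acted
time.sleep(0.3)                   # the Job controller eventually creates its Pod
print(pod, cluster.list_task_pods())
```

Because the lookup runs before the simulated Job controller has acted, the function creates its own Pod, and once the controller catches up two Pods with the task's labels exist.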

What you think should happen instead

In my opinion, there should be an additional wait to give the Job time to create its Pod before the operator looks it up. This can easily be solved by overriding the get_or_create_pod method in KubernetesJobOperator:

import time

def get_or_create_pod(self, pod_request_obj: k8s.V1Pod, context: Context) -> k8s.V1Pod:
    # Give the Job controller time to create its Pod before looking it up.
    time.sleep(self.startup_timeout_seconds)
    return super().get_or_create_pod(pod_request_obj, context)
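Sleeping for the full startup_timeout_seconds delays every task by that amount, even when the Job's Pod appears almost immediately. Polling with a deadline is one possible alternative. The sketch below uses a hypothetical wait_for_pod helper (not part of the cncf-kubernetes provider) that accepts any zero-argument lookup callable and returns as soon as it yields a Pod.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for_pod(
    find: Callable[[], Optional[T]],
    timeout: float,
    interval: float = 1.0,
) -> Optional[T]:
    """Poll `find` until it returns a Pod or `timeout` seconds elapse.

    Hypothetical helper for illustration. Returns the Pod as soon as it is
    found, or None if the deadline passes first.
    """
    deadline = time.monotonic() + timeout
    while True:
        pod = find()
        if pod is not None:
            return pod
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return None
        # Never sleep past the deadline.
        time.sleep(min(interval, remaining))


calls = {"n": 0}


def fake_find() -> Optional[str]:
    # Simulates the Job's Pod becoming visible on the third lookup.
    calls["n"] += 1
    return "job-pod" if calls["n"] >= 3 else None


print(wait_for_pod(fake_find, timeout=5, interval=0.01))  # job-pod
```

In an operator subclass, the lookup could wrap self.find_pod(pod_request_obj.metadata.namespace, context=context); this wiring is untested against the provider. If the deadline passes without a Pod, raising an error is probably safer than falling back to create_pod, which would reintroduce the duplicate Pod.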

How to reproduce

Create a KubernetesJobOperator in deferrable mode (deferrable=True) and run it.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
