apache/airflow

KPO pods can hang if sidecar never exits

Open

#58968 opened on Dec 2, 2025

Labels: area:providers, good first issue, kind:bug, provider:cncf-kubernetes

Description

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

all versions

Apache Airflow version

all versions

Operating System

linux

Deployment

Astronomer

Deployment details

This can happen on any deployment type or executor, because it is an issue with KPO itself.

What happened

tl;dr: KPO pods sometimes linger long after their corresponding task instance (TI) has reached a terminal state.

Sometimes the XCom sidecar never exits, leaving zombie KPO pods around indefinitely. This can also happen if the user defines a pod with a sidecar that never exits and ignores signals.

A KubernetesPodOperator (KPO) pod is considered a zombie if any of the following apply:

  1. It doesn't match any active (non-terminal) task instance in the database
  2. The pod's try_number is less than the matching active task instance's current try_number (old retry pod)
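
The two rules can be written as a small predicate. This is only a sketch: `TaskKey` and `is_zombie` are hypothetical names that do not exist in Airflow, and the sketch assumes the caller has already collected the current try_number of every active task instance.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskKey:
    """Identifies one task instance (hypothetical helper, not an Airflow class)."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int = -1  # assumption: -1 stands for an unmapped task


def is_zombie(pod_key: TaskKey, pod_try_number: int, active_tries: dict) -> bool:
    """Apply the two rules above.

    active_tries maps the TaskKey of each active (non-terminal) task
    instance to that task instance's current try_number.
    """
    current_try = active_tries.get(pod_key)
    if current_try is None:
        # Rule 1: no active task instance matches this pod.
        return True
    # Rule 2: the pod was created by an earlier try of the same task instance.
    return pod_try_number < current_try


key = TaskKey("kpo_zombie_dag", "zombie_kpo_task", "manual__1")
active = {key: 2}
print(is_zombie(key, 1, active))  # True: pod from try 1, task is on try 2
print(is_zombie(key, 2, active))  # False: pod belongs to the current try
print(is_zombie(TaskKey("kpo_zombie_dag", "gone", "manual__1"), 1, active))  # True
```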

What you think should happen instead

There should be clean-up logic, perhaps in the scheduler rather than in the operator's own clean-up, that force-kills zombie KPO pods.

This would be similar to the logic that cleans up Kubernetes Executor zombies, which is gated by the configuration options AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS and AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE. A possible approach:

  1. List all KPO pods: Uses Kubernetes API with label selector kubernetes_pod_operator=True
  2. Extract TI details from pods: For each pod, extracts normalized task instance details from labels (dag_id, task_id, run_id, map_index, try_number). Pod labels are already normalized by Airflow using make_safe_label_value()
  3. Query active task instances: Queries all active (non-terminal) task instances from the database in a single batch query, filtering for KubernetesPodOperator-related operators (KubernetesPodOperator, KubernetesPodOperatorAsync, SparkKubernetesPodOperator, etc., but excluding "external" EksPodOperator and GKEStartPodOperator)
  4. Normalize DB values to pod label format: Normalizes database values (dag_id, task_id, run_id, map_index) using make_safe_label_value() to match the format used in pod labels
  5. Exact matching: Compares normalized pod labels with normalized database values using exact matching
  6. Identify zombies: Pods that don't match any active task instance are considered zombies. Pods matching active TIs are checked for old retry scenarios
  7. Terminate zombies: Force deletes all identified zombie pods
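
A minimal sketch of how the steps above might fit together, kept free of Airflow and Kubernetes imports so it runs on its own. Everything here is an assumption for illustration: `normalize` is a simplified stand-in for make_safe_label_value() (not Airflow's implementation), and `list_pods`, `list_active_tis`, and `delete_pod` are hypothetical callables that a real implementation would back with the Kubernetes API and the metadata database. The sketch also assumes that unmapped pods carry no map_index label, and it deliberately skips pods with missing identifying labels rather than deleting them.

```python
import re

LABEL_SELECTOR = "kubernetes_pod_operator=True"  # selector quoted in step 1
KEY_FIELDS = ("dag_id", "task_id", "run_id", "map_index")


def normalize(value) -> str:
    """Simplified stand-in for make_safe_label_value(); NOT Airflow's code."""
    return re.sub(r"[^A-Za-z0-9_.-]", "", str(value))[:63]


def find_zombies(pods, active_tis):
    """Steps 2-6: match pod labels against normalized active task instances.

    pods: iterable of dicts with "name", "namespace" and "labels".
    active_tis: iterable of dicts holding raw DB values for KEY_FIELDS
    plus "try_number".
    """
    # Step 4: normalize DB values so they are comparable to pod labels.
    active = {
        tuple(normalize(ti[field]) for field in KEY_FIELDS): int(ti["try_number"])
        for ti in active_tis
    }
    zombies = []
    for pod in pods:
        labels = pod["labels"]
        if not all(f in labels for f in ("dag_id", "task_id", "run_id")):
            continue  # not enough information to decide; leave the pod alone
        # Assumption: no map_index label means the task is unmapped (-1).
        key = (labels["dag_id"], labels["task_id"], labels["run_id"],
               labels.get("map_index", "-1"))
        current_try = active.get(key)  # step 5: exact match
        pod_try = labels.get("try_number")
        if current_try is None:
            zombies.append(pod)  # no active task instance matches
        elif pod_try is not None and int(pod_try) < current_try:
            zombies.append(pod)  # pod left over from an old retry
    return zombies


def cleanup(list_pods, list_active_tis, delete_pod):
    """Steps 1 and 7: list KPO pods, then delete every identified zombie."""
    zombies = find_zombies(list_pods(LABEL_SELECTOR), list_active_tis())
    for pod in zombies:
        delete_pod(pod["name"], pod["namespace"])
    return [pod["name"] for pod in zombies]
```

With the official Kubernetes Python client, `list_pods` could wrap `CoreV1Api.list_pod_for_all_namespaces(label_selector=...)` and `delete_pod` could wrap `CoreV1Api.delete_namespaced_pod(name, namespace, grace_period_seconds=0)`. Injecting them as callables keeps the matching logic testable without a cluster.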

How to reproduce

I used the following DAG to test this scenario.

You can manually kill the TIs of the zombie pods.

from airflow import __version__ as airflow_version
from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from pendulum import datetime


def sleep_container(name: str, duration: int | str) -> k8s.V1Container:
    return k8s.V1Container(
        name=name,
        image="busybox:latest",
        command=["/bin/sh", "-c"],
        args=[f"sleep {duration}"],
    )


def healthy_pod_spec() -> k8s.V1PodSpec:
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "20"),
        ],
        termination_grace_period_seconds=86400,
        restart_policy="Never",
    )


def zombie_pod_spec() -> k8s.V1PodSpec:
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "5"),
            sleep_container("sidecar", "infinity"),
        ],
    )


@dag(
    dag_id="kpo_zombie_dag",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["test", "kpo", "zombie"],
)
def kpo_zombie_dag():
    zombie_task = KubernetesPodOperator(
        task_id="zombie_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=zombie_pod_spec(),
        ),
    )
    healthy_task = KubernetesPodOperator(
        task_id="healthy_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=healthy_pod_spec(),
        ),
    )

    tasks = [zombie_task, healthy_task]

    # Create 2 mapped KPO task instances, one zombie and one healthy (Airflow >= 2.3.0 only).
    # Mapped tasks were introduced in Airflow 2.3.0.
    try:
        # Parse version string (e.g., "2.3.0" or "2.3.0+astro.1")
        version_parts = airflow_version.split("+")[0].split(".")
        major = int(version_parts[0])
        minor = int(version_parts[1]) if len(version_parts) > 1 else 0

        supports_mapped_tasks = major > 2 or (major == 2 and minor >= 3)

        if supports_mapped_tasks:
            # Use expand_kwargs to create 2 mapped task instances (map_index 0, 1)
            # one zombie and one healthy
            # Each dict in the list creates one mapped instance
            mapped_healthy_task = KubernetesPodOperator.partial(
                task_id="mapped_kpo_task",
            ).expand_kwargs(
                [
                    {
                        "full_pod_spec": k8s.V1Pod(
                            metadata=k8s.V1ObjectMeta(),
                            spec=zombie_pod_spec(),
                        ),
                    },
                    {
                        "full_pod_spec": k8s.V1Pod(
                            metadata=k8s.V1ObjectMeta(),
                            spec=healthy_pod_spec(),
                        ),
                    },
                ]
            )
            tasks.append(mapped_healthy_task)
    except (ValueError, IndexError):
        # If version parsing fails, skip mapped tasks to be safe
        pass

    return tasks


kpo_dag = kpo_zombie_dag()

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
