Description
Apache Airflow Provider(s)
cncf-kubernetes
Versions of Apache Airflow Providers
all versions
Apache Airflow version
all versions
Operating System
Linux
Deployment
Astronomer
Deployment details
This can happen on any deployment type or executor, as it's an issue with KPO itself.
What happened
tl;dr: KPO pods sometimes hang around long after their corresponding TI has reached a terminal state.
Sometimes the XCom sidecar never exits, leaving zombie KPO pods around indefinitely. This can also happen if the user defines a pod with a sidecar that never exits and ignores signals.
A KubernetesPodOperator (KPO) pod is considered a zombie if any of the following apply:
- It doesn't match any active (non-terminal) task instance in the database
- The pod's try_number is less than the matching active task instance's current try_number (old retry pod)
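The two zombie criteria above can be expressed as a small predicate. This is a minimal sketch, assuming the active task instances have already been loaded into a dict keyed by a hypothetical TIKey (dag_id, task_id, run_id, map_index) whose value is the TI's current try_number, and that unmapped tasks use map_index -1 as Airflow does.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class TIKey:
    """Hypothetical helper type for this sketch (not an Airflow class)."""

    dag_id: str
    task_id: str
    run_id: str
    map_index: int  # -1 for unmapped tasks


def is_zombie_pod(pod_key: TIKey, pod_try_number: int, active_tries: dict[TIKey, int]) -> bool:
    """Return True if a KPO pod meets either zombie criterion.

    active_tries maps each active (non-terminal) TI to its current try_number.
    """
    current_try = active_tries.get(pod_key)
    if current_try is None:
        # Criterion 1: no active task instance matches this pod.
        return True
    # Criterion 2: the pod belongs to an older retry of an active TI.
    return pod_try_number < current_try
```

A pod whose key matches an active TI and whose try_number equals the TI's current try_number is the only case treated as healthy.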
What you think should happen instead
There should be some clean-up logic, perhaps in the scheduler rather than relying on the operator to clean up after itself, that force-kills zombie KPO pods.
This would be similar to the logic that cleans up Kubernetes Executor zombies, which is gated by the following configuration options: AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS and AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE.
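For reference, my understanding of how those two options gate pod deletion in the Kubernetes Executor is sketched below, so a KPO clean-up could plausibly reuse the same two-flag gate. should_delete_worker_pod and env_flag are hypothetical names, not Airflow functions; the boolean parsing is simplified, and the assumed defaults (delete_worker_pods=True, delete_worker_pods_on_failure=False) should be checked against the configuration reference for your Airflow version.

```python
import os


def env_flag(name: str, default: bool) -> bool:
    """Read a boolean flag from the environment (simplified parsing for this sketch)."""
    value = os.environ.get(name)
    if value is None:
        return default
    return value.strip().lower() in {"true", "t", "1"}


def should_delete_worker_pod(
    ti_failed: bool, delete_worker_pods: bool, delete_worker_pods_on_failure: bool
) -> bool:
    """Approximate the Kubernetes Executor's pod-deletion gate (sketch, not Airflow code)."""
    if not delete_worker_pods:
        # Deletion disabled entirely: keep every pod.
        return False
    # Deletion enabled: pods of successful TIs are deleted; pods of failed TIs
    # are deleted only when delete_worker_pods_on_failure is also set.
    return not ti_failed or delete_worker_pods_on_failure


delete_pods = env_flag("AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS", True)
delete_on_failure = env_flag("AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE", False)
```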
Proposed clean-up logic:
- List all KPO pods: use the Kubernetes API with the label selector kubernetes_pod_operator=True.
- Extract TI details from pods: for each pod, extract the normalized task instance details from its labels (dag_id, task_id, run_id, map_index, try_number). Pod labels are already normalized by Airflow using make_safe_label_value().
- Query active task instances: query all active (non-terminal) task instances from the database in a single batch query, filtering for KubernetesPodOperator-related operators (KubernetesPodOperator, KubernetesPodOperatorAsync, SparkKubernetesPodOperator, etc.) but excluding the "external" EksPodOperator and GKEStartPodOperator.
- Normalize DB values to the pod label format: normalize the database values (dag_id, task_id, run_id, map_index) using make_safe_label_value() so they match the format used in pod labels.
- Exact matching: compare the normalized pod labels with the normalized database values using exact matching.
- Identify zombies: pods that don't match any active task instance are zombies. Pods that do match an active TI are additionally checked for the old-retry case.
- Terminate zombies: force-delete all identified zombie pods.
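A minimal, self-contained sketch of steps 1 to 7 follows. Assumptions: safe_label_value is a simplified, hypothetical stand-in for Airflow's make_safe_label_value() (modeled on its behavior, not copied from it, so the real helper should be used in practice); active_tis is a hypothetical list of dicts produced by the batch DB query in step 3 and already filtered to KPO-style operators; unmapped TIs produce pods with no map_index label; and core_v1 is a kubernetes.client.CoreV1Api instance, of which only list_pod_for_all_namespaces and delete_namespaced_pod are used.

```python
from __future__ import annotations

import hashlib
import re
from typing import Any, Iterable

MAX_LABEL_LEN = 63  # Kubernetes limit for label values


def safe_label_value(value: str) -> str:
    """Simplified, hypothetical stand-in for Airflow's make_safe_label_value().

    Drops characters that are not valid in a Kubernetes label value and, if anything
    was dropped or the value is too long, appends a short hash so distinct inputs
    stay distinct.
    """
    safe = re.sub(r"^[^a-zA-Z0-9]*|[^a-zA-Z0-9_.\-]|[^a-zA-Z0-9]*$", "", value)
    if len(safe) > MAX_LABEL_LEN or safe != value:
        digest = hashlib.md5(value.encode()).hexdigest()[:9]
        safe = safe[: MAX_LABEL_LEN - len(digest) - 1] + "-" + digest
    return safe


def db_key(dag_id: str, task_id: str, run_id: str, map_index: int) -> tuple:
    """Step 4: normalize DB values into the pod-label format."""
    # Assumption: unmapped TIs (map_index == -1) produce pods without a map_index label.
    mapped = safe_label_value(str(map_index)) if map_index >= 0 else None
    return (safe_label_value(dag_id), safe_label_value(task_id), safe_label_value(run_id), mapped)


def pod_key(labels: dict[str, str]) -> tuple:
    """Step 2: read TI details from pod labels, which Airflow already normalized."""
    return (labels.get("dag_id"), labels.get("task_id"), labels.get("run_id"), labels.get("map_index"))


def find_zombie_pods(
    pods: Iterable[tuple[str, str, dict[str, str]]], active_tis: Iterable[dict[str, Any]]
) -> list[tuple[str, str]]:
    """Steps 5 and 6: return (namespace, name) for every zombie pod."""
    # Map each active TI's normalized key to its current try_number.
    active = {
        db_key(ti["dag_id"], ti["task_id"], ti["run_id"], ti["map_index"]): ti["try_number"]
        for ti in active_tis
    }
    zombies = []
    for namespace, name, labels in pods:
        current_try = active.get(pod_key(labels))  # exact match on normalized values
        if current_try is None:
            zombies.append((namespace, name))  # no matching active TI
        elif "try_number" in labels and int(labels["try_number"]) < current_try:
            zombies.append((namespace, name))  # pod from an older retry
    return zombies


def cleanup_zombie_kpo_pods(core_v1: Any, active_tis: Iterable[dict[str, Any]]) -> list[tuple[str, str]]:
    """Steps 1 and 7: list KPO pods, find zombies, and force-delete them."""
    pod_list = core_v1.list_pod_for_all_namespaces(label_selector="kubernetes_pod_operator=True")
    pods = [(p.metadata.namespace, p.metadata.name, p.metadata.labels or {}) for p in pod_list.items]
    zombies = find_zombie_pods(pods, active_tis)
    for namespace, name in zombies:
        # grace_period_seconds=0 requests immediate deletion with no termination grace period.
        core_v1.delete_namespaced_pod(name=name, namespace=namespace, grace_period_seconds=0)
    return zombies
```

Keeping find_zombie_pods free of Kubernetes and database calls makes the matching logic easy to unit-test with plain dicts; only cleanup_zombie_kpo_pods touches the cluster.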
How to reproduce
I've used the DAG below to test this scenario.
You can manually kill the TIs of the zombie pods.
from __future__ import annotations

from airflow import __version__ as airflow_version
from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from pendulum import datetime


def sleep_container(name: str, duration: int | str) -> k8s.V1Container:
    """Build a busybox container that sleeps for the given duration."""
    return k8s.V1Container(
        name=name,
        image="busybox:latest",
        command=["/bin/sh", "-c"],
        args=[f"sleep {duration}"],
    )


def healthy_pod_spec() -> k8s.V1PodSpec:
    # A single container that exits after 20 seconds.
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "20"),
        ],
        termination_grace_period_seconds=86400,
        restart_policy="Never",
    )


def zombie_pod_spec() -> k8s.V1PodSpec:
    # The base container exits after 5 seconds, but the sidecar never exits on its own.
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "5"),
            sleep_container("sidecar", "infinity"),
        ],
    )
@dag(
    dag_id="kpo_zombie_dag",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["test", "kpo", "zombie"],
)
def kpo_zombie_dag():
    zombie_task = KubernetesPodOperator(
        task_id="zombie_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=zombie_pod_spec(),
        ),
    )
    healthy_task = KubernetesPodOperator(
        task_id="healthy_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=healthy_pod_spec(),
        ),
    )
    tasks = [zombie_task, healthy_task]

    # Create 2 mapped KPO task instances (map_index 0 and 1): one zombie, one healthy.
    # expand_kwargs() was introduced in Airflow 2.4.0, so only do this on Airflow >= 2.4.
    try:
        # Parse the version string (e.g. "2.4.0" or "2.4.0+astro.1").
        version_parts = airflow_version.split("+")[0].split(".")
        major = int(version_parts[0])
        minor = int(version_parts[1]) if len(version_parts) > 1 else 0
        supports_expand_kwargs = major > 2 or (major == 2 and minor >= 4)
    except (ValueError, IndexError):
        # If version parsing fails, skip mapped tasks to be safe.
        supports_expand_kwargs = False

    if supports_expand_kwargs:
        # Each dict in the list creates one mapped task instance.
        mapped_task = KubernetesPodOperator.partial(
            task_id="mapped_kpo_task",
        ).expand_kwargs(
            [
                {
                    "full_pod_spec": k8s.V1Pod(
                        metadata=k8s.V1ObjectMeta(),
                        spec=zombie_pod_spec(),
                    ),
                },
                {
                    "full_pod_spec": k8s.V1Pod(
                        metadata=k8s.V1ObjectMeta(),
                        spec=healthy_pod_spec(),
                    ),
                },
            ]
        )
        tasks.append(mapped_task)

    return tasks


kpo_dag = kpo_zombie_dag()
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct