apache/airflow

Cloudwatch remote logging does not work for ECS Executor

Open

#66475 opened on May 6, 2026

Labels: area:logging, good first issue, kind:bug, provider:amazon

Description

Under which category would you file this issue?

Task SDK

Apache Airflow version

3.1.6

What happened and how to reproduce it?

I have just switched from CeleryExecutor to AwsEcsExecutor. Remote logging worked fine under CeleryExecutor. I have not changed any remote logging configuration, and the new ECS task definition uses the same settings, the same environment variables, and even the same Docker image as the previous Celery worker. These are the logs emitted by the ECS task container for one task run:

{     "timestamp": "2026-05-05T19:45:34.865349Z",     "level": "info",     "event": "Executing workload",     "workload": "ExecuteTask(token='ey123', ti=TaskInstance(id=UUID('019df9ab-4492-7e33-a50f-d63ba5e69a7e'), dag_version_id=UUID('019df35c-b3dd-7352-875f-15a8fda74161'), task_id='execute_clear_tokens_command', dag_id='clear_tokens', run_id='manual__2026-05-05T19:44:02+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={'traceparent': '00-3ac0d9bb09e7b23afad9ff7e24c5cf45-496565643a83fe9c-01'}, context_carrier={'traceparent': '00-3ac0d9bb09e7b23afad9ff7e24c5cf45-3fd301e67f5f5aa3-01'}), dag_rel_path=PurePosixPath('management_commands/clear_tokens.py'), bundle_info=BundleInfo(name='dags-folder', version=None), log_path='dag_id=clear_tokens/run_id=manual__2026-05-05T19:44:02+00:00/task_id=execute_clear_tokens_command/attempt=1.log', type='ExecuteTask')",     "logger": "__main__",     "filename": "execute_workload.py",     "lineno": 56 }
{     "timestamp": "2026-05-05T19:45:34.865902Z",     "level": "info",     "event": "Connecting to server:",     "server": "https://my-server/execution/",     "logger": "__main__",     "filename": "execute_workload.py",     "lineno": 64 }
{     "timestamp": "2026-05-05T19:45:34.937261Z",     "level": "info",     "event": "Failed parsing the json for key role_arn",     "logger": "airflow.models.connection.Connection",     "filename": "connection.py",     "lineno": 248 }
{     "timestamp": "2026-05-05T19:45:34.938875Z",     "level": "info",     "event": "Secrets backends loaded for worker",     "count": 1,     "backend_classes": [         "EnvironmentVariablesBackend"     ],     "logger": "supervisor",     "filename": "supervisor.py",     "lineno": 1975 }
{     "timestamp": "2026-05-05T19:45:35.062342Z",     "level": "warning",     "event": "Received message after logging system shutdown",     "category": "WatchtowerWarning",     "filename": "/bin/app/lib/python3.12/site-packages/watchtower/__init__.py",     "lineno": 464,     "logger": "py.warnings" }
{     "timestamp": "2026-05-05T19:45:35.059730Z",     "level": "info",     "event": "DAG bundles loaded: dags-folder",     "logger": "airflow.dag_processing.bundles.manager.DagBundlesManager",     "filename": "supervisor.py",     "lineno": 1806 }
{     "timestamp": "2026-05-05T19:45:35.060682Z",     "level": "info",     "event": "Filling up the DagBag from /opt/airflow/dags/management_commands/clear_tokens.py",     "logger": "airflow.models.dagbag.DagBag",     "filename": "supervisor.py",     "lineno": 1806 }
{     "timestamp": "2026-05-05T19:46:16.720326Z",     "level": "info",     "event": "Successfully deleted 0 tokens",     "logger": "task.stdout",     "filename": "supervisor.py",     "lineno": 1821 }
{     "timestamp": "2026-05-05T19:46:16.718629Z",     "level": "info",     "event": "Done. Returned value was: None",     "logger": "airflow.task.operators.dags.custom_operators.DjangoOperator",     "filename": "supervisor.py",     "lineno": 1806 }
{     "timestamp": "2026-05-05T19:46:21.894417Z",     "level": "info",     "event": "Task finished",     "task_instance_id": "019df9ab-4492-7e33-a50f-d63ba5e69a7e",     "exit_code": 0,     "duration": 46.96043953400002,     "final_state": "success",     "logger": "supervisor",     "filename": "supervisor.py",     "lineno": 1995 }

The logs are never written to the corresponding CloudWatch log group, so the webserver fails to fetch the task logs for the DAG run.

What you think should happen instead?

The task logs should be sent to CloudWatch and show up in the UI.

Operating System

AWS ECS Fargate

envs

Key Type Value
ACCESS_LOG_LEVEL value WARNING
AIRFLOW__API__BASE_URL value https://airflow.staging.example.com
AIRFLOW__API__SECRET_KEY valueFrom arn:aws:ssm:eu-central-1:123456789012:parameter/airflowApiSecretK-def5678
AIRFLOW__API_AUTH__JWT_SECRET valueFrom arn:aws:ssm:eu-central-1:123456789012:parameter/airflowApiAuthJwt-ghi9012
AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP value DISABLED
AIRFLOW__AWS_ECS_EXECUTOR__CHECK_HEALTH_ON_STARTUP value False
AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER value airflow-ecs-cluster-aaa1111
AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME value airflow-executor-container-definition
AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE value FARGATE
AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME value eu-central-1
AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS value sg-0aabbccddee112233
AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS value subnet-0aaaa1111bbbb2222,subnet-0cccc3333dddd4444
AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION value airflow-executor
AIRFLOW__CORE__AUTH_MANAGER value airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
AIRFLOW__CORE__DEFAULT_TIMEZONE value Europe/Berlin
AIRFLOW__CORE__EXECUTOR value airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor
AIRFLOW__CORE__FERNET_KEY valueFrom arn:aws:ssm:eu-central-1:123456789012:parameter/airflowCoreFernet-jkl3456
AIRFLOW__CORE__LOAD_EXAMPLES value false
AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL value 600
AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL value 600
AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL value 600
AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES value 2
AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_ENABLED value false
AIRFLOW__LOGGING__ENCRYPT_S3_LOGS value false
AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS value dags.settings.LOGGING_CONFIG
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER value cloudwatch://arn:aws:logs:eu-central-1:123456789012:log-group:airflow-dags-logs-bbb2222
AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID value logs_cloudwatch
AIRFLOW__LOGGING__REMOTE_LOGGING value true
AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT value False
AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME value 5
AIRFLOW__TRACES__OTEL_HOST value localhost
AIRFLOW__TRACES__OTEL_ON value true
AIRFLOW__TRACES__OTEL_PORT value 4318
AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE value Europe/Berlin
AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS value True
AIRFLOW__WEBSERVER__SKIP_TRIGGER_FORM_IF_NO_PARAM value False
AIRFLOW_CONN_ECS_FARGATE value aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fairflow-ecs-role-ccc3333
AIRFLOW_CONN_LOGS_CLOUDWATCH value aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fairflow-cloudwatch-role-ddd4444
AIRFLOW_HOST value airflow.staging.example.com
AIRFLOW_REST_API_PASSWORD valueFrom arn:aws:ssm:eu-central-1:123456789012:parameter/airflow-rest-api-password-eee5555
AIRFLOW_REST_API_ROLE value Admin
AIRFLOW_REST_API_USER value user
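
For reference, the `Failed parsing the json for key role_arn` message in the logs above may simply reflect that the `role_arn` extra is a plain string rather than JSON. That reading is an assumption, not something confirmed against the Airflow source. The sketch below uses only the standard library to decode the `AIRFLOW_CONN_LOGS_CLOUDWATCH` value from the table; `decode_extras` and `is_json` are hypothetical helper names introduced for illustration.

```python
import json
from urllib.parse import parse_qsl, urlsplit

# Value copied from AIRFLOW_CONN_LOGS_CLOUDWATCH in the table above.
CONN_URI = (
    "aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012"
    "%3Arole%2Fairflow-cloudwatch-role-ddd4444"
)


def decode_extras(uri: str) -> dict[str, str]:
    """Return the percent-decoded query parameters of a connection URI."""
    return dict(parse_qsl(urlsplit(uri).query, keep_blank_values=True))


def is_json(value: str) -> bool:
    """Report whether a string is a valid JSON document."""
    try:
        json.loads(value)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return False
    return True


extras = decode_extras(CONN_URI)
print(extras["role_arn"])           # the decoded role ARN
print(is_json(extras["role_arn"]))  # a bare ARN is not a JSON document
```

If this reading is right, the message is informational and the role ARN itself decodes cleanly, so it is probably not the cause of the missing logs.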

airflow-cloudwatch-role-ddd4444

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents",
                "logs:CreateLogGroup"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:logs:eu-central-1:923046849141:log-group:airflow-dags-logs-bbb2222*"
        }
    ]
}

settings.py

from __future__ import annotations

import os
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.configuration import conf
from copy import deepcopy
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

if TYPE_CHECKING:
    from airflow.logging.remote import RemoteLogIO
    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)

LOGGING_CONFIG['handlers']['console'] = {
    'class': 'logging.StreamHandler',
}

LOGGING_CONFIG['formatters']['default'] = {'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'}

LOGGING_CONFIG['loggers']['zeep.xsd.types.simple'] = {  # Especially needed for the MaStR api calls.
    'handlers': ['console'],
    'level': 'CRITICAL',
}

# From Airflow's config template:
# https://github.com/apache/airflow/blob/e965c2e676d85ced65a485d4b2601addc2fd3e97/airflow-core/src/airflow/config_templates/airflow_local_settings.py

REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging')
REMOTE_TASK_LOG: CloudWatchRemoteLogIO | RemoteLogIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None
BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER'))

if REMOTE_LOGGING:
    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

    remote_base_log_folder: str = conf.get_mandatory_value('logging', 'remote_base_log_folder')
    remote_task_handler_kwargs = conf.getjson('logging', 'remote_task_handler_kwargs', fallback={})
    if not isinstance(remote_task_handler_kwargs, dict):
        raise ValueError(
            'logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got '
            f'{type(remote_task_handler_kwargs)}'
        )
    delete_local_copy = conf.getboolean('logging', 'delete_local_logs')

    url_parts = urlsplit(remote_base_log_folder)
    REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
        **(
            {
                'base_log_folder': BASE_LOG_FOLDER,
                'remote_base': remote_base_log_folder,
                'delete_local_copy': delete_local_copy,
                'log_group_arn': url_parts.netloc + url_parts.path,
            }
            | remote_task_handler_kwargs
        )
    )
    remote_task_handler_kwargs = {}
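
To rule out the ARN derivation in `settings.py` as the culprit, the following standalone sketch repeats the `urlsplit` step on the configured `AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER` value. It uses only the standard library; `log_group_arn_from` and `describe` are hypothetical helper names, and the ARN field layout in `describe` assumes the usual `arn:partition:service:region:account:resource-type:name` form for log groups.

```python
from urllib.parse import urlsplit

# Value copied from AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER above.
REMOTE_BASE_LOG_FOLDER = (
    "cloudwatch://arn:aws:logs:eu-central-1:123456789012"
    ":log-group:airflow-dags-logs-bbb2222"
)


def log_group_arn_from(remote_base: str) -> str:
    """Mirror the settings.py expression: netloc + path of the URL."""
    parts = urlsplit(remote_base)
    return parts.netloc + parts.path


def describe(arn: str) -> dict[str, str]:
    """Split a log-group ARN into its colon-separated fields."""
    _, partition, service, region, account, resource_type, name = arn.split(":", 6)
    return {
        "partition": partition,
        "service": service,
        "region": region,
        "account": account,
        "resource_type": resource_type,
        "name": name,
    }


arn = log_group_arn_from(REMOTE_BASE_LOG_FOLDER)
print(arn)
print(describe(arn))
```

With the value above, the whole ARN lands in `netloc` and `path` is empty, so the `log_group_arn` passed to `CloudWatchRemoteLogIO` should be the full log group ARN. The account and region fields printed by `describe` can then be compared by hand against the IAM policy resource.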

Deployment

Other

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

apache-airflow==3.1.6
apache-airflow-client==3.0.0
apache-airflow-core==3.1.6
apache-airflow-providers-amazon==9.19.0
apache-airflow-providers-celery==3.15.0
apache-airflow-providers-common-compat==1.14.3
apache-airflow-providers-common-io==1.7.0
apache-airflow-providers-common-sql==1.30.2
apache-airflow-providers-docker==4.5.1
apache-airflow-providers-fab==3.1.2
apache-airflow-providers-http==5.6.2
apache-airflow-providers-postgres==6.5.1
apache-airflow-providers-redis==4.4.1
apache-airflow-providers-smtp==2.4.1
apache-airflow-providers-standard==1.10.2
apache-airflow-task-sdk==1.1.6

Official Helm Chart version

Not Applicable

Kubernetes Version

No response

Helm Chart configuration

No response

Docker Image customizations

No response

Anything else?

There are some prior related issues:

  • https://github.com/apache/airflow/issues/52501
  • https://github.com/apache/airflow/issues/57356
  • https://github.com/apache/airflow/issues/55756
  • https://github.com/apache/airflow/pull/53499 (I tried this, but it did not work)

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
