CloudWatch remote logging does not work for ECS Executor
#66475 opened on May 6, 2026
Description
Under which category would you file this issue?
Task SDK
Apache Airflow version
3.1.6
What happened and how to reproduce it?
I have just switched from CeleryExecutor to AwsEcsExecutor. Remote logging worked fine under the CeleryExecutor. I have not changed any remote logging configuration, and the new task definition uses the same settings, the same environment variables, and even the same Docker image as the previous worker.
{ "timestamp": "2026-05-05T19:45:34.865349Z", "level": "info", "event": "Executing workload", "workload": "ExecuteTask(token='ey123', ti=TaskInstance(id=UUID('019df9ab-4492-7e33-a50f-d63ba5e69a7e'), dag_version_id=UUID('019df35c-b3dd-7352-875f-15a8fda74161'), task_id='execute_clear_tokens_command', dag_id='clear_tokens', run_id='manual__2026-05-05T19:44:02+00:00', try_number=1, map_index=-1, pool_slots=1, queue='default', priority_weight=1, executor_config=None, parent_context_carrier={'traceparent': '00-3ac0d9bb09e7b23afad9ff7e24c5cf45-496565643a83fe9c-01'}, context_carrier={'traceparent': '00-3ac0d9bb09e7b23afad9ff7e24c5cf45-3fd301e67f5f5aa3-01'}), dag_rel_path=PurePosixPath('management_commands/clear_tokens.py'), bundle_info=BundleInfo(name='dags-folder', version=None), log_path='dag_id=clear_tokens/run_id=manual__2026-05-05T19:44:02+00:00/task_id=execute_clear_tokens_command/attempt=1.log', type='ExecuteTask')", "logger": "__main__", "filename": "execute_workload.py", "lineno": 56 }
{ "timestamp": "2026-05-05T19:45:34.865902Z", "level": "info", "event": "Connecting to server:", "server": "https://my-server/execution/", "logger": "__main__", "filename": "execute_workload.py", "lineno": 64 }
{ "timestamp": "2026-05-05T19:45:34.937261Z", "level": "info", "event": "Failed parsing the json for key role_arn", "logger": "airflow.models.connection.Connection", "filename": "connection.py", "lineno": 248 }
{ "timestamp": "2026-05-05T19:45:34.938875Z", "level": "info", "event": "Secrets backends loaded for worker", "count": 1, "backend_classes": [ "EnvironmentVariablesBackend" ], "logger": "supervisor", "filename": "supervisor.py", "lineno": 1975 }
{ "timestamp": "2026-05-05T19:45:35.062342Z", "level": "warning", "event": "Received message after logging system shutdown", "category": "WatchtowerWarning", "filename": "/bin/app/lib/python3.12/site-packages/watchtower/__init__.py", "lineno": 464, "logger": "py.warnings" }
{ "timestamp": "2026-05-05T19:45:35.059730Z", "level": "info", "event": "DAG bundles loaded: dags-folder", "logger": "airflow.dag_processing.bundles.manager.DagBundlesManager", "filename": "supervisor.py", "lineno": 1806 }
{ "timestamp": "2026-05-05T19:45:35.060682Z", "level": "info", "event": "Filling up the DagBag from /opt/airflow/dags/management_commands/clear_tokens.py", "logger": "airflow.models.dagbag.DagBag", "filename": "supervisor.py", "lineno": 1806 }
{ "timestamp": "2026-05-05T19:46:16.720326Z", "level": "info", "event": "Successfully deleted 0 tokens", "logger": "task.stdout", "filename": "supervisor.py", "lineno": 1821 }
{ "timestamp": "2026-05-05T19:46:16.718629Z", "level": "info", "event": "Done. Returned value was: None", "logger": "airflow.task.operators.dags.custom_operators.DjangoOperator", "filename": "supervisor.py", "lineno": 1806 }
{ "timestamp": "2026-05-05T19:46:21.894417Z", "level": "info", "event": "Task finished", "task_instance_id": "019df9ab-4492-7e33-a50f-d63ba5e69a7e", "exit_code": 0, "duration": 46.96043953400002, "final_state": "success", "logger": "supervisor", "filename": "supervisor.py", "lineno": 1995 }
The logs are never written to the corresponding CloudWatch log group, so the webserver fails to fetch the DAG logs.
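To check whether anything reached the log group, it helps to know which log stream to look for. A minimal sketch, assuming the CloudWatch handler names the stream after the task's `log_path` with `:` replaced by `_` (this matches my reading of the amazon provider, but treat it as an assumption; `expected_stream_name` is a hypothetical helper, not an Airflow API):

```python
def expected_stream_name(log_path: str) -> str:
    """Return the log stream name the handler would presumably use.

    Assumption: the stream name is the relative log path with ':' replaced
    by '_', since ':' is not allowed in CloudWatch log stream names.
    """
    return log_path.replace(":", "_")


# log_path copied from the "Executing workload" log line above.
log_path = (
    "dag_id=clear_tokens/run_id=manual__2026-05-05T19:44:02+00:00/"
    "task_id=execute_clear_tokens_command/attempt=1.log"
)

stream = expected_stream_name(log_path)
print(stream)
```

If no stream with that name exists in the log group after the task finishes, nothing was uploaded for this attempt.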
What you think should happen instead?
The logs should be sent to CloudWatch and show up in the UI.
Operating System
AWS ECS Fargate
envs
| Key | Type | Value |
|---|---|---|
| ACCESS_LOG_LEVEL | value | WARNING |
| AIRFLOW__API__BASE_URL | value | https://airflow.staging.example.com |
| AIRFLOW__API__SECRET_KEY | valueFrom | arn:aws:ssm:eu-central-1:123456789012:parameter/airflowApiSecretK-def5678 |
| AIRFLOW__API_AUTH__JWT_SECRET | valueFrom | arn:aws:ssm:eu-central-1:123456789012:parameter/airflowApiAuthJwt-ghi9012 |
| AIRFLOW__AWS_ECS_EXECUTOR__ASSIGN_PUBLIC_IP | value | DISABLED |
| AIRFLOW__AWS_ECS_EXECUTOR__CHECK_HEALTH_ON_STARTUP | value | False |
| AIRFLOW__AWS_ECS_EXECUTOR__CLUSTER | value | airflow-ecs-cluster-aaa1111 |
| AIRFLOW__AWS_ECS_EXECUTOR__CONTAINER_NAME | value | airflow-executor-container-definition |
| AIRFLOW__AWS_ECS_EXECUTOR__LAUNCH_TYPE | value | FARGATE |
| AIRFLOW__AWS_ECS_EXECUTOR__REGION_NAME | value | eu-central-1 |
| AIRFLOW__AWS_ECS_EXECUTOR__SECURITY_GROUPS | value | sg-0aabbccddee112233 |
| AIRFLOW__AWS_ECS_EXECUTOR__SUBNETS | value | subnet-0aaaa1111bbbb2222,subnet-0cccc3333dddd4444 |
| AIRFLOW__AWS_ECS_EXECUTOR__TASK_DEFINITION | value | airflow-executor |
| AIRFLOW__CORE__AUTH_MANAGER | value | airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager |
| AIRFLOW__CORE__DEFAULT_TIMEZONE | value | Europe/Berlin |
| AIRFLOW__CORE__EXECUTOR | value | airflow.providers.amazon.aws.executors.ecs.ecs_executor.AwsEcsExecutor |
| AIRFLOW__CORE__FERNET_KEY | valueFrom | arn:aws:ssm:eu-central-1:123456789012:parameter/airflowCoreFernet-jkl3456 |
| AIRFLOW__CORE__LOAD_EXAMPLES | value | false |
| AIRFLOW__CORE__MIN_SERIALIZED_DAG_FETCH_INTERVAL | value | 600 |
| AIRFLOW__CORE__MIN_SERIALIZED_DAG_UPDATE_INTERVAL | value | 600 |
| AIRFLOW__DAG_PROCESSOR__MIN_FILE_PROCESS_INTERVAL | value | 600 |
| AIRFLOW__DAG_PROCESSOR__PARSING_PROCESSES | value | 2 |
| AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_ENABLED | value | false |
| AIRFLOW__LOGGING__ENCRYPT_S3_LOGS | value | false |
| AIRFLOW__LOGGING__LOGGING_CONFIG_CLASS | value | dags.settings.LOGGING_CONFIG |
| AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER | value | cloudwatch://arn:aws:logs:eu-central-1:123456789012:log-group:airflow-dags-logs-bbb2222 |
| AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID | value | logs_cloudwatch |
| AIRFLOW__LOGGING__REMOTE_LOGGING | value | true |
| AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT | value | False |
| AIRFLOW__SCHEDULER__SCHEDULER_IDLE_SLEEP_TIME | value | 5 |
| AIRFLOW__TRACES__OTEL_HOST | value | localhost |
| AIRFLOW__TRACES__OTEL_ON | value | true |
| AIRFLOW__TRACES__OTEL_PORT | value | 4318 |
| AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE | value | Europe/Berlin |
| AIRFLOW__WEBSERVER__SHOW_TRIGGER_FORM_IF_NO_PARAMS | value | True |
| AIRFLOW__WEBSERVER__SKIP_TRIGGER_FORM_IF_NO_PARAM | value | False |
| AIRFLOW_CONN_ECS_FARGATE | value | aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fairflow-ecs-role-ccc3333 |
| AIRFLOW_CONN_LOGS_CLOUDWATCH | value | aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2Fairflow-cloudwatch-role-ddd4444 |
| AIRFLOW_HOST | value | airflow.staging.example.com |
| AIRFLOW_REST_API_PASSWORD | valueFrom | arn:aws:ssm:eu-central-1:123456789012:parameter/airflow-rest-api-password-eee5555 |
| AIRFLOW_REST_API_ROLE | value | Admin |
| AIRFLOW_REST_API_USER | value | user |
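As a sanity check on the connection URI, the sketch below decodes `AIRFLOW_CONN_LOGS_CLOUDWATCH` with the standard library only. It shows that the `role_arn` query parameter decodes to a well-formed role ARN, and that the value is a plain string rather than JSON. That may be all the `Failed parsing the json for key role_arn` info message reflects, but this is an assumption about Airflow's behaviour, not something confirmed here. `is_json` is a hypothetical helper.

```python
import json
from urllib.parse import parse_qs, urlsplit

# Value of AIRFLOW_CONN_LOGS_CLOUDWATCH from the table above.
conn_uri = (
    "aws://?role_arn=arn%3Aaws%3Aiam%3A%3A123456789012%3Arole%2F"
    "airflow-cloudwatch-role-ddd4444"
)

# parse_qs percent-decodes the values and returns a list per key.
extras = {key: values[0] for key, values in parse_qs(urlsplit(conn_uri).query).items()}
role_arn = extras["role_arn"]


def is_json(value: str) -> bool:
    """Return True if the string parses as JSON, False otherwise."""
    try:
        json.loads(value)
    except json.JSONDecodeError:
        return False
    return True


print(role_arn)
print("role_arn is JSON:", is_json(role_arn))
```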
airflow-cloudwatch-role-ddd4444
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:CreateLogGroup"
      ],
      "Effect": "Allow",
      "Resource": "arn:aws:logs:eu-central-1:923046849141:log-group:airflow-dags-logs-bbb2222*"
    }
  ]
}
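One thing worth ruling out is whether the policy's `Resource` pattern actually covers the log group configured in `AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER`. A rough sketch, assuming IAM's `*` wildcard can be approximated with `fnmatchcase` for ARNs like these (`resource_covers` and `account_id` are hypothetical helpers). Note that the account IDs as posted in this issue differ, which may simply be a redaction artifact.

```python
from fnmatch import fnmatchcase


def resource_covers(pattern: str, arn: str) -> bool:
    """Approximate IAM Resource matching: '*' matches any run of characters."""
    return fnmatchcase(arn, pattern)


def account_id(arn: str) -> str:
    """Return the account ID, the fifth colon-separated field of an ARN."""
    return arn.split(":")[4]


# Resource from the policy above.
policy_resource = (
    "arn:aws:logs:eu-central-1:923046849141:log-group:airflow-dags-logs-bbb2222*"
)
# Log group ARN from AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER (without the scheme).
log_group_arn = (
    "arn:aws:logs:eu-central-1:123456789012:log-group:airflow-dags-logs-bbb2222"
)

print("policy account:   ", account_id(policy_resource))
print("log group account:", account_id(log_group_arn))
print("policy covers log group:", resource_covers(policy_resource, log_group_arn))
```

With the values as posted, the pattern does not cover the log group because the account IDs differ. If the real values share one account, the trailing `*` should also cover the log streams under that group.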
settings.py
from __future__ import annotations

import os
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
from airflow.configuration import conf
from copy import deepcopy
from typing import TYPE_CHECKING
from urllib.parse import urlsplit

if TYPE_CHECKING:
    from airflow.logging.remote import RemoteLogIO
    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG['handlers']['console'] = {
    'class': 'logging.StreamHandler',
}
LOGGING_CONFIG['formatters']['default'] = {'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s'}
LOGGING_CONFIG['loggers']['zeep.xsd.types.simple'] = {  # Especially needed for the MaStR api calls.
    'handlers': ['console'],
    'level': 'CRITICAL',
}

# From Airflow's config template:
# https://github.com/apache/airflow/blob/e965c2e676d85ced65a485d4b2601addc2fd3e97/airflow-core/src/airflow/config_templates/airflow_local_settings.py
REMOTE_LOGGING: bool = conf.getboolean('logging', 'remote_logging')
REMOTE_TASK_LOG: CloudWatchRemoteLogIO | RemoteLogIO | None = None
DEFAULT_REMOTE_CONN_ID: str | None = None
BASE_LOG_FOLDER: str = os.path.expanduser(conf.get_mandatory_value('logging', 'BASE_LOG_FOLDER'))

if REMOTE_LOGGING:
    from airflow.providers.amazon.aws.log.cloudwatch_task_handler import CloudWatchRemoteLogIO

    remote_base_log_folder: str = conf.get_mandatory_value('logging', 'remote_base_log_folder')
    remote_task_handler_kwargs = conf.getjson('logging', 'remote_task_handler_kwargs', fallback={})
    if not isinstance(remote_task_handler_kwargs, dict):
        raise ValueError(
            'logging/remote_task_handler_kwargs must be a JSON object (a python dict), we got '
            f'{type(remote_task_handler_kwargs)}'
        )
    delete_local_copy = conf.getboolean('logging', 'delete_local_logs')
    url_parts = urlsplit(remote_base_log_folder)
    REMOTE_TASK_LOG = CloudWatchRemoteLogIO(
        **(
            {
                'base_log_folder': BASE_LOG_FOLDER,
                'remote_base': remote_base_log_folder,
                'delete_local_copy': delete_local_copy,
                'log_group_arn': url_parts.netloc + url_parts.path,
            }
            | remote_task_handler_kwargs
        )
    )
    remote_task_handler_kwargs = {}
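For reference, this is what the `urlsplit` call in `settings.py` yields for the `REMOTE_BASE_LOG_FOLDER` value from the envs table. It is a standalone sketch using only the standard library; the value is hard-coded here instead of being read through `conf`.

```python
from urllib.parse import urlsplit

# Value of AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER from the envs table.
remote_base_log_folder = (
    "cloudwatch://arn:aws:logs:eu-central-1:123456789012:"
    "log-group:airflow-dags-logs-bbb2222"
)

url_parts = urlsplit(remote_base_log_folder)

# Same expression as in settings.py: everything after 'cloudwatch://'.
log_group_arn = url_parts.netloc + url_parts.path

print("scheme:       ", url_parts.scheme)
print("log_group_arn:", log_group_arn)
```

The whole ARN ends up in `netloc` and `path` is empty, so the `log_group_arn` passed to `CloudWatchRemoteLogIO` is the configured value minus the `cloudwatch://` prefix.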
Deployment
Other
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow==3.1.6
apache-airflow-client==3.0.0
apache-airflow-core==3.1.6
apache-airflow-providers-amazon==9.19.0
apache-airflow-providers-celery==3.15.0
apache-airflow-providers-common-compat==1.14.3
apache-airflow-providers-common-io==1.7.0
apache-airflow-providers-common-sql==1.30.2
apache-airflow-providers-docker==4.5.1
apache-airflow-providers-fab==3.1.2
apache-airflow-providers-http==5.6.2
apache-airflow-providers-postgres==6.5.1
apache-airflow-providers-redis==4.4.1
apache-airflow-providers-smtp==2.4.1
apache-airflow-providers-standard==1.10.2
apache-airflow-task-sdk==1.1.6
Official Helm Chart version
Not Applicable
Kubernetes Version
No response
Helm Chart configuration
No response
Docker Image customizations
No response
Anything else?
There are some prior related issues:
- https://github.com/apache/airflow/issues/52501
- https://github.com/apache/airflow/issues/57356
- https://github.com/apache/airflow/issues/55756
- https://github.com/apache/airflow/pull/53499 (I tried this, but it did not work)
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct