Extract shared request handlers from supervisor _handle_request methods
#65570 opened on Apr 20, 2026
Description
We have four WatchedSubprocess subclasses that each implement _handle_request with large if/elif chains dispatching comms messages. Many of these handlers are duplicated inline across multiple supervisors:
- ActivitySubprocess (task-sdk, supervisor.py)
- CallbackSubprocess (task-sdk, callback_supervisor.py)
- TriggerRunnerSupervisor (airflow-core, triggerer_job_runner.py)
- DagFileProcessorProcess (airflow-core, processor.py)
A request_handlers.py module already exists in task-sdk/src/airflow/sdk/execution_time/ with shared handlers for GetConnection, GetVariable, GetAssetByName, GetAssetByUri, and MaskSecret. The Callback and Activity supervisors use these, but the Triggerer and the DagFileProcessorProcess (DFP) still have inline duplicates.
The Task:
Extract the remaining duplicated handler logic into shared functions in request_handlers.py and have all four supervisors call them.
Note: The implementations across supervisors are similar but not identical. Some include isinstance() guards on the response; some pass different parameter subsets; some set dump_opts while others don't. When extracting, don't just copy one supervisor's version verbatim. Compare all implementations of each handler and produce a single version that incorporates the best practices from each (e.g. proper response type checking, consistent dump_opts, full parameter forwarding).
Here is my suggestion, but you may come up with a better set of rules once you start working on it:
- Use the existing shared handlers as a guide, but don't be afraid to modify them if you come up with a new standard.
- Each new handler should be a standalone function with this signature:
  def handle_<message_type>(client: Client, msg: <MessageType>) -> tuple[BaseModel | None, dict[str, bool]]:
- Always guard the response with isinstance before converting it to a comms result model. The API client can return ErrorResponse on failure; when that happens, pass it through unchanged.
- Always return dump_opts as the second tuple element. Use {"exclude_unset": True} for result models that wrap API responses to avoid serializing None fields. Add {"by_alias": True} when the result model uses field aliases (currently only ConnectionResult). For simple pass-through responses or error responses, return {}.
- Forward all message fields to the client call. Some supervisors currently pass fewer parameters than others for the same message type (e.g. the Triggerer's GetXCom omits include_prior_dates). The shared handler should forward every field that the message carries and let the client and server handle defaults for optional fields.
- Always mask secrets. If the response contains sensitive data (passwords, tokens, variable values), call mask_secret() before returning. Look at handle_get_connection and handle_get_variable for examples.
- Handlers that don't return a response, such as PutVariable and DeleteXCom, can use a simpler signature: return (None, {}) or return None directly like handle_mask_secret does. Pick whichever is cleaner for the specific case, but be consistent within a batch.
- Keep handlers stateless. They should not touch subprocess internals (exit codes, terminal states, etc.). If a message type requires updating subprocess state, it belongs inline in that supervisor's _handle_request, not in a shared handler.
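To make the rules above concrete, here is a rough sketch of what a shared GetXCom handler could look like. It is not the real implementation: the dataclasses, FakeClient, and the client.get_xcom() method are hypothetical stand-ins so the example runs on its own, and the real message, result, and client types in the task SDK will differ.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


# Hypothetical stand-ins for the real comms/API types (assumptions for this sketch).
@dataclass
class ErrorResponse:
    error: str


@dataclass
class XComResponse:
    key: str
    value: Any


@dataclass
class XComResult:
    key: str
    value: Any

    @classmethod
    def from_xcom_response(cls, resp: XComResponse) -> XComResult:
        return cls(key=resp.key, value=resp.value)


@dataclass
class GetXCom:
    dag_id: str
    run_id: str
    task_id: str
    key: str
    map_index: int | None = None
    include_prior_dates: bool = False


class FakeClient:
    """Hypothetical client: records call arguments and returns a canned response."""

    def __init__(self, response: Any) -> None:
        self.response = response
        self.calls: list[dict[str, Any]] = []

    def get_xcom(self, **kwargs: Any) -> Any:
        self.calls.append(kwargs)
        return self.response


def handle_get_xcom(client: FakeClient, msg: GetXCom) -> tuple[Any, dict[str, bool]]:
    # Forward every field the message carries, including include_prior_dates.
    resp = client.get_xcom(
        dag_id=msg.dag_id,
        run_id=msg.run_id,
        task_id=msg.task_id,
        key=msg.key,
        map_index=msg.map_index,
        include_prior_dates=msg.include_prior_dates,
    )
    # Guard with isinstance before converting to the comms result model.
    if isinstance(resp, XComResponse):
        return XComResult.from_xcom_response(resp), {"exclude_unset": True}
    # ErrorResponse (or anything unexpected) passes through unchanged with empty dump_opts.
    return resp, {}
```

The handler takes only the client and the message, so it never touches supervisor state, and each supervisor can call it from its own _handle_request.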
Phase 1: High-overlap handlers (three or more supervisors share this logic)
- GetXCom - Activity, Triggerer, DFP
- PutVariable - Activity, Triggerer, DFP
- DeleteVariable - Activity, Triggerer, DFP
- GetTICount - Activity, Triggerer, DFP
- GetTaskStates - Activity, Triggerer, DFP
- GetPreviousTI - Activity, Triggerer, DFP
Phase 2: Medium-overlap handlers (two supervisors share these)
- SetXCom - Activity, Triggerer
- DeleteXCom - Activity, Triggerer
- GetDRCount - Activity, Triggerer
- GetDagRunState - Activity, Triggerer
- GetPreviousDagRun - Activity, DFP
- GetPrevSuccessfulDagRun - Activity, DFP
- GetXComCount - Activity, DFP
- GetXComSequenceItem - Activity, DFP
- GetXComSequenceSlice - Activity, DFP
Phase 2.5: Callback Supervisor
Comms channels were added to the Callback supervisor in https://github.com/apache/airflow/pull/65269. As of this writing, every comms message it handles already goes through a shared helper, but please check that this is still true while you are working on this.
Phase 3: Migrate DagFileProcessorProcess onto shared handlers
The DFP currently has fully inline versions of GetConnection, GetVariable, and MaskSecret even though shared handlers already exist. One thing to note: the DFP's inline GetConnection handler skips the mask_secret() calls on password and extra that the shared handler performs, which I personally feel is a bug but may require discussion. Maybe there's a reason I'm not aware of.
Out of Scope:
Supervisor-specific messages (e.g. TaskState, DeferTask, TriggerStateChanges, DagFileParsingResult) should either stay inline or move to a supervisor-specific helper/utils module (I vote leave alone, personally) since they interact with internal subprocess state.
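One possible shape for the resulting split, sketched with hypothetical names: supervisor-specific messages stay inline because they mutate subprocess state, and everything else is looked up in a table of shared handlers. The SHARED_HANDLERS table, SketchSupervisor, and the dict-backed client are illustrative assumptions, not existing code, and an if/elif chain calling the shared functions would work just as well.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


# Hypothetical stand-ins for two message types.
@dataclass
class GetVariable:
    key: str


@dataclass
class TaskState:
    state: str


def handle_get_variable(client: dict[str, str], msg: GetVariable) -> tuple[Any, dict[str, bool]]:
    # Stateless shared handler: depends only on the client and the message.
    return client.get(msg.key), {"exclude_unset": True}


# Assumed lookup table mapping message types to shared handlers.
SHARED_HANDLERS: dict[type, Callable[[Any, Any], tuple[Any, dict[str, bool]]]] = {
    GetVariable: handle_get_variable,
}


class SketchSupervisor:
    """Hypothetical supervisor showing which messages stay inline."""

    def __init__(self, client: dict[str, str]) -> None:
        self.client = client
        self.terminal_state: str | None = None

    def handle_request(self, msg: Any) -> tuple[Any, dict[str, bool]]:
        # Supervisor-specific: updates internal state, so it stays inline.
        if isinstance(msg, TaskState):
            self.terminal_state = msg.state
            return None, {}
        # Everything else is delegated to a shared, stateless handler.
        handler = SHARED_HANDLERS.get(type(msg))
        if handler is None:
            raise TypeError(f"Unhandled message type: {type(msg).__name__}")
        return handler(self.client, msg)
```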