apache/airflow

Extract shared request handlers from supervisor _handle_request methods

Closed

#65570 opened on Apr 20, 2026

Labels: area:core, good first issue

Description

We have four WatchedSubprocess subclasses that each implement _handle_request with large if/elif chains dispatching comms messages. Many of these handlers are duplicated inline across multiple supervisors:

  • ActivitySubprocess (task-sdk, supervisor.py)
  • CallbackSubprocess (task-sdk, callback_supervisor.py)
  • TriggerRunnerSupervisor (airflow-core, triggerer_job_runner.py)
  • DagFileProcessorProcess (airflow-core, processor.py)

A request_handlers.py module already exists in task-sdk/src/airflow/sdk/execution_time/ with shared handlers for GetConnection, GetVariable, GetAssetByName, GetAssetByUri, and MaskSecret. The Callback and Activity supervisors use these, but the Triggerer and the DagFileProcessorProcess (DFP) still have inline duplicates.

The Task:

Extract the remaining duplicated handler logic into shared functions in request_handlers.py and have all four supervisors call them.

Note: The implementations across supervisors are similar but not identical. Some include isinstance() guards on the response; some pass different parameter subsets; some set dump_opts while others don't. When extracting, don't just copy one supervisor's version verbatim. Compare all implementations of each handler and produce a single version that incorporates the best practices from each (e.g. proper response type checking, consistent dump_opts, full parameter forwarding).

Here is my suggestion, but you may come up with a better set of rules once you start working on it:

  1. Use the existing shared handlers as a guide, but don't be afraid to modify them if you come up with a new standard.
  2. Each new handler should be a standalone function with this signature:
    def handle_<message_type>(client: Client, msg: <MessageType>) -> tuple[BaseModel | None, dict[str, bool]]:
    
  3. Always guard the response with isinstance before converting it to a comms result model. The API client can return ErrorResponse on failure; when that happens, pass it through unchanged.
  4. Always return dump_opts as the second tuple element. Use {"exclude_unset": True} for result models that wrap API responses, so that fields that were never set (typically optional fields left as None) are not serialized. Add {"by_alias": True} when the result model uses field aliases (currently only ConnectionResult). For simple pass-through responses or error responses, return {}.
  5. Forward all message fields to the client call. Some supervisors currently pass fewer parameters than others for the same message type (e.g. the Triggerer's GetXCom omits include_prior_dates). The shared handler should forward every field that the message carries and let the client and server handle defaults for optional fields.
  6. Always mask secrets. If the response contains sensitive data (passwords, tokens, variable values), call mask_secret() before returning. Look at handle_get_connection and handle_get_variable for examples.
  7. Handlers that don't return a response, such as PutVariable and DeleteXCom, can use a simpler signature: either return (None, {}) or return None directly, as handle_mask_secret does. Pick whichever is cleaner for the specific case, but be consistent within a batch.
  8. Keep handlers stateless. They should not touch subprocess internals (exit codes, terminal states, etc.). If a message type requires updating subprocess state, it belongs inline in that supervisor's _handle_request, not in a shared handler.
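As an illustration of rules 2 through 5, here is a minimal sketch of a shared GetXCom handler. It uses stdlib dataclasses as stand-ins for the real comms models and API client, so the GetXCom fields, XComResponse, XComResult.from_xcom_response, ErrorResponse, and the positional client.xcoms.get(...) call are all assumptions for illustration, not the actual task-sdk API. The return annotation uses Any in place of BaseModel | None because the stand-ins are not pydantic models.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


# Hypothetical stand-ins for the comms models; field names are illustrative only.
@dataclass
class GetXCom:
    dag_id: str
    run_id: str
    task_id: str
    key: str
    map_index: int | None = None
    include_prior_dates: bool = False


@dataclass
class XComResponse:  # stand-in for what the API client returns on success
    key: str
    value: Any


@dataclass
class ErrorResponse:  # stand-in for what the API client returns on failure
    detail: str


@dataclass
class XComResult:  # stand-in for the comms result model sent to the subprocess
    key: str
    value: Any

    @classmethod
    def from_xcom_response(cls, resp: XComResponse) -> XComResult:
        return cls(key=resp.key, value=resp.value)


def handle_get_xcom(client: Any, msg: GetXCom) -> tuple[Any, dict[str, bool]]:
    # Rule 5: forward every field the message carries, including include_prior_dates.
    resp = client.xcoms.get(
        msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index, msg.include_prior_dates
    )
    # Rule 3: only convert a real XComResponse; anything else (e.g. ErrorResponse)
    # is passed through unchanged with empty dump options (rule 4).
    if isinstance(resp, XComResponse):
        return XComResult.from_xcom_response(resp), {"exclude_unset": True}
    return resp, {}


class FakeXComs:
    """Hypothetical client namespace that records the last call it received."""

    def __init__(self) -> None:
        self.last_call: tuple[Any, ...] = ()

    def get(self, dag_id, run_id, task_id, key, map_index, include_prior_dates):
        self.last_call = (dag_id, run_id, task_id, key, map_index, include_prior_dates)
        if key == "missing":
            return ErrorResponse(detail="XCom not found")
        return XComResponse(key=key, value=42)


class FakeClient:
    def __init__(self) -> None:
        self.xcoms = FakeXComs()


client = FakeClient()
result, opts = handle_get_xcom(client, GetXCom("dag", "run", "task", "k", include_prior_dates=True))
first_call = client.xcoms.last_call
error, err_opts = handle_get_xcom(client, GetXCom("dag", "run", "task", "missing"))
```

Because the error branch never touches the result model, a failed lookup reaches the subprocess unchanged, which is what rule 3 asks for.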

Phase 1: High-overlap handlers (three or more supervisors share this logic)

  • GetXCom - Activity, Triggerer, DFP
  • PutVariable - Activity, Triggerer, DFP
  • DeleteVariable - Activity, Triggerer, DFP
  • GetTICount - Activity, Triggerer, DFP
  • GetTaskStates - Activity, Triggerer, DFP
  • GetPreviousTI - Activity, Triggerer, DFP

Phase 2: Medium-overlap handlers (two supervisors share these)

  • SetXCom - Activity, Triggerer
  • DeleteXCom - Activity, Triggerer
  • GetDRCount - Activity, Triggerer
  • GetDagRunState - Activity, Triggerer
  • GetPreviousDagRun - Activity, DFP
  • GetPrevSuccessfulDagRun - Activity, DFP
  • GetXComCount - Activity, DFP
  • GetXComSequenceItem - Activity, DFP
  • GetXComSequenceSlice - Activity, DFP
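Rule 7 names PutVariable (Phase 1) and DeleteXCom (Phase 2) as handlers with no response. A minimal sketch of both, written as one consistent batch that always returns (None, {}), might look like this. The message fields and the client.variables.set(...) and client.xcoms.delete(...) call shapes are hypothetical stand-ins, not the real task-sdk client API.

```python
from __future__ import annotations

from dataclasses import dataclass


# Hypothetical stand-ins for the two comms messages; fields are illustrative only.
@dataclass
class PutVariable:
    key: str
    value: str | None
    description: str | None = None


@dataclass
class DeleteXCom:
    dag_id: str
    run_id: str
    task_id: str
    key: str
    map_index: int | None = None


def handle_put_variable(client, msg: PutVariable) -> tuple[None, dict[str, bool]]:
    # Forward every field, including the optional description (rule 5).
    client.variables.set(msg.key, msg.value, msg.description)
    # No response body; use the same (None, {}) shape as the rest of the batch.
    return None, {}


def handle_delete_xcom(client, msg: DeleteXCom) -> tuple[None, dict[str, bool]]:
    client.xcoms.delete(msg.dag_id, msg.run_id, msg.task_id, msg.key, msg.map_index)
    return None, {}


class FakeVariables:
    """Hypothetical in-memory stand-in for the client's variables namespace."""

    def __init__(self) -> None:
        self.store: dict[str, tuple[str | None, str | None]] = {}

    def set(self, key, value, description=None):
        self.store[key] = (value, description)


class FakeXComs:
    """Hypothetical in-memory stand-in for the client's xcoms namespace."""

    def __init__(self) -> None:
        self.store = {("dag", "run", "task", "k", None): 1}

    def delete(self, dag_id, run_id, task_id, key, map_index):
        self.store.pop((dag_id, run_id, task_id, key, map_index), None)


class FakeClient:
    def __init__(self) -> None:
        self.variables = FakeVariables()
        self.xcoms = FakeXComs()


client = FakeClient()
put_result = handle_put_variable(client, PutVariable("greeting", "hello", "demo variable"))
del_result = handle_delete_xcom(client, DeleteXCom("dag", "run", "task", "k"))
```

Returning the tuple form here, rather than a bare None, keeps every handler in the batch unpackable the same way at the call site.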

Phase 2.5: Callback Supervisor

Comms channels were added to the Callback supervisor in https://github.com/apache/airflow/pull/65269. As of this writing, every comms channel it has is already covered by a shared helper, but please verify that this is still true while you are working on this.

Phase 3: Migrate DagFileProcessorProcess onto shared handlers

The DFP currently has fully inline versions of GetConnection, GetVariable, and MaskSecret even though shared handlers already exist. One thing to note: the DFP's inline GetConnection handler skips the mask_secret() calls on password and extra that the shared handler performs, which I personally feel is a bug but may require discussion. Maybe there's a reason I'm not aware of.
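A minimal sketch of a shared GetConnection handler that performs the masking the DFP's inline version skips. Here mask_secret is a hypothetical stand-in that only records values (the real function registers secrets so they can be redacted from logs), and ConnectionResponse, ConnectionResult, ErrorResponse, and client.connections.get(...) are likewise stand-ins, not the task-sdk's actual definitions.

```python
from __future__ import annotations

from dataclasses import dataclass

# Hypothetical stand-in for the secrets masker: records values that must be redacted.
MASKED: set[str] = set()


def mask_secret(value: str) -> None:
    MASKED.add(value)


@dataclass
class GetConnection:
    conn_id: str


@dataclass
class ConnectionResponse:
    conn_id: str
    password: str | None = None
    extra: str | None = None


@dataclass
class ErrorResponse:
    detail: str


@dataclass
class ConnectionResult:
    conn_id: str
    password: str | None = None
    extra: str | None = None

    @classmethod
    def from_conn_response(cls, resp: ConnectionResponse) -> ConnectionResult:
        return cls(conn_id=resp.conn_id, password=resp.password, extra=resp.extra)


def handle_get_connection(client, msg: GetConnection):
    resp = client.connections.get(msg.conn_id)
    if not isinstance(resp, ConnectionResponse):
        # ErrorResponse (or anything unexpected) passes through unchanged.
        return resp, {}
    # Register sensitive fields with the masker before returning them.
    if resp.password:
        mask_secret(resp.password)
    if resp.extra:
        mask_secret(resp.extra)
    # The real ConnectionResult uses field aliases, hence by_alias (rule 4).
    return ConnectionResult.from_conn_response(resp), {"exclude_unset": True, "by_alias": True}


class FakeConnections:
    def get(self, conn_id: str):
        if conn_id == "missing":
            return ErrorResponse(detail="Connection not found")
        return ConnectionResponse(conn_id=conn_id, password="hunter2", extra='{"token": "abc"}')


class FakeClient:
    connections = FakeConnections()


conn, conn_opts = handle_get_connection(FakeClient(), GetConnection("my_db"))
err, err_opts = handle_get_connection(FakeClient(), GetConnection("missing"))
```

If every supervisor routes through one function like this, the DFP picks up the password and extra masking automatically, and any intentional exception would have to be made explicit instead of living in an inline copy.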

Out of Scope:

Supervisor-specific messages (e.g. TaskState, DeferTask, TriggerStateChanges, DagFileParsingResult) interact with internal subprocess state, so they should either stay inline or move to a supervisor-specific helper/utils module. Personally, I vote to leave them alone.
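To illustrate the split between rule 8 and these out-of-scope messages, here is a minimal sketch of a trimmed-down, hypothetical supervisor (SketchSupervisor, not the real WatchedSubprocess) whose _handle_request delegates GetTICount to a stateless shared handler while keeping a state-changing TaskState message inline. The message fields, TICount, and client.task_instances.get_count(...) are stand-ins, not Airflow's actual API.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


# Hypothetical stand-ins: one shareable message, one supervisor-specific message.
@dataclass
class GetTICount:
    dag_id: str
    states: list[str] = field(default_factory=list)


@dataclass
class TaskState:
    state: str


@dataclass
class TICount:
    count: int


def handle_get_ti_count(client: Any, msg: GetTICount) -> tuple[Any, dict[str, bool]]:
    """Stateless shared handler: it only talks to the API client."""
    resp = client.task_instances.get_count(dag_id=msg.dag_id, states=msg.states)
    # Treated as a simple pass-through response, so no dump options are needed.
    return resp, {}


class SketchSupervisor:
    """Hypothetical, trimmed-down supervisor used only for this illustration."""

    def __init__(self, client: Any) -> None:
        self.client = client
        self._terminal_state: str | None = None
        self.sent: list[tuple[Any, dict[str, bool]]] = []

    def _handle_request(self, msg: Any) -> None:
        resp: Any = None
        dump_opts: dict[str, bool] = {}
        if isinstance(msg, TaskState):
            # Updates subprocess state, so it stays inline rather than in a shared handler.
            self._terminal_state = msg.state
        elif isinstance(msg, GetTICount):
            resp, dump_opts = handle_get_ti_count(self.client, msg)
        else:
            raise ValueError(f"Unknown message type {type(msg).__name__}")
        # Stand-in for sending the response back over the comms channel.
        self.sent.append((resp, dump_opts))


class FakeTaskInstances:
    def get_count(self, dag_id: str, states: list[str]) -> TICount:
        return TICount(count=3 if "success" in states else 0)


class FakeClient:
    task_instances = FakeTaskInstances()


sup = SketchSupervisor(FakeClient())
sup._handle_request(GetTICount(dag_id="example", states=["success"]))
sup._handle_request(TaskState(state="success"))
```

The shared handler never receives self, so it has no way to reach exit codes or terminal states; only the inline branch mutates supervisor state.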
