kestra-io/kestra

[TBD] Filter out tasks (like log) that don't generate any outputs from the Outputs tab

Open

#5,447 opened on Oct 14, 2024

Labels: area/frontend, good first issue

Description

Feature description

In executions with many task runs, it would be useful to filter out task runs that don't have any outputs. Tasks like ForEachItem or WorkingDirectory, which are placeholders for downstream tasks, could in theory stay in Outputs, but that is not strictly necessary.


I'd advocate for filtering out all task runs that have 0 outputs from this tab.
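As a rough illustration of the proposed behavior, here is a minimal TypeScript sketch of such a filter. The `TaskRun` shape and the function names are assumptions made up for this example, not the actual Kestra frontend types; the real Outputs tab may store task runs differently.

```typescript
// Hypothetical shape of a task run as the Outputs tab might receive it.
// The field names are assumptions for this sketch, not Kestra's real model.
interface TaskRun {
  id: string;
  taskId: string;
  outputs?: Record<string, unknown> | null;
}

// A task run "has outputs" when its outputs object exists and has at least one key.
function hasOutputs(taskRun: TaskRun): boolean {
  const outputs = taskRun.outputs;
  return outputs != null && Object.keys(outputs).length > 0;
}

// Keep only the task runs that produced at least one output.
function filterTaskRunsWithOutputs(taskRuns: TaskRun[]): TaskRun[] {
  return taskRuns.filter(hasOutputs);
}
```

With a filter like this, a Log task run whose outputs are missing or empty would be hidden, while a task run such as `extract` that stores a `uri` would still be listed. Whether placeholder tasks like ForEachItem should be special-cased is left open here.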

Example flow to reproduce (the child flow is included in the parent flow's description):

id: file_output_parent_concat_foreach_batch_1
namespace: company.team

description: |
  id: file_output_child
  namespace: company.team

  inputs:
    - id: ion_file
      type: FILE

  tasks:
    - id: read_raw_ion_file
      type: io.kestra.plugin.core.log.Log
      message: "{{ read(inputs.ion_file) }}"

    - id: transform
      type: io.kestra.plugin.transform.jsonata.TransformItems
      from: "{{ inputs.ion_file }}"
      expression: |
        {
          "customer_email": customer_email,
          "total": total
        }

  outputs:
    - id: myoutput
      type: FILE
      value: "{{ outputs.transform.uri }}"

tasks:
  - id: extract
    type: io.kestra.plugin.jdbc.duckdb.Query
    sql: |
      INSTALL httpfs;
      LOAD httpfs;
      SELECT *
      FROM read_csv_auto('https://huggingface.co/datasets/kestra/datasets/raw/main/csv/orders.csv', header=True);
    store: true

  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.extract.uri }}"
    batch:
      rows: 1 # TBD annoying that it cannot be passed from inputs
      # https://github.com/kestra-io/kestra/issues/5442
    namespace: company.team
    flowId: file_output_child
    wait: true
    transmitFailed: true
    inputs:
      ion_file: "{{ taskrun.items }}"

  - id: ion_to_json
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.for_each_item_merge.subflowOutputs }}"
    newLine: false

  - id: merge
    type: io.kestra.plugin.core.storage.Concat
    files: "{{ read(outputs.ion_to_json.uri) | jq('map(.myoutput)') | flatten }}"
    separator: "\n"

  - id: for_each
    type: io.kestra.plugin.core.flow.ForEach
    allowFailure: true
    description: this will fail with Unable to read the file 'kestra:///....ion' as it didn't belong to the current execution
    values: "{{ read(outputs.ion_to_json.uri) }}"
    tasks:
      - id: log
        type: io.kestra.plugin.core.log.Log
        allowFailure: true
        message: "{{ read(json(taskrun.value).myoutput) }}"
