apache/seatunnel

[Bug] [Zeta] Batch job occasionally hangs in RUNNING forever under high-frequency job submission

Open

#10,997 opened on June 2, 2026

Labels: bug, help wanted

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

In production we hit an intermittent problem: a BATCH job (JDBC MySQL source → JDBC MultiTableSink) finished reading all source data and the bounded reader closed normally, but the whole job then stayed in RUNNING forever and never transitioned to FINISHED. It cannot finish on its own; the only way to clear it is a manual cancel or a master restart.

This does not happen every time; it shows up intermittently under a high job-submission load. Below are the clues we collected. We'd like the community's help to analyze the possible causes and directions for improvement.

Environment & workload characteristics

  • SeaTunnel version: 2.3.13, Zeta engine.
  • Deployment: separated master/worker, 2 masters (HA; 32 cores + 48 GB heap each) + 8 workers (3 with 32 cores + 48 GB heap, 5 with 16 cores + 20 GB heap).
  • Workload: the platform has 1000+ data-collection jobs, submitted periodically by a scheduler, with on average several dozen job submissions per minute (higher at peak). So the cluster runs continuously under "sustained high-frequency submission + many small concurrent jobs".
  • The job that hung is itself trivial: a single bounded table read, BATCH mode, and no checkpoint.interval configured (i.e. checkpoint disabled).

SeaTunnel Version

SeaTunnel version: 2.3.13, Zeta engine

SeaTunnel Config

for master:

-- seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
    checkpoint:
      interval: 60000
      timeout: 600000
      storage:
        ...
    telemetry:
      metric:
        enabled: false
      logs:
        scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false

-- hazelcast-master.yaml
hazelcast:
  cluster-name: platform
  network:
    rest-api:
      enabled: false
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      ...
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
    hazelcast.io.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default

for worker:

-- seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
      slot-num: 96
      slot-allocate-strategy: slot_ratio
    checkpoint:
      interval: 60000
      timeout: 600000
      storage:
        ...
    telemetry:
      metric:
        enabled: false
      logs:
        scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false

-- hazelcast-worker.yaml
hazelcast:
  cluster-name: platform
  network:
    join:
      ...
    port:
      auto-increment: false
      port: 5802
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.io.thread.count: 32
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default

Running Command

start up (master): ./bin/seatunnel-cluster.sh -d -r master
start up (worker): ./bin/seatunnel-cluster.sh -d -r worker
submit job: via the /submit-job API, using a JSON config

Error Exception

## Clues we found

### Clue 1: state of the three task threads while hung (from jstack)

| Role | Thread-stack location | Meaning |
|---|---|---|
| Source reader (taskGroup 2) | looping in `SourceFlowLifeCycle.collect:194` (sleep) | reader's internal `prepareClose=true`, but the **task-level `prepareCloseStatus=false`, still in RUNNING** |
| Sink side (taskGroup 2) | blocked in `IntermediateBlockingQueue.collect:54` (poll) | waiting forever for data / a close marker on the intermediate queue that never arrives |
| SplitEnumerator (taskGroup 1) | `SourceSplitEnumeratorTask.stateProcess:350` (PREPARE_CLOSE) | already detected all readers finished, already sent `LastCheckpointNotifyOperation` to master, waiting for `closeCalled` |

The source finished reading at `17:48:42`; after that there is **almost no further log output** for
this job on either the worker or the master.

### Clue 2: how a bounded BATCH job is supposed to finish (for reference)

Normally a bounded BATCH job is closed by a "final checkpoint":

1. reader finishes → `signalNoMoreElement()` → notifies the enumerator;
2. enumerator detects all readers finished → sends `LastCheckpointNotifyOperation` to master →
   `CheckpointCoordinator.readyToClose()`;
3. master triggers the final checkpoint `tryTriggerPendingCheckpoint(COMPLETED_POINT_TYPE)`;
4. this close barrier flows enumerator → reader → sink, each acks, and it sets `prepareCloseStatus`
   to true on every task, advancing each task `RUNNING → PREPARE_CLOSE → CLOSED`;
5. once all complete, master tells the tasks to close and the job finishes.

In the hung job, step 3 appears to have happened (the enumerator is already in PREPARE_CLOSE), but
the **source reader's `prepareCloseStatus` was never set** — i.e. the close-barrier handshake never
completed / never reached the reader.
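
To make the stuck state concrete, here is a minimal, hypothetical model of the per-task state machine described above. It is not SeaTunnel's actual code: the class `CloseHandshakeSketch`, its `Task` type, and `runReader` are invented for illustration. Only the state names and the `prepareCloseStatus` / `closeCalled` flags are taken from the observations in this issue.

```java
/** Hypothetical, simplified model of the close handshake; not SeaTunnel's real classes. */
class CloseHandshakeSketch {
    enum State { RUNNING, PREPARE_CLOSE, CLOSED }

    static final class Task {
        State state = State.RUNNING;
        // Set only when the close barrier of the final checkpoint reaches this task.
        boolean prepareCloseStatus = false;
        // Set only when the master tells the task to close.
        boolean closeCalled = false;

        /** One iteration of the task's state loop. */
        void step() {
            switch (state) {
                case RUNNING:
                    if (prepareCloseStatus) {
                        state = State.PREPARE_CLOSE;
                    }
                    break;
                case PREPARE_CLOSE:
                    if (closeCalled) {
                        state = State.CLOSED;
                    }
                    break;
                default:
                    break;
            }
        }
    }

    /** Runs the handshake for one reader; barrierDelivered=false models a lost close barrier. */
    static State runReader(boolean barrierDelivered) {
        Task reader = new Task();
        if (barrierDelivered) {
            reader.prepareCloseStatus = true;
        }
        reader.step();
        // In this model the master only sends "close" once the task has reached PREPARE_CLOSE.
        if (reader.state == State.PREPARE_CLOSE) {
            reader.closeCalled = true;
        }
        reader.step();
        return reader.state;
    }

    public static void main(String[] args) {
        System.out.println("barrier delivered: " + runReader(true));
        System.out.println("barrier lost:      " + runReader(false));
    }
}
```

In this model a reader that never receives the barrier has no other path out of RUNNING, which matches what we see in the jstack output.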

### Clue 3: in the same time window, worker↔master Hazelcast communication was massively timing out and the connection was flapping

In the logs of the worker that ran the source/sink (172.18.1.145) and the active master
(172.18.1.143), **right in the window when the job tried to close (around source close at
`17:48:42`)**, cluster-internal communication was badly degraded:

- the master logged **1419 `Slow operation detected: ReportMetricsOperation`** warnings within one
  hour; `ReportMetricsOperation` (partitionId=-1) was hogging almost all generic-operation threads;
- several operations from the worker to the master were **rejected by the 60s
  operation-call-timeout**, e.g. `JobEventReportOperation` (17:46:36, 17:48:03, with the connection
  already `alive=false`) and `ReportMetricsOperation` (17:48:59, `ERROR ... failed to update metrics`);
- the TCP cluster connection worker(145)↔master(143) was **re-initialized 6 times between 17:39 and
  17:56** (each reconnect drops the operations in flight on that connection); meanwhile **no
  Hazelcast cluster membership change occurred** (no master failover) — only the connection flapped;
- shortly after, `SourceRegisterOperation` was flagged as a slow operation (17:50:37) and then
  **failed after 30 retries** (17:51:27), confirming operations were being lost during this period.
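
For anyone who wants to reproduce the count above on their own logs, a small sketch along these lines can tally slow-operation warnings per operation type. The class name `SlowOperationCounter` is hypothetical, and the log-line shape it expects (the text `Slow operation detected: ` followed by an operation class name) is an assumption based on the warnings quoted here; adjust the pattern to your log4j2 layout.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: counts "Slow operation detected" warnings per operation type. */
class SlowOperationCounter {
    // Assumed line shape: "... Slow operation detected: <optionally.qualified.ClassName> ..."
    private static final Pattern SLOW_OP =
            Pattern.compile("Slow operation detected: ([\\w$]+(?:\\.[\\w$]+)*)");

    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = SLOW_OP.matcher(line);
            if (m.find()) {
                String qualified = m.group(1);
                // Keep only the simple class name so counts group by operation type.
                String simple = qualified.substring(qualified.lastIndexOf('.') + 1);
                counts.merge(simple, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Pass a log file path as the first argument, or run without arguments on sample lines.
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList(
                        "WARN Slow operation detected: org.example.ReportMetricsOperation",
                        "INFO something unrelated",
                        "WARN Slow operation detected: org.example.SourceRegisterOperation");
        count(lines).forEach((op, n) -> System.out.println(op + ": " + n));
    }
}
```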

### Clue 4: the close handshake relies on a chain of Hazelcast operations

The close flow described in Clue 2 is implemented as a chain of cross-node Hazelcast operations:
`LastCheckpointNotifyOperation` (worker→master), `CheckpointBarrierTriggerOperation`
(master→enumerator), `BarrierFlowOperation` (enumerator→reader), `TaskAcknowledgeOperation`
(reader→master). If any one of these is lost (in a connection drop / 60s-timeout rejection) and is
not retried, the close flow cannot complete.
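
As an illustration of the "lost and not retried" concern, the following sketch wraps a send in a bounded retry with a per-attempt timeout. It is a generic pattern, not what the engine does today: `RetryingSenderSketch` and `sendWithRetry` are hypothetical names, and a never-completing `CompletableFuture` stands in for an operation dropped by a connection reset. Retrying like this is only safe if the operation is idempotent on the receiving side.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch: bounded retry around a send that may be silently lost. */
class RetryingSenderSketch {

    static <T> T sendWithRetry(Supplier<CompletableFuture<T>> send,
                               int maxAttempts,
                               long timeoutMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            CompletableFuture<T> future = send.get();
            try {
                return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                future.cancel(true); // give up on this attempt and send again
                last = e;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for ack", e);
            }
        }
        throw new IllegalStateException("no ack after " + maxAttempts + " attempts", last);
    }

    /** Demo: the first two sends are "lost" (never complete); the third is acknowledged. */
    static String demo() {
        AtomicInteger attempts = new AtomicInteger();
        Supplier<CompletableFuture<String>> flaky = () -> {
            if (attempts.incrementAndGet() < 3) {
                return new CompletableFuture<String>(); // models a dropped operation
            }
            return CompletableFuture.completedFuture("ack");
        };
        return sendWithRetry(flaky, 5, 50L) + " after " + attempts.get() + " attempts";
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```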

We also noticed that when `checkpoint.interval` is not configured (checkpoint disabled), this final
checkpoint used to close the bounded source does not seem to have the same timeout detection that a
normal checkpoint has (this is only an observation; we're not sure whether it is by design).
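
To illustrate what we mean by "timeout detection" for the final checkpoint, here is a minimal sketch of a watchdog that fails a pending close if it is not acknowledged in time, so the job could end in a failed state instead of staying in RUNNING. Everything in it is hypothetical (`FinalCheckpointWatchdogSketch`, `withTimeout`, the 100 ms value); we have not confirmed how the engine handles this case internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: a watchdog that fails a pending final checkpoint after a timeout. */
class FinalCheckpointWatchdogSketch {

    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> pending,
                                                long timeoutMillis,
                                                ScheduledExecutorService scheduler) {
        ScheduledFuture<?> timer = scheduler.schedule(() -> {
            // No effect if the checkpoint has already completed.
            pending.completeExceptionally(new TimeoutException(
                    "final checkpoint not acknowledged within " + timeoutMillis + " ms"));
        }, timeoutMillis, TimeUnit.MILLISECONDS);
        // Stop the timer as soon as the checkpoint completes either way.
        pending.whenComplete((result, error) -> timer.cancel(false));
        return pending;
    }

    /** Demo: acksArrive=false models the hung job, where the acks never come back. */
    static String demo(boolean acksArrive) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CompletableFuture<String> pending =
                    withTimeout(new CompletableFuture<String>(), 100L, scheduler);
            if (acksArrive) {
                pending.complete("COMPLETED");
            }
            try {
                return pending.get();
            } catch (ExecutionException e) {
                return "FAILED: " + e.getCause().getClass().getSimpleName();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "INTERRUPTED";
            }
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```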

## Our questions / what we'd like the community to look at

1. Under our "1000+ jobs, several dozen submissions per minute" high-frequency scenario,
   `ReportMetricsOperation` and similar internal operations saturate the master's operation threads
   and cause the worker↔master connection to repeatedly time out / reconnect — is this a known
   scalability bottleneck? How does the community usually mitigate it?
2. For this phenomenon, what does the community think are the **possible root causes** and the
   **directions for future optimization**? (Whether at the config, deployment, or engine level —
   we'd love to hear any thoughts.)

Zeta or Flink or Spark Version

No response

Java or Scala Version

Java: 8

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
