[Bug] [Zeta] Batch job occasionally hangs in RUNNING forever under high-frequency job submission
#10,997 opened on June 2, 2026
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
In production we hit an intermittent problem: a BATCH job (JDBC MySQL source → JDBC
MultiTableSink) finished reading all source data and its bounded reader closed normally, but the
whole job then stayed in RUNNING forever and never transitioned to FINISHED. It cannot finish on its
own; the only way to clear it is a manual cancel or a master restart.
This does not happen every time; it shows up intermittently under a high job-submission load. Below are the clues we collected. We'd like the community's help to analyze the possible causes and directions for improvement.
Environment & workload characteristics
- SeaTunnel version: 2.3.13, Zeta engine.
- Deployment: separated master/worker mode; 2 masters (HA, 32C + 48G heap each) + 8 workers (3 with 32C + 48G heap, 5 with 16C + 20G heap).
- Workload: the platform has 1000+ data-collection jobs, submitted periodically by a scheduler, with on average several dozen job submissions per minute (higher at peak). So the cluster runs continuously under "sustained high-frequency submission + many small concurrent jobs".
- The job that hung is itself trivial: a single bounded table read, BATCH mode, and no
`checkpoint.interval` configured in the job config (i.e. checkpoint disabled for the job).
SeaTunnel Version
SeaTunnel version: 2.3.13, Zeta engine
SeaTunnel Config
For master:

```yaml
# seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
    checkpoint:
      interval: 60000
      timeout: 600000
      storage:
        ...
    telemetry:
      metric:
        enabled: false
    logs:
      scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false
```

```yaml
# hazelcast-master.yaml
hazelcast:
  cluster-name: platform
  network:
    rest-api:
      enabled: false
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      ...
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
    hazelcast.io.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default
```
For worker:

```yaml
# seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
      slot-num: 96
      slot-allocate-strategy: slot_ratio
    checkpoint:
      interval: 60000
      timeout: 600000
      storage:
        ...
    telemetry:
      metric:
        enabled: false
    logs:
      scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false
```

```yaml
# hazelcast-worker.yaml
hazelcast:
  cluster-name: platform
  network:
    join:
      ...
    port:
      auto-increment: false
      port: 5802
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.io.thread.count: 32
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default
```
Running Command
Start up:

```shell
./bin/seatunnel-cluster.sh -d -r master   # on each master node
./bin/seatunnel-cluster.sh -d -r worker   # on each worker node
```

Submit job: via the `/submit-job` REST API, with the job config in JSON.
Error Exception
## Clues we found
### Clue 1: state of the three task threads while hung (from jstack)
| Role | Thread-stack location | Meaning |
|---|---|---|
| Source reader (taskGroup 2) | looping in `SourceFlowLifeCycle.collect:194` (sleep) | reader's internal `prepareClose=true`, but the **task-level `prepareCloseStatus=false`, still in RUNNING** |
| Sink side (taskGroup 2) | blocked in `IntermediateBlockingQueue.collect:54` (poll) | waiting forever for data / a close marker on the intermediate queue that never arrives |
| SplitEnumerator (taskGroup 1) | `SourceSplitEnumeratorTask.stateProcess:350` (PREPARE_CLOSE) | already detected all readers finished, already sent `LastCheckpointNotifyOperation` to master, waiting for `closeCalled` |
The source finished reading at `17:48:42`; after that there are **almost no further log entries** for
this job on either the worker or the master.
### Clue 2: how a bounded BATCH job is supposed to finish (for reference)
Normally a bounded BATCH job is closed by a "final checkpoint":
1. reader finishes → `signalNoMoreElement()` → notifies the enumerator;
2. enumerator detects all readers finished → sends `LastCheckpointNotifyOperation` to master →
`CheckpointCoordinator.readyToClose()`;
3. master triggers the final checkpoint `tryTriggerPendingCheckpoint(COMPLETED_POINT_TYPE)`;
4. the close barrier flows enumerator → reader → sink; each task acks it, and the barrier sets
`prepareCloseStatus` to true on every task, advancing each task `RUNNING → PREPARE_CLOSE → CLOSED`;
5. once all complete, master tells the tasks to close and the job finishes.
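The five steps above can be sketched as a toy state machine. This is a minimal model under our own assumptions, not SeaTunnel code: `CloseFlowModel`, `Task` and `runFinalCheckpoint` are hypothetical names, the barrier is delivered synchronously, and "lost" simply means the barrier stops before a given task. It shows that if the barrier never reaches the reader, the enumerator can end up in PREPARE_CLOSE while the reader's task-level `prepareCloseStatus` stays false, which resembles the jstack state in Clue 1.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the bounded-BATCH close flow (steps 3-5 above).
// Hypothetical names; this is NOT SeaTunnel's implementation.
final class CloseFlowModel {

    enum TaskState { RUNNING, PREPARE_CLOSE, CLOSED }

    static final class Task {
        final String name;
        TaskState state = TaskState.RUNNING;
        boolean prepareCloseStatus = false; // stands in for the task-level flag

        Task(String name) {
            this.name = name;
        }

        // Step 4: receiving the close barrier flips the task-level flag.
        void onCloseBarrier() {
            prepareCloseStatus = true;
            state = TaskState.PREPARE_CLOSE;
        }
    }

    // Builds the chain the barrier travels along: enumerator -> reader -> sink.
    static List<Task> newChain() {
        List<Task> chain = new ArrayList<>();
        chain.add(new Task("enumerator"));
        chain.add(new Task("reader"));
        chain.add(new Task("sink"));
        return chain;
    }

    // Delivers the barrier in order; if it is lost before task `lostAt`
    // (null = nothing lost), that task and every task after it never see it.
    // Returns true only if the job would reach FINISHED.
    static boolean runFinalCheckpoint(List<Task> chain, String lostAt) {
        int acked = 0;
        for (Task t : chain) {
            if (t.name.equals(lostAt)) {
                break; // barrier silently lost: no ack, and no error either
            }
            t.onCloseBarrier();
            acked++;
        }
        // Step 5: the master closes tasks only after every task has acked.
        if (acked != chain.size()) {
            return false; // nothing in this model times out, so the job stays RUNNING
        }
        for (Task t : chain) {
            t.state = TaskState.CLOSED;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("no loss: finished=" + runFinalCheckpoint(newChain(), null));
        List<Task> hung = newChain();
        System.out.println("lost before reader: finished=" + runFinalCheckpoint(hung, "reader"));
        for (Task t : hung) {
            System.out.println("  " + t.name + " " + t.state + " prepareCloseStatus=" + t.prepareCloseStatus);
        }
    }
}
```

In the lost-barrier case the model reports `finished=false`, with the enumerator in PREPARE_CLOSE and both reader and sink still RUNNING with `prepareCloseStatus=false`.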
In the hung job, step 3 appears to have happened (the enumerator is already in PREPARE_CLOSE), but
the **source reader's `prepareCloseStatus` was never set** — i.e. the close-barrier handshake never
completed / never reached the reader.
### Clue 3: in the same time window, worker↔master Hazelcast communication was massively timing out and the connection was flapping
In the logs of the worker that ran the source/sink (172.18.1.145) and the active master
(172.18.1.143), **right in the window when the job tried to close (around source close at
`17:48:42`)**, cluster-internal communication was badly degraded:
- the master logged **1419 `Slow operation detected: ReportMetricsOperation`** warnings within one
hour; `ReportMetricsOperation` (partitionId=-1) was hogging almost all generic-operation threads;
- several operations from the worker to the master were **rejected by the 60s
operation-call-timeout**, e.g. `JobEventReportOperation` (17:46:36, 17:48:03, with the connection
already `alive=false`) and `ReportMetricsOperation` (17:48:59, `ERROR ... failed to update metrics`);
- the TCP cluster connection worker(145)↔master(143) was **re-initialized 6 times between 17:39 and
17:56** (each reconnect drops the operations in flight on that connection); meanwhile **no
Hazelcast cluster membership change occurred** (no master failover) — only the connection flapped;
- shortly after, `SourceRegisterOperation` was flagged as a slow operation (17:50:37) and then
**failed after 30 retries** (17:51:27), confirming operations were being lost during this period.
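To make the "hogging generic-operation threads" point concrete, here is a toy model of head-of-line blocking in a fixed-size pool. Assumptions: the pool stands in for the master's generic operation threads (2 threads here instead of our configured `hazelcast.operation.generic.thread.count: 50`), the blocked tasks stand in for slow `ReportMetricsOperation`s, and the "critical" task stands in for an operation such as `JobEventReportOperation` that we assume shares the same pool; we have not verified that in the Hazelcast or SeaTunnel code. `GenericPoolStarvation` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model of head-of-line blocking in a fixed-size operation pool (hypothetical, not Hazelcast code).
final class GenericPoolStarvation {

    // Occupies every pool thread with a slow task, then submits one critical task.
    // Returns true if the critical task managed to run while the pool was saturated.
    static boolean criticalRanWhileSaturated(int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch allSlowStarted = new CountDownLatch(poolSize);
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.submit(() -> {
                    allSlowStarted.countDown();
                    release.await(); // stands in for a slow metrics update
                    return null;
                });
            }
            allSlowStarted.await(); // every thread is now busy
            Future<String> critical = pool.submit(() -> "close-handshake operation");
            boolean ranWhileSaturated;
            try {
                // Short stand-in for the 60s operation call timeout.
                critical.get(200, TimeUnit.MILLISECONDS);
                ranWhileSaturated = true;
            } catch (TimeoutException e) {
                ranWhileSaturated = false; // still queued behind the slow tasks
            }
            release.countDown(); // slow tasks finish and free their threads
            critical.get(5, TimeUnit.SECONDS); // the queued task can run now
            return ranWhileSaturated;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdownNow();
        }
    }
}
```

`criticalRanWhileSaturated(2)` returns false: the critical task only runs after a slow task gives up its thread. With real timings, a queueing delay longer than `hazelcast.operation.call.timeout.millis` (60000 in our config) could plausibly surface as the call-timeout rejections listed above.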
### Clue 4: the close handshake relies on a chain of Hazelcast operations
The close flow described in Clue 2 (steps 2-4) is implemented as a chain of cross-node Hazelcast operations:
`LastCheckpointNotifyOperation` (worker→master), `CheckpointBarrierTriggerOperation`
(master→enumerator), `BarrierFlowOperation` (enumerator→reader), `TaskAcknowledgeOperation`
(reader→master). If any one of these is lost (in a connection drop or a 60s-timeout rejection) and is
not retried, the close flow cannot complete.
We also noticed that when `checkpoint.interval` is not configured (checkpoint disabled), the final
checkpoint that closes the bounded source does not seem to get the same timeout detection as a
normal checkpoint (this is only an observation; we're not sure whether it is by design).
## Our questions / what we'd like the community to look at
1. Under our "1000+ jobs, several dozen submissions per minute" high-frequency scenario,
`ReportMetricsOperation` and similar internal operations appear to saturate the master's operation
threads and make the worker↔master connection repeatedly time out and reconnect. Is this a known
scalability bottleneck, and how does the community usually mitigate it?
2. For this phenomenon, what does the community think are the **possible root causes** and the
**directions for future optimization**? (Whether at the config, deployment, or engine level —
we'd love to hear any thoughts.)
Zeta or Flink or Spark Version
No response
Java or Scala Version
Java: 8
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct