apache/seatunnel
View on GitHub[Bug] [connector-kafka] exactly-once. The infinite loop issue in the void abortTransaction(long checkpointId) method of KafkaTransactionSender
Open
#10061 opened on Nov 13, 2025
bughelp wanted
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
我使用的kafka版本为3.9.1 if (producer.getEpoch() == 0) { break; } 这段代码没有按预期进入这里,导致机器卡死,合理怀疑为TransactionId没有正确更新。 我目前的修改为 @Override public void abortTransaction(long checkpointId) {
KafkaInternalProducer<K, V> producer;
for (long i = checkpointId; ; i++) {
if (this.kafkaProducer != null) {
producer = this.kafkaProducer;
} else {
producer = getTransactionProducer(this.kafkaProperties, generateTransactionId(this.transactionPrefix, i));
}
// producer.setTransactionalId(transactionId); if (log.isDebugEnabled()) { log.debug("Abort kafka transaction: {}", transactionId); } // producer.flush(); if (producer.getEpoch() == 0) { break; } } }
SeaTunnel Version
2.3.12
SeaTunnel Config
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 60000
checkpoint.timeout = 600000
}
source {
MySQL-CDC {
plugin_output = "s"
url = "jdbc:mysql://xxx:3306/testdb"
username = "xxx"
password = "xxx"
database-names = ["testdb"]
table-names = ["testdb.xxtable"]
int_type_narrowing = false
server-id = "25600-25603"
startup.mode = "latest"
}
}
transform {
Metadata {
plugin_input = ["s"]
plugin_output = "ss"
metadata_fields {
EventTime = __event_time
Delay = __delay
}
}
TableFilter {
plugin_input = ["ss"]
plugin_output = "xxtable"
database_pattern = "testdb"
table_pattern = "xxtable"
}
}
sink {
Kafka {
plugin_input = ["xxtable"]
bootstrap.servers = "xxx:9092"
semantics = EXACTLY_ONCE
kafka.config = {
acks = "all"
enable.idempotence = true
retries = 3
sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required username="xxx" password="xxx";"
sasl.mechanism = SCRAM-SHA-512
security.protocol = SASL_PLAINTEXT
compression.type = "snappy"
}
topic = "xxtable"
}
}
Running Command
bin/seatunnel.sh -c task/xxtable.conf -n xxtable --async
Error Exception
Abort kafka transaction: SeaTunnel5724-1
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct