apache/seatunnel

[Bug] [connector-kafka] Exactly-once: infinite loop in the abortTransaction(long checkpointId) method of KafkaTransactionSender

Open

#10061 opened on Nov 13, 2025

bug, help wanted

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

I am using Kafka 3.9.1. The check `if (producer.getEpoch() == 0) { break; }` is never satisfied as expected, so the loop never exits and the machine hangs. I reasonably suspect that the TransactionId is not being updated correctly. My current modification is:

    @Override
    public void abortTransaction(long checkpointId) {

        KafkaInternalProducer<K, V> producer;

        for (long i = checkpointId; ; i++) {
            if (this.kafkaProducer != null) {
                producer = this.kafkaProducer;
            } else {
                producer = getTransactionProducer(this.kafkaProperties, generateTransactionId(this.transactionPrefix, i));
            }

            // producer.setTransactionalId(transactionId);
            if (log.isDebugEnabled()) {
                log.debug("Abort kafka transaction: {}", transactionId);
            }
            // producer.flush();
            if (producer.getEpoch() == 0) {
                break;
            }
        }
    }
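
To make the failure mode concrete, here is a minimal, Kafka-free sketch of the loop shape described above. It is not SeaTunnel code: `BoundedAbortSketch`, `abortFrom`, `maxAttempts` and the `epochOf` lookup are hypothetical names, and `epochOf` merely stands in for "create a producer for this transaction id and read `getEpoch()`". The sketch only shows that if the epoch never comes back as 0, an unbounded scan cannot terminate, whereas a capped scan always does. Whether a cap is the right fix for the connector is an open question.

```java
import java.util.function.LongToIntFunction;

public class BoundedAbortSketch {

    /**
     * Probes transaction ids starting at checkpointId and stops at the first id
     * whose epoch is 0, or once maxAttempts ids have been probed.
     * Returns how many ids were probed, including the one that ended the scan.
     * epochOf is a hypothetical stand-in for producer.getEpoch().
     */
    public static int abortFrom(long checkpointId, int maxAttempts, LongToIntFunction epochOf) {
        int probed = 0;
        for (long i = checkpointId; probed < maxAttempts; i++) {
            probed++;
            if (epochOf.applyAsInt(i) == 0) {
                break; // same exit condition as in the report
            }
        }
        return probed;
    }

    public static void main(String[] args) {
        // Expected case: ids 1..3 were used before, id 4 is fresh (epoch 0).
        LongToIntFunction healthy = id -> id <= 3 ? 1 : 0;
        // Reported case: the epoch is never 0, so only the cap ends the scan.
        LongToIntFunction stuck = id -> 1;

        System.out.println(abortFrom(1, 100, healthy)); // prints 4
        System.out.println(abortFrom(1, 100, stuck));   // prints 100
    }
}
```

In the `stuck` case the scan ends only because of the hypothetical cap, which mirrors the hang reported here when no such bound exists.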

SeaTunnel Version

2.3.12

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
  checkpoint.timeout = 600000
}
source {
  MySQL-CDC {
    plugin_output = "s"
    url = "jdbc:mysql://xxx:3306/testdb"
    username = "xxx"
    password = "xxx"
    database-names = ["testdb"]
    table-names = ["testdb.xxtable"]
    int_type_narrowing = false
    server-id = "25600-25603" 
    startup.mode = "latest"
  }
}
transform {
  Metadata {
    plugin_input = ["s"]
    plugin_output = "ss"
    metadata_fields {
      EventTime = __event_time
      Delay = __delay
    }
  }
  TableFilter {
    plugin_input = ["ss"]
    plugin_output = "xxtable"
    database_pattern = "testdb"
    table_pattern = "xxtable"
  }
}
sink {
  Kafka {
    plugin_input = ["xxtable"]
    bootstrap.servers = "xxx:9092"
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "all"
      enable.idempotence = true
      retries = 3
      sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxx\";"
      sasl.mechanism = SCRAM-SHA-512
      security.protocol = SASL_PLAINTEXT
      compression.type = "snappy"
    }
    topic = "xxtable"
  }
}

Running Command

bin/seatunnel.sh -c task/xxtable.conf -n xxtable --async

Error Exception

Abort kafka transaction: SeaTunnel5724-1

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct