apache/seatunnel

[Bug] [MongoDB-CDC] Bug DataException: operationType is not a valid field name

Open

#11008 opened on June 4, 2026

Labels: bug, help wanted


Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

In both 2.3.12 and 2.3.13, MongoDB-CDC synchronization fails with: Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name

For example, syncing test-db to test-db2, table t_sys_user. Sample data (the name value is Chinese for "direct write test 8"):

db.t_sys_user.insertOne({ "_id": "12", phone: "47873454385", name: "直接写入测试8" })

Env:

env {
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  parallelism = 1
}

SeaTunnel Version

2.3.12, 2.3.13

SeaTunnel Config

Example config:
source {
  MongoDB-CDC {
    # ✅ Recommendation: configure both hosts and connection.options to make sure the connector connects to the replica set correctly
    hosts = "127.0.0.1:27017"
    # Specify the replica set name and read preference in the URI, forcing reads from the primary so the complete Change Stream data is available
    connection.options = "replicaSet=rs0&readPreference=primary"

    database = ["test-db"]
    collection = ["test-db.t_sys_user"]

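    # ✅ Key optimization: set a heartbeat interval greater than 0 to keep the resume token from expiring
    # The official docs strongly recommend setting this parameter when the collection changes slowly [citation:3]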
    heartbeat.interval.ms = 10000
    schema = {
      table = "test-db.t_sys_user"
      fields {
        "_id" : string
        "phone" : string,
        "name" : string
      }
    }
  }
}

sink {
  MongoDB {
    uri = "mongodb://127.0.0.1:27017"
    database = "test-db2"
    collection = "t_sys_user"
    # 🔑 Key change: use distributed_id as the primary key
    primary-key = ["_id"]
    # Older-version syntax, kept for compatibility
    #upsert-key = ["_id"]
    upsert-enable = true
    buffer-flush.max-rows = 1000
    buffer-flush.interval = 10000
  }
}

Running Command

String configurePath = args.length > 0 ? args[0] : "/examples/mongoA_to_mongoB.conf";

Error Exception

ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
	at org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.operationTypeFor(MongoDBConnectorDeserializationSchema.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:107)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:201)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter.processElement(MongoDBRecordEmitter.java:79)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:62)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
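The trace shows MongoDBConnectorDeserializationSchema.operationTypeFor calling Kafka Connect's Struct.getString, which throws DataException whenever the struct's schema has no field with that name; checking struct.schema().field("operationType") != null first avoids the exception, because Schema.field returns null for an unknown field. The sketch below is a dependency-free model of that guard, assuming the offending record is something other than a data change event (one unverified candidate, given heartbeat.interval.ms = 10000 in the config, is a heartbeat record). OperationTypeLookup, RecordValue and both lookup methods are hypothetical illustrations, not SeaTunnel or Kafka Connect code, and whether such records should be skipped is for the maintainers to decide.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, dependency-free model of the failure in the trace.
 * Not SeaTunnel or Kafka Connect code.
 */
public class OperationTypeLookup {

    static final String OPERATION_TYPE = "operationType";

    /**
     * Stand-in for a Kafka Connect Struct: getString() is strict, like
     * Struct.getString(), and hasField() plays the role of checking
     * struct.schema().field(name) != null.
     */
    static final class RecordValue {
        private final Map<String, String> fields = new LinkedHashMap<>();

        RecordValue put(String name, String value) {
            fields.put(name, value);
            return this;
        }

        boolean hasField(String name) {
            return fields.containsKey(name);
        }

        String getString(String name) {
            if (!fields.containsKey(name)) {
                // Same message text as the DataException in the report.
                throw new IllegalArgumentException(name + " is not a valid field name");
            }
            return fields.get(name);
        }
    }

    /** Strict lookup: fails on any record whose value has no operationType field. */
    static String strictOperationType(RecordValue value) {
        return value.getString(OPERATION_TYPE);
    }

    /** Guarded lookup: an empty result means "not a change event, skip it". */
    static Optional<String> guardedOperationType(RecordValue value) {
        if (!value.hasField(OPERATION_TYPE)) {
            return Optional.empty();
        }
        return Optional.ofNullable(value.getString(OPERATION_TYPE));
    }

    public static void main(String[] args) {
        RecordValue change = new RecordValue().put(OPERATION_TYPE, "insert");
        RecordValue nonChange = new RecordValue().put("_id", "placeholder");

        System.out.println(guardedOperationType(change).orElse("<skip>"));    // insert
        System.out.println(guardedOperationType(nonChange).orElse("<skip>")); // <skip>
        try {
            strictOperationType(nonChange);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // operationType is not a valid field name
        }
    }
}
```

In a real patch the same check would be made against the record's value Struct before reading operationType; the model only shows that the strict read reproduces the reported message while the guarded read does not throw.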

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
