apache/seatunnel
[Bug] [MongoDB-CDC] Bug DataException: operationType is not a valid field name
Open
#11008 opened on Jun 4, 2026
bug, help wanted
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
Both version 2.3.12 and version 2.3.13 fail during MongoDB-CDC synchronization with: Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name
For example, syncing test-db to test-db2, table t_sys_user. Sample data:
db.t_sys_user.insertOne({ "_id": "12", phone: "47873454385", name: "直接写入测试8" })
env config:
env {
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  parallelism = 1
}
SeaTunnel Version
2.3.12, 2.3.13
SeaTunnel Config
Example conf:
source {
  MongoDB-CDC {
    # ✅ Recommended: configure both hosts and connection.options to ensure a correct connection to the replica set
    hosts = "127.0.0.1:27017"
    # Specify the replica set name and read preference in the URI, forcing reads from the primary so the complete Change Stream data is available
    connection.options = "replicaSet=rs0&readPreference=primary"
    database = ["test-db"]
    collection = ["test-db.t_sys_user"]
    # ✅ Key optimization: set a heartbeat interval greater than 0 to keep the resume token from expiring
    # The official docs strongly recommend setting this parameter when the collection changes slowly [citation:3]
    heartbeat.interval.ms = 10000
    schema = {
      table = "test-db.t_sys_user"
      fields {
        "_id" : string
        "phone" : string,
        "name" : string
      }
    }
  }
}
sink {
  MongoDB {
    uri = "mongodb://127.0.0.1:27017"
    database = "test-db2"
    collection = "t_sys_user"
    # 🔑 Key change: use distributed_id as the primary key
    primary-key = ["_id"]
    # Syntax kept for compatibility with older versions
    #upsert-key = ["_id"]
    upsert-enable = true
    buffer-flush.max-rows = 1000
    buffer-flush.interval = 10000
  }
}
Running Command
String configurePath = args.length > 0 ? args[0] : "/examples/mongoA_to_mongoB.conf";
Error Exception
ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name
at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
at org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.operationTypeFor(MongoDBConnectorDeserializationSchema.java:183)
at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:107)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:201)
at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter.processElement(MongoDBRecordEmitter.java:79)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:62)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:842)
at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
... 2 more
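The trace shows `MongoDBConnectorDeserializationSchema.operationTypeFor` calling `Struct.getString`. That call fails in `Struct.lookupField` because the record value's schema has no `operationType` field. Kafka Connect treats a name that is missing from the schema as an error rather than returning `null`. The dependency-free sketch below models only that call path.

All names in it (`OperationTypeLookupDemo`, `SimpleStruct`, the stand-in `DataException`, `operationTypeOrNull`) are hypothetical. They are not the real Kafka Connect or SeaTunnel classes. The sketch also makes no claim about which record actually reached the deserializer in this report.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, dependency-free model of the call path in the trace:
// operationTypeFor -> Struct.getString -> getCheckType -> lookupField.
// All names here are hypothetical stand-ins, not Kafka Connect or SeaTunnel classes.
public class OperationTypeLookupDemo {

    // Stand-in for org.apache.kafka.connect.errors.DataException.
    public static class DataException extends RuntimeException {
        public DataException(String message) {
            super(message);
        }
    }

    // A record value; its set of keys plays the role of the value schema's fields.
    public static class SimpleStruct {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        public SimpleStruct put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        public boolean hasField(String name) {
            return fields.containsKey(name);
        }

        // Like Struct.lookupField: a name missing from the schema throws
        // instead of returning null.
        public String getString(String name) {
            if (!fields.containsKey(name)) {
                throw new DataException(name + " is not a valid field name");
            }
            return (String) fields.get(name);
        }
    }

    // Unguarded read, analogous to the failing operationTypeFor frame.
    public static String operationTypeFor(SimpleStruct value) {
        return value.getString("operationType");
    }

    // Hypothetical guarded read: returns null when the field is absent.
    public static String operationTypeOrNull(SimpleStruct value) {
        return value.hasField("operationType") ? value.getString("operationType") : null;
    }

    public static void main(String[] args) {
        SimpleStruct changeEvent = new SimpleStruct().put("operationType", "insert");
        SimpleStruct otherRecord = new SimpleStruct().put("unrelatedField", "x");

        System.out.println(operationTypeFor(changeEvent));    // prints "insert"
        System.out.println(operationTypeOrNull(otherRecord)); // prints "null"
        try {
            operationTypeFor(otherRecord);
        } catch (DataException e) {
            // prints "operationType is not a valid field name"
            System.out.println(e.getMessage());
        }
    }
}
```

In real Kafka Connect code, a comparable guard could check `value.schema().field("operationType") != null` before calling `getString`. This works because `Schema.field(String)` returns `null` for an unknown name on a struct schema. Whether skipping such records is the right fix depends on what those records are, which the trace alone does not show.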
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct