apache/seatunnel

[Bug] [MongoDB-CDC] Bug DataException: operationType is not a valid field name

Open

#11008 opened on Jun 4, 2026

Labels: bug, help wanted

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

In both 2.3.12 and 2.3.13, MongoDB-CDC synchronization fails with: Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name

For example, syncing test-db to test-db2, collection t_sys_user. Sample data:

db.t_sys_user.insertOne({ "_id": "12", phone: "47873454385", name: "直接写入测试8" })

env {
  job.mode = "STREAMING"
  checkpoint.interval = 10000
  parallelism = 1
}

SeaTunnel Version

2.3.12 and 2.3.13

SeaTunnel Config

Example config:
source {
  MongoDB-CDC {
    # ✅ Recommended: configure both hosts and connection.options so the connection to the replica set is established correctly
    hosts = "127.0.0.1:27017"
    # Specify the replica set name and read preference in the URI; force reads from the primary so the full Change Stream data is available
    connection.options = "replicaSet=rs0&readPreference=primary"

    database = ["test-db"]
    collection = ["test-db.t_sys_user"]

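    # ✅ Key optimization: set a heartbeat interval greater than 0 to keep the resume token from expiring
    # The official docs strongly recommend setting this parameter when the collection changes slowly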
    heartbeat.interval.ms = 10000
    schema = {
      table = "test-db.t_sys_user"
      fields {
        "_id" : string
        "phone" : string,
        "name" : string
      }
    }
  }
}

sink {
  MongoDB {
    uri = "mongodb://127.0.0.1:27017"
    database = "test-db2"
    collection = "t_sys_user"
    # 🔑 Key change: use distributed_id as the primary key
    primary-key = ["_id"]
    # Syntax compatible with older versions
    #upsert-key = ["_id"]
    upsert-enable = true
    buffer-flush.max-rows = 1000
    buffer-flush.interval = 10000
  }
}

Running Command

String configurePath = args.length > 0 ? args[0] : "/examples/mongoA_to_mongoB.conf";

Error Exception

ERROR org.apache.seatunnel.core.starter.SeaTunnel - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.kafka.connect.errors.DataException: operationType is not a valid field name
	at org.apache.kafka.connect.data.Struct.lookupField(Struct.java:254)
	at org.apache.kafka.connect.data.Struct.getCheckType(Struct.java:261)
	at org.apache.kafka.connect.data.Struct.getString(Struct.java:158)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.operationTypeFor(MongoDBConnectorDeserializationSchema.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema.deserialize(MongoDBConnectorDeserializationSchema.java:107)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:201)
	at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.source.MongoDBRecordEmitter.processElement(MongoDBRecordEmitter.java:79)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:102)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:62)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:110)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:114)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:842)

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
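
The trace shows `Struct.getString` throwing inside `operationTypeFor` because the record it received has no `operationType` field. One possible direction, sketched below, is to check that the field exists before reading it. This is only an illustration: `FakeStruct` and its `hasField` method are hypothetical stand-ins for Kafka Connect's `Struct`, the class name `OperationTypeGuard` is made up, and whether such records should be skipped rather than handled another way is an assumption that needs confirming against the connector source.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class OperationTypeGuard {

    /** Hypothetical stand-in for a Kafka Connect Struct: field name -> value. */
    static final class FakeStruct {
        private final Map<String, Object> fields = new LinkedHashMap<>();

        FakeStruct put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        boolean hasField(String name) {
            return fields.containsKey(name);
        }

        /** Mimics the failure in the trace: unknown field names raise an exception. */
        String getString(String name) {
            if (!fields.containsKey(name)) {
                throw new IllegalArgumentException(name + " is not a valid field name");
            }
            return (String) fields.get(name);
        }
    }

    /** Returns the operation type when the record carries one, otherwise empty. */
    static Optional<String> operationTypeFor(FakeStruct value) {
        if (value == null || !value.hasField("operationType")) {
            // Assumption: a record without operationType is not a change event.
            return Optional.empty();
        }
        return Optional.ofNullable(value.getString("operationType"));
    }

    public static void main(String[] args) {
        FakeStruct change = new FakeStruct().put("operationType", "insert");
        FakeStruct other = new FakeStruct().put("_id", "12");

        System.out.println(operationTypeFor(change)); // prints Optional[insert]
        System.out.println(operationTypeFor(other));  // prints Optional.empty
    }
}
```

With a guard of this kind the caller can decide what to do with a record that lacks the field, instead of failing the whole job on the first one.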

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
