apache/seatunnel

[Bug] [PostgreSQL CDC] PostgreSQL synchronization to Redis error

Open

#10284 opened on Jan 6, 2026

View on GitHub
 (1 comment) (0 reactions) (0 assignees)Java (6,897 stars) (1,432 forks)batch import
bughelp wanted

Description

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

conf `env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 10000
checkpoint.timeout = 60000
}

source {
Postgres-CDC {
plugin_output = "postgres_cdc_stream"
username = "xxx"
password = "xxx"
database-names = ["sd-admin"]
schema-names = ["public"]
table-names = ["sd-admin.public.system_users"]
url = "jdbc:postgresql://xxx:5432/sd-admin?loggerLevel=OFF"

startup.mode = "initial"  
  

}
}

sink {
Redis {
host = "xxxx"
port = 6379
auth = "xxx"
key = "user:{id}"
support_custom_key = true
data_type = "key"
format = "json"
expire = 86400
}
} ` Error log in attachment

cdc-output.log

SeaTunnel Version

apache-seatunnel-2.3.12

SeaTunnel Config

env {  
  parallelism = 1  
  job.mode = "STREAMING"  
  checkpoint.interval = 10000  
  checkpoint.timeout = 60000  
}  
  
source {  
  Postgres-CDC {  
    plugin_output = "postgres_cdc_stream"  
    username = "xxx"  
    password = "xxx"  
    database-names = ["sd-admin"]  
    schema-names = ["public"]  
    table-names = ["sd-admin.public.system_users"]  
    url = "jdbc:postgresql://xxx:5432/sd-admin?loggerLevel=OFF"  
      
    startup.mode = "initial"  
      
  }  
}  
  
sink {  
  Redis {  
    host = "xxxx"  
    port = 6379  
    auth = "xxx"  
    key = "user:{id}"  
    support_custom_key = true  
    data_type = "key"  
    format = "json"  
    expire = 86400  
  }  
}

Running Command

bin/seatunnel.sh --config config/pg-redis-cdc.conf -m local 2>&1 | tee cdc-output.log

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:545)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:328)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1090)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:614)
	at java.base/java.lang.Thread.run(Thread.java:1474)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: java.io.IOException: Source fetch execution was fail
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:73)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
Caused by: java.io.IOException: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=sd-admin.public.system_users, splitKeyType=ROW<id BIGINT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.id()" because "table" is null.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:78)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	... 7 more
Caused by: org.apache.seatunnel.common.utils.SeaTunnelException: Read split SnapshotSplit(tableId=sd-admin.public.system_users, splitKeyType=ROW<id BIGINT>, splitStart=null, splitEnd=null, lowWatermark=null, highWatermark=null) error due to java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.id()" because "table" is null.
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:216)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:117)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:75)
	... 8 more
Caused by: io.debezium.DebeziumException: java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.id()" because "table" is null
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:112)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotFetchTask.execute(PostgresSnapshotFetchTask.java:65)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:96)
	... 5 more
Caused by: java.lang.NullPointerException: Cannot invoke "io.debezium.relational.Table.id()" because "table" is null
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEventsForTable(PostgresSnapshotSplitReadTask.java:183)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.createDataEvents(PostgresSnapshotSplitReadTask.java:170)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.doExecute(PostgresSnapshotSplitReadTask.java:136)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask.execute(PostgresSnapshotSplitReadTask.java:107)
	... 7 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

java 21

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Contributor guide