apache/seatunnel

Data synchronization hangs after adding a SQL transform (FROM_UNIXTIME fails on INT columns)

Open

#9723 opened on Aug 19, 2025

View on GitHub
 (3 comments) (0 reactions) (1 assignee)Java (6,897 stars) (1,432 forks)batch import
bug, help wanted

Description

Search before asking

  • I have searched the issues and found no similar issues.

What happened

With the same configuration file, the job makes no progress once the transform is added, yet it completes in 14 seconds when the transform is removed. The dataset has 2.5 million rows ("250W", i.e. 250万). The transform does only simple processing: it converts the Unix timestamps in several fields to a standard date-time format. I have run into the same behavior before when processing JSON.

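在 DolphinScheduler (DS) 上运行 SeaTunnel 脚本时,发现任务既没有报错,也不结束。通过 job 日志发现,transform 中的 FROM_UNIXTIME 函数读取 int 类型字段时报错:java.lang.Integer cannot be cast to java.lang.Long。必须先把 int 字段转换成 bigint,数据才能正常处理。

(English: When the SeaTunnel script runs on DolphinScheduler (DS), the task shows no error and never finishes. The job log shows that FROM_UNIXTIME in the transform throws `java.lang.Integer cannot be cast to java.lang.Long` when it reads an INT column. The data is processed correctly only after the INT columns are first converted to BIGINT.)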

SeaTunnel Version

2.3.11

SeaTunnel Config

env {
    parallelism = 10
    job.mode = "BATCH"
    job.name = "weiq_cpt_task"
}
source {
  jdbc {
    url = "jdbc:mysql://xxx:3306/weiq?useSSL=false&useCursorFetch=true&useServerPrepStmts=true&fetchSize=2000&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxxx"
    password = "xxxx"
    table_path = "weiq.weiq_cpt_task"
    split.size = 50000
    fetch_size = 2000
    plugin_output = "mysql_table"
  }
}
transform {
  Sql{
    plugin_input = "mysql_table"
    plugin_output = "sql_table"
    query = """
      SELECT
        id
      , taskname
      , wxtitle
      , wx_author
      , paytype
      , type
      , plattype
      , category
      , FROM_UNIXTIME(createtime, 'yyyy-MM-dd HH:mm:ss') createtime
      , FROM_UNIXTIME(audittime, 'yyyy-MM-dd HH:mm:ss') audittime
      , FROM_UNIXTIME(canceltime, 'yyyy-MM-dd HH:mm:ss') canceltime
      , FROM_UNIXTIME(deltime, 'yyyy-MM-dd HH:mm:ss') deltime
      , FROM_UNIXTIME(starttime, 'yyyy-MM-dd HH:mm:ss') starttime
      , FROM_UNIXTIME(stoptime, 'yyyy-MM-dd HH:mm:ss') stoptime
      , is_fee
      , sort
      , FROM_UNIXTIME(endtime, 'yyyy-MM-dd HH:mm:ss') endtime
      , FROM_UNIXTIME(realusetime, 'yyyy-MM-dd HH:mm:ss') realusetime
      , status
      , payerid
      , agentid
      , saleid
      , managerid
      , reason
      , resource_status
      , feestatus
      , FROM_UNIXTIME(feetime, 'yyyy-MM-dd HH:mm:ss') feetime
      , operatorid
      , mediamanagerid
      , identifier
      , planid
      , affiliatedid
      , idstr
      , source
      , customerid
      , customer_remarksid
      , pid_complex
      , discount_rate
      , discount_code
      , coupon_id
      , isonline
      , isdnr
      , remarks
      , process_type
      , bd_brand_id
      , weiq_brand_id
      , kf_brand_id
      , purchase_name
    from mysql_table
    """
  }
}
sink {
  Hive {
    plugin_input = "sql_table"
    table_name = "temp.org_task_cpt_weiq"
    metastore_uri = "thrift://xxxx:9083"
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
  }
}

Running Command

/bin/seatunnel.sh --config /tmp/dolphinscheduler/exec/process/root/8400880170624/15089838574336_106/135251/491568/seatunnel_135251_491568.conf --deploy-mode cluster

Error Exception

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction.fromUnixTime(DateTimeFunction.java:557)
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.executeFunctionExpr(ZetaSQLFunction.java:535)
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.computeForValue(ZetaSQLFunction.java:329)
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.project(ZetaSQLEngine.java:283)
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.transformBySQL(ZetaSQLEngine.java:249)
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:110)
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:46)
	at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80)
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:44)
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:28)
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:41)
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:28)
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:146)
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122)
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43)
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195)
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112)
	at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.pollNext(JdbcSourceReader.java:70)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Zeta or Flink or Spark Version

Zeta

Java or Scala Version

No response

Screenshots

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Contributor guide