bug, help wanted
Description
Search before asking
- I had searched in the issues and found no similar issues.
What happened
With the same configuration file, the data synchronization hangs and never completes once the transform is added, but it completes in 14 seconds after the transform is removed. The dataset has 2.5 million rows, and the transform does nothing more than convert the Unix timestamps in several fields to a standard datetime format. I have run into the same behavior before when processing JSON.
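When running the SeaTunnel script on DS (DolphinScheduler), the task reported no error and never finished. The job log showed that the FROM_UNIXTIME function in the transform fails when reading INT fields with "java.lang.Integer cannot be cast to java.lang.Long". The data is only processed correctly after the INT fields are first converted to BIGINT.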
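The failure pattern can be reproduced outside SeaTunnel with plain Java. The sketch below is a hypothetical stand-in for DateTimeFunction.fromUnixTime, not its actual source: a MySQL INT column arrives as java.lang.Integer, so a direct (Long) cast throws, whereas widening through java.lang.Number accepts both INT and BIGINT values. The class and method names, and the use of the JVM default time zone, are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class FromUnixTimeRepro {

    // Hypothetical reproduction of the failing pattern: assumes the value is always a Long.
    static String fromUnixTimeStrict(Object value, String pattern) {
        long seconds = (Long) value; // throws ClassCastException when value is an Integer
        return format(seconds, pattern);
    }

    // Hypothetical tolerant variant: accepts any Number (Integer, Long, ...).
    static String fromUnixTimeTolerant(Object value, String pattern) {
        if (value == null) {
            return null; // pass NULL timestamps through unchanged
        }
        long seconds = ((Number) value).longValue(); // widens Integer to long safely
        return format(seconds, pattern);
    }

    // Formats epoch seconds in the JVM default time zone (an assumption, not SeaTunnel's rule).
    static String format(long epochSeconds, String pattern) {
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(ZoneId.systemDefault())
                .format(Instant.ofEpochSecond(epochSeconds));
    }

    public static void main(String[] args) {
        Object intColumn = 1700000000; // autoboxed to java.lang.Integer, like a MySQL INT column
        try {
            fromUnixTimeStrict(intColumn, "yyyy-MM-dd HH:mm:ss");
        } catch (ClassCastException e) {
            System.out.println("strict: " + e.getClass().getSimpleName());
        }
        System.out.println("tolerant: " + fromUnixTimeTolerant(intColumn, "yyyy-MM-dd HH:mm:ss"));
    }
}
```

Widening via Number keeps BIGINT inputs working unchanged while also accepting INT, which matches the observation that casting the INT columns to BIGINT beforehand avoids the error.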
SeaTunnel Version
2.3.11
SeaTunnel Config
env {
  parallelism = 10
  job.mode = "BATCH"
  job.name = "weiq_cpt_task"
}
source {
  jdbc {
    url = "jdbc:mysql://xxx:3306/weiq?useSSL=false&useCursorFetch=true&useServerPrepStmts=true&fetchSize=2000&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "xxxx"
    password = "xxxx"
    table_path = "weiq.weiq_cpt_task"
    split.size = 50000
    fetch_size = 2000
    plugin_output = "mysql_table"
  }
}
transform {
  Sql {
    plugin_input = "mysql_table"
    plugin_output = "sql_table"
    query = """
      SELECT
          id
        , taskname
        , wxtitle
        , wx_author
        , paytype
        , type
        , plattype
        , category
        , FROM_UNIXTIME(createtime, 'yyyy-MM-dd HH:mm:ss') createtime
        , FROM_UNIXTIME(audittime, 'yyyy-MM-dd HH:mm:ss') audittime
        , FROM_UNIXTIME(canceltime, 'yyyy-MM-dd HH:mm:ss') canceltime
        , FROM_UNIXTIME(deltime, 'yyyy-MM-dd HH:mm:ss') deltime
        , FROM_UNIXTIME(starttime, 'yyyy-MM-dd HH:mm:ss') starttime
        , FROM_UNIXTIME(stoptime, 'yyyy-MM-dd HH:mm:ss') stoptime
        , is_fee
        , sort
        , FROM_UNIXTIME(endtime, 'yyyy-MM-dd HH:mm:ss') endtime
        , FROM_UNIXTIME(realusetime, 'yyyy-MM-dd HH:mm:ss') realusetime
        , status
        , payerid
        , agentid
        , saleid
        , managerid
        , reason
        , resource_status
        , feestatus
        , FROM_UNIXTIME(feetime, 'yyyy-MM-dd HH:mm:ss') feetime
        , operatorid
        , mediamanagerid
        , identifier
        , planid
        , affiliatedid
        , idstr
        , source
        , customerid
        , customer_remarksid
        , pid_complex
        , discount_rate
        , discount_code
        , coupon_id
        , isonline
        , isdnr
        , remarks
        , process_type
        , bd_brand_id
        , weiq_brand_id
        , kf_brand_id
        , purchase_name
      FROM mysql_table
    """
  }
}
sink {
  Hive {
    plugin_input = "sql_table"
    table_name = "temp.org_task_cpt_weiq"
    metastore_uri = "thrift://xxxx:9083"
    hdfs_site_path = "/etc/hadoop/conf/hdfs-site.xml"
  }
}
Running Command
/bin/seatunnel.sh --config /tmp/dolphinscheduler/exec/process/root/8400880170624/15089838574336_106/135251/491568/seatunnel_135251_491568.conf --deploy-mode cluster
Error Exception
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
    at org.apache.seatunnel.transform.sql.zeta.functions.DateTimeFunction.fromUnixTime(DateTimeFunction.java:557)
    at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.executeFunctionExpr(ZetaSQLFunction.java:535)
    at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.computeForValue(ZetaSQLFunction.java:329)
    at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.project(ZetaSQLEngine.java:283)
    at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.transformBySQL(ZetaSQLEngine.java:249)
    at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:110)
    at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:46)
    at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80)
    at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:44)
    at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:28)
    at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:41)
    at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:28)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:146)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122)
    at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43)
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195)
    at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112)
    at org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceReader.pollNext(JdbcSourceReader.java:70)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Zeta or Flink or Spark Version
Zeta
Java or Scala Version
No response
Screenshots
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct