apache/seatunnel

[Bug] [transform-v2] An error occurs after adding transform table_filter

Open

#9,583 opened on Jul 17, 2025

Labels: bug, help wanted

Description

Search before asking

  • I have searched the issues and found no similar issues.

What happened

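After adding the TableFilter transform, the job fails with an error.

While debugging the source code, I found that once the transform is added, the sink is no longer present in the map.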

SeaTunnel Version

Currently using SeaTunnel 2.3.11 (the 数影 build)

SeaTunnel Config

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######

env {
  # You can set SeaTunnel environment configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 70000000
  read_limit.rows_per_second = 100000
}

source {
  MySQL-CDC {
    server-id = 11
    username = "root"
    password = "root%123"
    database-names = [""]
    table-pattern = "test2.*\\.*.*"
    base-url = "jdbc:mysql://hadoop001:3306/test2"
    startup.mode = "INITIAL"
    snapshot.split.size = 8096
    snapshot.fetch.size = 4096
    server-time-zone = "Asia/Shanghai"
    exactly_once = true
    schema-changes.enabled = true
    plugin_output = "source1"
  }
}

transform {
  TableFilter {
    plugin_input = "source1"
    plugin_output = "transform_1"
    database_pattern = "test2"
    table_pattern = "test2.student"
    pattern_mode = "EXCLUDE"
  }
}

sink {
  StarRocks {
    plugin_input = "transform_1"
    nodeUrls = []
    base-url = "jdbc:mysql://xx:xxx"
    username = xxx
    password = "xxx"
    database = "xxx"
    max_retries = 10
    batch_max_rows = 100000
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
    enable_upsert_delete = true
    save_mode_create_template = """
    CREATE TABLE IF NOT EXISTS xxx.`${table_name}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
        ) ENGINE=OLAP
        PRIMARY KEY (${rowtype_primary_key})
        DISTRIBUTED BY HASH (${rowtype_primary_key})
        PROPERTIES (
                "replication_num" = "2",
                "in_memory" = "false",
                "enable_persistent_index" = "true",
                "replicated_storage" = "true",
                "compression" = "LZ4"
          )
    """
  }
}
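
For anyone triaging: the TableFilter block above uses table_pattern = "test2.student" with pattern_mode = "EXCLUDE". The sketch below is not SeaTunnel code. It is a minimal, hypothetical filter (the class, the `filter` helper, and the `teacher` table are made up for illustration) that only shows how that regex behaves under a full-match rule. Whether SeaTunnel matches the pattern against the bare table name or the qualified `database.table` path is an assumption this report does not confirm, so both cases are shown, along with the case where every table is excluded and nothing is left for the sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TableFilterSketch {
    enum PatternMode { INCLUDE, EXCLUDE }

    // Hypothetical helper (not a SeaTunnel API): keeps candidates whose
    // full-regex match result agrees with the requested mode.
    static List<String> filter(List<String> candidates, String regex, PatternMode mode) {
        Pattern pattern = Pattern.compile(regex);
        List<String> kept = new ArrayList<>();
        for (String candidate : candidates) {
            boolean matches = pattern.matcher(candidate).matches();
            if (matches == (mode == PatternMode.INCLUDE)) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        String tablePattern = "test2.student"; // value taken from the config above

        // Assumption A: the pattern is matched against qualified names.
        System.out.println(filter(
                Arrays.asList("test2.student", "test2.teacher"),
                tablePattern, PatternMode.EXCLUDE)); // prints [test2.teacher]

        // Assumption B: the pattern is matched against bare table names.
        // Nothing matches, so EXCLUDE drops nothing.
        System.out.println(filter(
                Arrays.asList("student", "teacher"),
                tablePattern, PatternMode.EXCLUDE)); // prints [student, teacher]

        // If the only captured table is the excluded one, the result is empty.
        System.out.println(filter(
                Arrays.asList("test2.student"),
                tablePattern, PatternMode.EXCLUDE)); // prints []
    }
}
```

If the last case applies to this job, the transform would hand an empty table list to the sink, which may be related to the exception below. That link is a guess, not something verified against the SeaTunnel source.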

Running Command

Run locally in IntelliJ IDEA

Error Exception

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.example.engine.SeaTunnelEngineLocalExample.main(SeaTunnelEngineLocalExample.java:49)
Caused by: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.tryGenerateMultiTableSink(MultipleTableJobConfigParser.java:642)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSink(MultipleTableJobConfigParser.java:605)
	at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:240)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:123)
	at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:191)
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:165)
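
The trace shows `ArrayList.get` being called on an empty list inside `tryGenerateMultiTableSink`. The sketch below reproduces only that failure shape and is not the parser's actual code: the class, the map of plugin_output names to upstream tables, and both helper methods are hypothetical. It contrasts an unchecked `get(0)` with a guarded lookup that reports which plugin_input had no upstream tables.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmptyUpstreamSketch {
    // Hypothetical stand-in for a "plugin_output name -> upstream tables" lookup.
    static List<String> upstreamTables(Map<String, List<String>> outputs, String pluginInput) {
        return outputs.getOrDefault(pluginInput, Collections.<String>emptyList());
    }

    // Mirrors the failure shape in the trace: get(0) on an empty list.
    static String firstTableUnchecked(Map<String, List<String>> outputs, String pluginInput) {
        return upstreamTables(outputs, pluginInput).get(0);
    }

    // Guarded variant: fails with a message that names the offending input.
    static String firstTableChecked(Map<String, List<String>> outputs, String pluginInput) {
        List<String> tables = upstreamTables(outputs, pluginInput);
        if (tables.isEmpty()) {
            throw new IllegalStateException(
                    "No upstream tables found for plugin_input '" + pluginInput + "'");
        }
        return tables.get(0);
    }

    public static void main(String[] args) {
        Map<String, List<String>> outputs = new HashMap<>();
        outputs.put("source1", Arrays.asList("test2.student"));
        // Assumed state after the transform: registered, but with no tables left.
        outputs.put("transform_1", new ArrayList<String>());

        try {
            firstTableUnchecked(outputs, "transform_1");
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unchecked: " + e.getClass().getSimpleName());
        }

        try {
            firstTableChecked(outputs, "transform_1");
        } catch (IllegalStateException e) {
            System.out.println("checked: " + e.getMessage());
        }
    }
}
```

A guard along these lines would not fix the underlying cause, but it would turn the bare IndexOutOfBoundsException into an error that points at the sink's plugin_input.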

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
