apache/seatunnel

[feature][JDBC] JDBC Source Dialect-Aware Sampled Balanced Sharding (Database-Side Sampling)

Open

#10288 opened on Jan 6, 2026

Labels: batch import, help wanted

Search before asking

  • I had searched in the feature and found no similar feature requirement.

Description

Goal: Use database-side sampling capabilities (Oracle SAMPLE, Postgres TABLESAMPLE, etc.) plus quantile boundaries to reduce split generation cost and improve parallel balance for highly skewed data.

0. Background: Why we need a “faster and more balanced” splitter

The current splitter algorithms can already cover most extraction scenarios, but there is still room for improvement in some extreme cases.

In practice, we have very large data sources (e.g., Oracle / PostgreSQL). A single table can be ~300 GB with ~1B rows, and the ID column is not auto-increment (e.g., values resembling government-issued ID numbers). In such cases the split-key distribution is highly skewed, and the existing splitter strategies are not always cost-effective.

When JDBC Source reads in parallel using partition_column, it must first split the full table (or query result) into multiple splits. The splitting strategy directly impacts:

  • Job startup time (database pressure / time cost during split planning)
  • Parallelism utilization (many empty splits vs. a few long-tail splits)
  • Stability (many small boundary queries vs. one long scan)

When the split key is severely skewed (large gaps, hot/cold regions, value range much larger than row count), simply dividing the value range evenly often leads to unbalanced workloads. SeaTunnel already has optimizations for uneven distributions (DynamicChunkSplitter + sampling sharding), but there is still an opportunity to reduce the cost of split planning by leveraging database-native sampling.


1. Current approach: SeaTunnel DynamicChunkSplitter

Note: DynamicChunkSplitter targets split.size (rows per split) and switches strategies based on data distribution.

At a high level, DynamicChunkSplitter is a trade-off between “fewer SQLs” and “less scanning”:

  • If the shard count is very large, it prefers scanning the split-key column once (sampling sharding) rather than issuing thousands of queryNextChunkMax queries.
  • If the shard count is not large, it prefers several small boundary queries to avoid a full scan.
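A minimal sketch of the decision described in the two bullets above, assuming a hypothetical `StrategyChooser` class and threshold parameter. The real splitter's option names and control flow may differ, and the even-distribution check it performs first is left out here.

```java
/**
 * Hypothetical sketch of the "fewer SQLs vs. less scanning" trade-off.
 * Names and the threshold parameter are illustrative, not SeaTunnel's API.
 */
final class StrategyChooser {
    enum Strategy { SAMPLE_SHARDING, BOUNDARY_QUERIES }

    /** Splits needed for rowCount rows at splitSize rows per split (ceiling division). */
    static long shardCount(long rowCount, long splitSize) {
        return (rowCount + splitSize - 1) / splitSize;
    }

    static Strategy choose(long rowCount, long splitSize, long sampleShardingThreshold) {
        long shards = shardCount(rowCount, splitSize);
        // Many shards: one streamed scan of the split-key column beats
        // issuing one queryNextChunkMax-style boundary query per shard.
        if (shards > sampleShardingThreshold) {
            return Strategy.SAMPLE_SHARDING;
        }
        // Few shards: a handful of small boundary queries avoids a full scan.
        return Strategy.BOUNDARY_QUERIES;
    }
}
```

For example, a 1B-row table at 8,096 rows per split needs about 123,518 shards, so with a threshold of 1,000 the single scan wins; a 1M-row table needs only 124 shards and stays on boundary queries.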

2. Limitations of the current approach

2.1 Cost of sampling sharding: it often scans the entire split-key column (or full query result)

The query behind sampleDataFromColumn is essentially SELECT split_col FROM ..., and the client iterates over the ResultSet, keeping 1 row out of every N. This means:

  • For huge tables, split planning can still trigger a long full scan (even though only one column is read and the rows are streamed).
  • The driver and network still carry every row to the client, which discards most of them.
  • The sample can still be large: its size is roughly rowCount / inverseSamplingRate, and the client must hold it in memory and sort it.
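The client-side cost listed above can be modeled in a few lines of Java. This is an illustrative sketch of "take 1 row every N rows", not SeaTunnel's actual sampleDataFromColumn code; `SystematicSampler` and its methods are hypothetical, and an `Iterator` stands in for the ResultSet.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

final class SystematicSampler {
    /** Keeps every inverseSamplingRate-th value in scan order. */
    static List<Long> sampleEveryNth(Iterator<Long> rows, int inverseSamplingRate) {
        List<Long> sample = new ArrayList<>();
        long index = 0;
        while (rows.hasNext()) {
            Long value = rows.next(); // every row is still read and transferred
            if (index % inverseSamplingRate == 0) {
                sample.add(value);
            }
            index++;
        }
        return sample;
    }

    /** Sorts the sample in memory and picks shardCount - 1 evenly spaced values. */
    static List<Long> boundaries(List<Long> sample, int shardCount) {
        List<Long> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<Long> result = new ArrayList<>();
        if (sorted.isEmpty()) {
            return result;
        }
        for (int i = 1; i < shardCount; i++) {
            result.add(sorted.get((int) ((long) i * sorted.size() / shardCount)));
        }
        return result;
    }
}
```

With 10,000 rows and N = 100, all 10,000 rows pass through the loop to produce a 100-value sample, which is exactly the "stream all rows and drop most" pattern this proposal wants to push down to the database.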

2.2 Sample representativeness: taking “1 every N rows” is not truly random sampling

The current sampling is a form of systematic sampling based on the ResultSet order, rather than random sampling at the database level:

  • In extreme cases it may still produce unbalanced splits (e.g., when hot data is concentrated in certain physical ranges).
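One illustrative failure mode of systematic sampling can be shown with a toy Java comparison. It assumes a deliberately adversarial, hypothetical physical order in which the split key repeats with period 10: keeping every 10th row then sees a single value, while Bernoulli sampling (each row kept independently with probability p, conceptually what PostgreSQL `TABLESAMPLE BERNOULLI` and Oracle row-level `SAMPLE` do) sees all ten. The class and data are invented for the demonstration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;

final class SamplingBiasDemo {
    /** Hypothetical physical order in which the split key repeats with period 10. */
    static List<Integer> periodicRows(int count) {
        List<Integer> rows = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            rows.add(i % 10);
        }
        return rows;
    }

    /** Systematic sampling: keep every n-th row in scan order. */
    static List<Integer> everyNth(List<Integer> rows, int n) {
        List<Integer> sample = new ArrayList<>();
        for (int i = 0; i < rows.size(); i += n) {
            sample.add(rows.get(i));
        }
        return sample;
    }

    /** Bernoulli sampling: keep each row independently with probability p. */
    static List<Integer> bernoulli(List<Integer> rows, double p, long seed) {
        Random random = new Random(seed);
        List<Integer> sample = new ArrayList<>();
        for (Integer row : rows) {
            if (random.nextDouble() < p) {
                sample.add(row);
            }
        }
        return sample;
    }

    static int distinctValues(List<Integer> sample) {
        return new HashSet<>(sample).size();
    }
}
```

Block-level sampling (`TABLESAMPLE SYSTEM`, `SAMPLE BLOCK`) picks whole pages rather than individual rows, so it trades some of this randomness for speed and can still inherit physical clustering.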

3. Proposed idea: use database-side sampling to get representative samples/boundaries faster

3.1 Core goal

During split planning, push “sampling” down to the database, so the database returns only a small number of samples / boundaries:

  • Avoid scanning the entire split-key column
  • Reduce network transfer
  • Shorten job startup time
  • Keep the “sampling-based balanced splitting” effect for skewed data

3.2 Implementation pattern

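Example pseudo-SQL (Oracle syntax shown; the sampling clause differs per dialect):

```sql
-- <percent> is a percentage: 0.1 means 0.1% of rows.
-- On PostgreSQL, use "FROM <table> TABLESAMPLE SYSTEM (<percent>)" instead.
WITH sampled AS (
  SELECT col
  FROM <table> SAMPLE (<percent>)
  WHERE col IS NOT NULL
),
buckets AS (
  -- Split the ordered sample into <bucket_number> (nearly) equal-count buckets
  SELECT NTILE(<bucket_number>) OVER (ORDER BY col) AS bucket_no, col
  FROM sampled
)
-- The largest value of every bucket except the last is a split boundary
SELECT MAX(col) AS boundary
FROM buckets
WHERE bucket_no < <bucket_number>
GROUP BY bucket_no
ORDER BY bucket_no;
```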

Advantages: the database returns only bucket_number - 1 boundary values, so client overhead is low and split planning is faster for huge tables.
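To make the query's output concrete, the sketch below emulates it on the client: NTILE over the sorted, non-null sample (earlier buckets get one extra row when the sample size is not divisible), then the maximum of every bucket except the last. `NtileBoundaries` is a hypothetical helper for illustration only; in the proposal this work happens inside the database.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class NtileBoundaries {
    /** Emulates NTILE(bucketNumber) + MAX(col) per bucket, excluding the last bucket. */
    static List<Long> compute(List<Long> sample, int bucketNumber) {
        if (bucketNumber < 1) {
            throw new IllegalArgumentException("bucketNumber must be >= 1");
        }
        List<Long> sorted = new ArrayList<>(sample); // assumed to contain no nulls
        Collections.sort(sorted);
        int n = sorted.size();
        int base = n / bucketNumber;  // rows every bucket gets
        int extra = n % bucketNumber; // the first `extra` buckets get one more row
        List<Long> boundaries = new ArrayList<>();
        int end = 0;                  // exclusive end index of the current bucket
        for (int bucket = 1; bucket < bucketNumber; bucket++) {
            int size = base + (bucket <= extra ? 1 : 0);
            if (size == 0) {
                break;                // empty buckets produce no GROUP BY row
            }
            end += size;
            boundaries.add(sorted.get(end - 1)); // largest value in this bucket
        }
        return boundaries;
    }
}
```

For the sample 1..10 and 4 buckets, NTILE produces buckets of sizes 3, 3, 2, 2 and the result is [3, 6, 8]. A heavily repeated value can yield duplicate boundaries (e.g., [5, 5, 7]), which the caller should de-duplicate.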

Risks: relies on CTEs and window functions (NTILE), whose support varies across databases and versions, so this needs per-dialect implementations and a robust fallback.
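A per-dialect builder with a fallback might look like the following sketch. `SampledBoundaryQuery` and its `Dialect` enum are hypothetical; identifiers are assumed to be already quoted and validated. It converts the (0.0, 1.0] ratio into the percentage both `SAMPLE` and `TABLESAMPLE` expect, omits the sampling clause at 1.0 (Oracle requires a percentage below 100), and returns `Optional.empty()` for dialects without native table sampling, such as MySQL, so the caller can keep using the existing splitter.

```java
import java.math.BigDecimal;
import java.util.Optional;

final class SampledBoundaryQuery {
    enum Dialect { ORACLE, POSTGRES, MYSQL }

    static Optional<String> build(Dialect dialect, String table, String column,
                                  double ratio, int bucketNumber) {
        if (!(ratio > 0.0 && ratio <= 1.0)) {
            throw new IllegalArgumentException("ratio must be in (0.0, 1.0]");
        }
        if (bucketNumber < 2) {
            throw new IllegalArgumentException("no boundaries needed for fewer than 2 buckets");
        }
        // Ratio 0.001 becomes the percentage "0.1".
        String percent = BigDecimal.valueOf(ratio).movePointRight(2)
                .stripTrailingZeros().toPlainString();
        String sampleClause;
        switch (dialect) {
            case ORACLE:
                sampleClause = ratio < 1.0 ? " SAMPLE (" + percent + ")" : "";
                break;
            case POSTGRES:
                sampleClause = ratio < 1.0 ? " TABLESAMPLE SYSTEM (" + percent + ")" : "";
                break;
            default:
                return Optional.empty(); // no native table sampling: fall back
        }
        String sql = "WITH sampled AS (SELECT " + column + " AS c FROM " + table + sampleClause
                + " WHERE " + column + " IS NOT NULL), "
                + "buckets AS (SELECT NTILE(" + bucketNumber + ") OVER (ORDER BY c) AS bucket_no, c"
                + " FROM sampled) "
                + "SELECT MAX(c) AS boundary FROM buckets WHERE bucket_no < " + bucketNumber
                + " GROUP BY bucket_no ORDER BY bucket_no";
        return Optional.of(sql);
    }
}
```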

3.3 Applicable scenarios & expected benefits

Best-fit scenarios:

  • Very large tables (10M/100M/1B rows) with obvious skew on the split key
  • Databases with efficient native sampling and a dialect-specific implementation (e.g., block-level sampling such as Oracle SAMPLE BLOCK or PostgreSQL TABLESAMPLE SYSTEM)
  • Users care about database pressure and time cost during job startup

Expected benefits:

  • Network transfer goes from “stream all rows and drop most” to “return only samples/boundaries”
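  • The number of splits can be controlled by bucket_number, making it easier to align with source parallelism (heavily repeated split-key values can produce duplicate boundaries, so the effective count may be lower)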
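Turning the returned boundaries into split predicates could look like the sketch below, under two stated assumptions: each boundary is a bucket maximum, so ranges are lower-exclusive and upper-inclusive, and NULL keys (excluded from the sample by `WHERE col IS NOT NULL`) get a split of their own. `SplitPredicates` is hypothetical; SeaTunnel's own split representation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class SplitPredicates {
    /** Builds non-overlapping predicates lower < col <= upper, plus an IS NULL split. */
    static List<String> fromBoundaries(String column, List<Long> boundaries) {
        // Sort and de-duplicate so no two splits share the same bounds.
        List<Long> distinct = new ArrayList<>(new TreeSet<>(boundaries));
        List<String> predicates = new ArrayList<>();
        Long lower = null;
        for (Long upper : distinct) {
            predicates.add(lower == null
                    ? column + " <= " + upper
                    : column + " > " + lower + " AND " + column + " <= " + upper);
            lower = upper;
        }
        predicates.add(lower == null ? column + " IS NOT NULL" : column + " > " + lower);
        // Range predicates never match NULL, so NULL keys need their own split.
        predicates.add(column + " IS NULL");
        return predicates;
    }
}
```

With boundaries [5, 5, 7] this yields `id <= 5`, `id > 5 AND id <= 7`, `id > 7` and `id IS NULL`, i.e., at most bucket_number range splits plus the NULL split.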

4. Risks and constraints

  • Database pressure: for huge tables, a sampling percentage set too high can still make database-side sampling costly. In our tests, 0.1% sampling was manageable for billion-row tables.
  • Approximation: boundaries are computed from a sample, so split sizes are only approximately equal; this is usually acceptable.
  • Reproducibility: random sampling can produce different boundaries between runs. This is usually acceptable because splitting is only a parallel-read strategy, but it should be documented clearly.
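If run-to-run stability ever matters (for example, while debugging a job), both databases accept a fixed seed: Oracle `SAMPLE (...) SEED (n)`, with n between 0 and 4294967295, and PostgreSQL `TABLESAMPLE ... REPEATABLE (n)`. Both generally repeat the same sample only while the table is unchanged. The helper below is a hypothetical sketch; the proposal itself defines no seed option.

```java
final class SeededSampleClause {
    enum Db { ORACLE, POSTGRES }

    /** percent is a percentage string such as "0.1"; seed may be null for a fresh sample. */
    static String clause(Db db, String percent, Long seed) {
        if (db == Db.ORACLE) {
            String base = "SAMPLE (" + percent + ")";
            if (seed == null) {
                return base;
            }
            if (seed < 0 || seed > 4_294_967_295L) {
                throw new IllegalArgumentException("Oracle SEED must be in [0, 4294967295]");
            }
            return base + " SEED (" + seed + ")";
        }
        String base = "TABLESAMPLE SYSTEM (" + percent + ")";
        return seed == null ? base : base + " REPEATABLE (" + seed + ")";
    }
}
```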

5. Configurations

  • split.sampled_balanced_sharding: enable database-side sampled balanced sharding (default false)
  • split.sampling_percentage: sampling ratio in (0.0, 1.0] (default 0.001, i.e. 0.1%)
  • split.bucket_number: target bucket/split count (default 10; must be >= 1; 1 means no splitting)
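A sketch of how the three proposed options might be parsed and validated, assuming a plain `Map<String, String>` in place of SeaTunnel's real configuration API; `SampledShardingOptions` is hypothetical, while the key names and defaults follow the list above. At the defaults, a 1B-row table yields about 1,000,000 sampled values (roughly 100,000 per bucket with 10 buckets), of which only 9 boundaries reach the client.

```java
import java.util.Map;

/** Hypothetical holder for the proposed options; keys and defaults follow the proposal. */
final class SampledShardingOptions {
    final boolean enabled;
    final double samplingRatio;
    final int bucketNumber;

    private SampledShardingOptions(boolean enabled, double samplingRatio, int bucketNumber) {
        this.enabled = enabled;
        this.samplingRatio = samplingRatio;
        this.bucketNumber = bucketNumber;
    }

    static SampledShardingOptions from(Map<String, String> conf) {
        boolean enabled = Boolean.parseBoolean(
                conf.getOrDefault("split.sampled_balanced_sharding", "false"));
        double ratio = Double.parseDouble(
                conf.getOrDefault("split.sampling_percentage", "0.001"));
        int buckets = Integer.parseInt(conf.getOrDefault("split.bucket_number", "10"));
        if (!(ratio > 0.0 && ratio <= 1.0)) { // also rejects NaN
            throw new IllegalArgumentException("split.sampling_percentage must be in (0.0, 1.0]");
        }
        if (buckets < 1) {
            throw new IllegalArgumentException("split.bucket_number must be >= 1");
        }
        return new SampledShardingOptions(enabled, ratio, buckets);
    }

    /** Rough number of rows the database samples before computing boundaries. */
    long expectedSampleRows(long tableRows) {
        return Math.round(tableRows * samplingRatio);
    }
}
```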

Usage Scenario

No response

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct
