apache/seatunnel

[Feature] [Connector] Support Python-based Custom Data Sources As source for SeaTunnel

Open

#10182 opened on Dec 11, 2025

View on GitHub
 (6 comments) (1 reaction) (1 assignee)Java (6,897 stars) (1,432 forks)batch import
good first issuehelp wanted

Description

Background

Currently, SeaTunnel supports so many connectors written in Java, but we lack a native support for Python-based data sources. Many data engineers and data scientists prefer Python for its rich ecosystem of data processing libraries (pandas, requests, boto3, etc.) and ease of use.

This feature request proposes to enable users to write custom data sources in Python while leveraging SeaTunnel's powerful distributed execution engine.

Goals

Phase 1 (MVP)

  • Enable Python scripts to act as SeaTunnel data sources
  • Support basic data exchange between Java and Python processes via Java ProcessBuilder
  • Provide simple configuration and Python script specification
  • Support text-based data serialization first (aligned with LocalFile text format)
  • Handle common data types (string, int, float, boolean, timestamp, etc.)

Architecture Overview

┌─────────────────────────────────────┐
│   SeaTunnel Engine (Java)           │
│  ┌──────────────────────────────┐   │
│  │  PythonSource                │   │
│  │  - createReader()            │   │
│  │  - createEnumerator()        │   │
│  └──────────┬───────────────────┘   │
│             │                        │
│  ┌──────────▼───────────────────┐   │
│  │  PythonSourceReader          │   │
│  │  - pollNext()                │   │
│  │  - snapshotState()           │   │
│  └──────────┬───────────────────┘   │
│             │ IPC (ProcessBuilder)    │
└─────────────┼────────────────────────┘
              │
              │ stdout (line-based)
              │ each line = one record
              │
┌─────────────▼────────────────────────┐
│   Python Process                     │
│  ┌──────────────────────────────┐    │
│  │  User Python Script          │    │
│  │  - generate and print lines  │    │
│  └──────────────────────────────┘    │
└──────────────────────────────────────┘

Example Configuration

source {
  PythonSource {
    python.executable = "python3"
    python.script.path = "/path/to/user_script.py"
    python.script.config = {
      api_url = "https://api.example.com"
      api_key = "xxx"
    }

    # text format, same style as LocalFile connector
    file_format_type = "text"
    field_delimiter = ","

    # latest schema options (SchemaOptions), including columnLength, columnScale, nullable, etc.
    schema = {
      table = "default.python_table"
      columns = [
        {
          name = "id"
          type = "bigint"
          nullable = false
          columnLength = 20
          defaultValue = 0
          comment = "primary key id"
        },
        {
          name = "name"
          type = "string"
          nullable = true
          columnLength = 255
          comment = "user name"
        },
        {
          name = "score"
          type = "double"
          nullable = true
          comment = "user score"
        }
      ]
    }

    result_table_name = "python_table"
  }
}

Example Python Script

#!/usr/bin/env python3
import sys

def main():
    """
    Minimal PythonSource script:
    - Reads optional JSON config from the first line of stdin (produced by PythonSourceReader).
    - Prints one line per record to stdout.
    - Line format is controlled by `file_format_type` and `field_delimiter` in SeaTunnel conf.
    - For Phase 1, we only support `file_format_type = "text"` with a single `field_delimiter`.
    """
    import json

    # Optional: first line from Java may contain JSON-encoded config, including script config.
    first_line = sys.stdin.readline()
    config = {}
    if first_line:
        try:
            config = json.loads(first_line)
        except json.JSONDecodeError:
            # If not JSON, just ignore and treat as no config.
            config = {}

    # User-defined data reading logic; here we just generate mock records.
    # The Python script only cares about printing lines; it does not know SeaTunnelRow or schema.
    # Example output for file_format_type = "text" and field_delimiter = ",":
    #   1,alice,88.1
    #   2,bob,92.5
    #   ...
    for i in range(1, 101):
        record = f"{i},user_{i},{80.0 + i * 0.1}"
        print(record, flush=True)


if __name__ == "__main__":
    main()

# Notes:
# - PythonSource uses line-based output; each line is one record.
# - The output field delimiter (for text format) must be configured in the SeaTunnel conf
#   via `field_delimiter`, following the same semantics as LocalFile text format.
# - The Python script itself only needs to print one line per record; it does not need to
#   know the SeaTunnel schema. `PythonSourceReader` in Java will parse each line according
#   to `file_format_type`, `field_delimiter`, and the configured `schema`, and then convert
#   it into a `SeaTunnelRow`. The goal is to keep the Python script as simple as possible.

Module Structure

seatunnel-connectors-v2/
└── connector-python/
    ├── pom.xml
    └── src/
        ├── main/java/.../python/
        │   ├── source/
        │   │   ├── PythonSource.java
        │   │   ├── PythonSourceReader.java
        │   │   └── PythonSourceSplitEnumerator.java
        │   ├── config/PythonSourceConfig.java
        │   └── serialize/JsonToRowDeserializer.java
        └── test/
            └── resources/python/
                └── test_source.py

Acceptance Criteria

The following acceptance criteria are defined for the Python Source connector:

1. Core Functionality

1.1 Configuration and Startup

  • Configuration parsing: The connector correctly reads PythonSource configuration options, including:
    • python.executable (path to the Python interpreter)
    • python.script.path (path to the user Python script)
    • python.script.config (configuration/parameters passed to the script)
    • file_format_type (for Phase 1, support at least "text")
    • field_delimiter (text field delimiter, same semantics as LocalFile text)
    • schema (using the latest SchemaOptions, including columns, columnLength, columnScale, nullable, etc.)
  • Process management: The Java side starts the Python subprocess via ProcessBuilder and successfully establishes stdin/stdout communication streams.

1.2 Data Stream and Parsing

  • Line-based output: The Python script outputs one record per line to stdout. For file_format_type = "text", fields in a line are separated by the configured field_delimiter.
  • Schema-driven parsing: PythonSourceReader reads each line, splits it according to field_delimiter, and converts fields to Java types based on the configured schema.columns[i].type.
  • End-of-stream handling: When the Python script finishes and the stdout stream is closed, PythonSourceReader detects end-of-stream and gracefully completes.

1.3 Data Type Support

Text fields must map correctly (via parsing in PythonSourceReader) to SeaTunnel (Java) types defined in schema:

  • string -> STRING (Java String)
  • int / bigint -> INT / BIGINT (Java Integer / Long)
  • float / double -> FLOAT / DOUBLE (Java Float / Double)
  • boolean -> BOOLEAN (Java Boolean)
  • Complex types (array / map / row) can be added in later phases by reusing SeaTunnel’s existing type system and parsing rules.

2. Robustness

  • Error handling: If the Python script throws an exception or exits with a non-zero code, the SeaTunnel job fails with an exception instead of hanging or silently ignoring the error.
  • Log forwarding: stderr output from the Python script is redirected or recorded into SeaTunnel worker logs to facilitate troubleshooting.
  • Environment validation: Before startup, the connector verifies that the Python environment and script path exist; if not, it fails fast with a clear error.

3. Usability

  • Documentation: Documentation is provided describing how to configure Python environment dependencies, PythonSource parameters, and how PythonSourceReader converts lines into SeaTunnelRow based on the schema.

4. Testing

  • Unit tests: Mock-based tests are added for PythonSourceReader to validate:
    • Configuration parsing (including schema, file_format_type, field_delimiter)
    • Line splitting and type conversion according to schema
    • End-of-stream detection and error handling
  • Integration tests (E2E):
    • A real Python script is implemented (for example, generating mock data as comma-separated lines).
    • A SeaTunnel job is run that uses PythonSource to read from this script with file_format_type = "text" and an explicit field_delimiter.
    • The job successfully writes data to a Console or Assert sink, and both data content and types are verified as correct.

Contributor guide