apache/seatunnel
[Feature] [Connector] Support Python-based Custom Data Sources As source for SeaTunnel
Open
#10182 opened on Dec 11, 2025
good first issue, help wanted
Description
Background
Currently, SeaTunnel supports many connectors written in Java, but it lacks native support for Python-based data sources. Many data engineers and data scientists prefer Python for its rich ecosystem of data processing libraries (pandas, requests, boto3, etc.) and ease of use.
This feature request proposes to enable users to write custom data sources in Python while leveraging SeaTunnel's powerful distributed execution engine.
Goals
Phase 1 (MVP)
- Enable Python scripts to act as SeaTunnel data sources
- Support basic data exchange between Java and Python processes via Java ProcessBuilder
- Provide simple configuration and Python script specification
- Support text-based data serialization first (aligned with the LocalFile `text` format)
- Handle common data types (string, int, float, boolean, timestamp, etc.)
Architecture Overview
┌──────────────────────────────────────┐
│       SeaTunnel Engine (Java)        │
│  ┌──────────────────────────────┐    │
│  │ PythonSource                 │    │
│  │  - createReader()            │    │
│  │  - createEnumerator()        │    │
│  └──────────┬───────────────────┘    │
│             │                        │
│  ┌──────────▼───────────────────┐    │
│  │ PythonSourceReader           │    │
│  │  - pollNext()                │    │
│  │  - snapshotState()           │    │
│  └──────────┬───────────────────┘    │
│             │ IPC (ProcessBuilder)   │
└─────────────┼────────────────────────┘
              │
              │ stdout (line-based)
              │ each line = one record
              │
┌─────────────▼────────────────────────┐
│            Python Process            │
│  ┌──────────────────────────────┐    │
│  │ User Python Script           │    │
│  │  - generate and print lines  │    │
│  └──────────────────────────────┘    │
└──────────────────────────────────────┘
Example Configuration
source {
  PythonSource {
    python.executable = "python3"
    python.script.path = "/path/to/user_script.py"
    python.script.config = {
      api_url = "https://api.example.com"
      api_key = "xxx"
    }
    # text format, same style as LocalFile connector
    file_format_type = "text"
    field_delimiter = ","
    # latest schema options (SchemaOptions), including columnLength, columnScale, nullable, etc.
    schema = {
      table = "default.python_table"
      columns = [
        {
          name = "id"
          type = "bigint"
          nullable = false
          columnLength = 20
          defaultValue = 0
          comment = "primary key id"
        },
        {
          name = "name"
          type = "string"
          nullable = true
          columnLength = 255
          comment = "user name"
        },
        {
          name = "score"
          type = "double"
          nullable = true
          comment = "user score"
        }
      ]
    }
    result_table_name = "python_table"
  }
}
Example Python Script
#!/usr/bin/env python3
import json
import sys


def main():
    """
    Minimal PythonSource script:
    - Reads optional JSON config from the first line of stdin (produced by PythonSourceReader).
    - Prints one line per record to stdout.
    - Line format is controlled by `file_format_type` and `field_delimiter` in SeaTunnel conf.
    - For Phase 1, we only support `file_format_type = "text"` with a single `field_delimiter`.
    """
    # Optional: first line from Java may contain JSON-encoded config, including script config.
    first_line = sys.stdin.readline()
    config = {}
    if first_line:
        try:
            config = json.loads(first_line)
        except json.JSONDecodeError:
            # If not JSON, just ignore and treat as no config.
            config = {}

    # User-defined data reading logic; here we just generate mock records.
    # The Python script only cares about printing lines; it does not know SeaTunnelRow or schema.
    # Output for file_format_type = "text" and field_delimiter = ",":
    #   1,user_1,80.1
    #   2,user_2,80.2
    #   ...
    for i in range(1, 101):
        # Fixed one-decimal formatting avoids float artifacts such as 80.30000000000001.
        record = f"{i},user_{i},{80.0 + i * 0.1:.1f}"
        print(record, flush=True)


if __name__ == "__main__":
    main()

# Notes:
# - PythonSource uses line-based output; each line is one record.
# - The output field delimiter (for text format) must be configured in the SeaTunnel conf
#   via `field_delimiter`, following the same semantics as LocalFile text format.
# - The Python script itself only needs to print one line per record; it does not need to
#   know the SeaTunnel schema. `PythonSourceReader` in Java will parse each line according
#   to `file_format_type`, `field_delimiter`, and the configured `schema`, and then convert
#   it into a `SeaTunnelRow`. The goal is to keep the Python script as simple as possible.
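Because the reader is described as splitting each line on `field_delimiter`, a value that itself contains the delimiter or a line break would shift columns. The following is a minimal sketch of a guard a script could apply before printing. `format_record` is a hypothetical helper, not part of the proposal, and rendering `None` as an empty string is an assumption, since the issue does not define how nulls are encoded.

```python
def format_record(fields, delimiter=","):
    """Join field values into one output line.

    Hypothetical helper. Values containing the delimiter or a line break are
    rejected because line-based, delimiter-split parsing cannot represent them.
    Rendering None as an empty string is an assumption, not a defined rule.
    """
    parts = []
    for index, value in enumerate(fields):
        text = "" if value is None else str(value)
        if delimiter in text or "\n" in text or "\r" in text:
            raise ValueError(
                f"field {index} contains the delimiter or a line break: {text!r}"
            )
        parts.append(text)
    return delimiter.join(parts)
```

A script would then emit records with `print(format_record([i, name, score]), flush=True)` and pass the same delimiter that is configured in the job.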
Module Structure
seatunnel-connectors-v2/
└── connector-python/
    ├── pom.xml
    └── src/
        ├── main/java/.../python/
        │   ├── source/
        │   │   ├── PythonSource.java
        │   │   ├── PythonSourceReader.java
        │   │   └── PythonSourceSplitEnumerator.java
        │   ├── config/PythonSourceConfig.java
        │   └── serialize/JsonToRowDeserializer.java
        └── test/
            └── resources/python/
                └── test_source.py
Acceptance Criteria
The following acceptance criteria are defined for the Python Source connector:
1. Core Functionality
1.1 Configuration and Startup
- Configuration parsing: The connector correctly reads `PythonSource` configuration options, including:
  - `python.executable` (path to the Python interpreter)
  - `python.script.path` (path to the user Python script)
  - `python.script.config` (configuration/parameters passed to the script)
  - `file_format_type` (for Phase 1, support at least `"text"`)
  - `field_delimiter` (text field delimiter, same semantics as LocalFile text)
  - `schema` (using the latest SchemaOptions, including `columns`, `columnLength`, `columnScale`, `nullable`, etc.)
- Process management: The Java side starts the Python subprocess via `ProcessBuilder` and successfully establishes stdin/stdout communication streams.
1.2 Data Stream and Parsing
- Line-based output: The Python script outputs one record per line to stdout. For `file_format_type = "text"`, fields in a line are separated by the configured `field_delimiter`.
- Schema-driven parsing: `PythonSourceReader` reads each line, splits it according to `field_delimiter`, and converts fields to Java types based on the configured `schema.columns[i].type`.
- End-of-stream handling: When the Python script finishes and the stdout stream is closed, `PythonSourceReader` detects end-of-stream and gracefully completes.
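The framing rules above can be modelled in a few lines of Python, which may help when reasoning about edge cases before the Java reader exists. This is only a sketch of the described behaviour, not the connector's implementation: `iter_records` is a hypothetical name, and skipping blank lines and rejecting lines with the wrong field count are assumptions the issue does not spell out.

```python
import io


def iter_records(stream, delimiter, expected_fields):
    """Yield one list of raw string fields per line until the stream hits EOF."""
    for line_number, line in enumerate(stream, start=1):
        line = line.rstrip("\r\n")
        if not line:
            continue  # assumption: blank lines are skipped, not treated as records
        fields = line.split(delimiter)
        if len(fields) != expected_fields:
            raise ValueError(
                f"line {line_number}: expected {expected_fields} fields, "
                f"got {len(fields)}"
            )
        yield fields
    # Leaving the loop means the writer closed its stdout (end-of-stream).


# io.StringIO stands in for the subprocess's stdout in this illustration.
sample = io.StringIO("1,alice,88.1\n2,bob,92.5\n")
records = list(iter_records(sample, ",", expected_fields=3))
```

Here `records` holds two lists of three strings each; type conversion is a separate step driven by the schema.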
1.3 Data Type Support
Text fields must map correctly (via parsing in `PythonSourceReader`) to the SeaTunnel (Java) types defined in `schema`:
- `string` -> `STRING` (Java `String`)
- `int` / `bigint` -> `INT` / `BIGINT` (Java `Integer` / `Long`)
- `float` / `double` -> `FLOAT` / `DOUBLE` (Java `Float` / `Double`)
- `boolean` -> `BOOLEAN` (Java `Boolean`)
- Complex types (array / map / row) can be added in later phases by reusing SeaTunnel’s existing type system and parsing rules.
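As a reference model for the mapping above, the conversion can be sketched in Python with one converter per declared column type. The type names come from the example schema; everything else is an illustrative assumption rather than SeaTunnel's actual parsing rules, including the `CONVERTERS` table, the `convert_fields` name, the accepted boolean spellings, and treating an empty field in a nullable column as null.

```python
def _parse_boolean(text):
    lowered = text.strip().lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise ValueError(f"not a boolean: {text!r}")


# Illustrative converters keyed by the schema's type names (assumption).
CONVERTERS = {
    "string": str,
    "int": int,
    "bigint": int,
    "float": float,
    "double": float,
    "boolean": _parse_boolean,
}


def convert_fields(fields, columns):
    """Convert raw string fields to typed values using each column's type."""
    if len(fields) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(fields)}")
    row = []
    for raw, column in zip(fields, columns):
        if raw == "" and column.get("nullable", True):
            row.append(None)  # assumption: empty field in a nullable column is null
            continue
        try:
            row.append(CONVERTERS[column["type"]](raw))
        except (KeyError, ValueError) as exc:
            raise ValueError(
                f"column {column.get('name')!r}: cannot convert {raw!r}"
            ) from exc
    return row
```

With the three columns from the example configuration, `convert_fields(["1", "alice", "88.1"], columns)` yields an integer, a string, and a float, while a non-numeric `id` raises an error that names the column.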
2. Robustness
- Error handling: If the Python script throws an exception or exits with a non-zero code, the SeaTunnel job fails with an exception instead of hanging or silently ignoring the error.
- Log forwarding: `stderr` output from the Python script is redirected or recorded into SeaTunnel worker logs to facilitate troubleshooting.
- Environment validation: Before startup, the connector verifies that the Python environment and script path exist; if not, it fails fast with a clear error.
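On the script side, the error-handling and log-forwarding criteria above suggest a simple convention: records go to stdout only, diagnostics go to stderr, and any failure ends the process with a non-zero exit code. The sketch below shows one way a user script could follow that convention; `run_source` and `example_records` are hypothetical names, not part of the proposal.

```python
import sys
import traceback


def run_source(produce, out=sys.stdout, err=sys.stderr):
    """Run a record-producing callable and return a process exit code.

    `produce` is any callable returning an iterable of output lines.
    """
    try:
        for line in produce():
            print(line, file=out, flush=True)
    except Exception:
        # Diagnostics go to stderr so they never mix with records on stdout.
        traceback.print_exc(file=err)
        return 1
    return 0


def example_records():
    """Mock producer that fails after one record, for illustration only."""
    yield "1,alice,88.1"
    raise RuntimeError("upstream API returned HTTP 500")
```

A real script would end with `sys.exit(run_source(example_records))` so that the exit code reaches the Java side, which can then fail the job instead of treating a truncated stream as a normal end-of-stream.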
3. Usability
- Documentation: Documentation is provided describing how to configure Python environment dependencies and `PythonSource` parameters, and how `PythonSourceReader` converts lines into `SeaTunnelRow` based on the schema.
4. Testing
- Unit tests: Mock-based tests are added for `PythonSourceReader` to validate:
  - Configuration parsing (including `schema`, `file_format_type`, `field_delimiter`)
  - Line splitting and type conversion according to schema
  - End-of-stream detection and error handling
- Integration tests (E2E):
  - A real Python script is implemented (for example, generating mock data as comma-separated lines).
  - A SeaTunnel job is run that uses `PythonSource` to read from this script with `file_format_type = "text"` and an explicit `field_delimiter`.
  - The job successfully writes data to a `Console` or `Assert` sink, and both data content and types are verified as correct.
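Before a full E2E job exists, a test script such as `test_source.py` can be exercised locally with a small harness that imitates the described protocol. This is a sketch under stated assumptions: `run_script` is a hypothetical helper, the config is sent as a single JSON line on stdin (as the example script expects), and a non-zero exit code is treated as failure.

```python
import json
import subprocess
import sys


def run_script(script_path, script_config, executable=sys.executable, timeout=30):
    """Launch a source script and return its stdout as a list of lines.

    Assumptions: config is passed as one JSON line on stdin, and a non-zero
    exit code means the script failed.
    """
    completed = subprocess.run(
        [executable, script_path],
        input=json.dumps(script_config) + "\n",
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    if completed.returncode != 0:
        raise RuntimeError(
            f"script exited with code {completed.returncode}: "
            f"{completed.stderr.strip()}"
        )
    return completed.stdout.splitlines()
```

For the example script above, `run_script("/path/to/user_script.py", {"api_url": "https://api.example.com"})` should return 100 comma-separated lines, which a unit test can then check field by field.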