apache/seatunnel

[Feature][Connector] Add Salesforce Source Connector

Open

#10356 opened on Jan 17, 2026

Labels: good first issue, help wanted

Description

Background

Salesforce is the world's leading Customer Relationship Management (CRM) platform with over 20% market share globally. It serves as the single source of truth for customer data, sales opportunities, service cases, and marketing campaigns across millions of enterprises.

Currently, SeaTunnel lacks native support for Salesforce as a data source, preventing users from building data pipelines that integrate CRM data with their data warehouses and analytics platforms.

Motivation

  • Market Leader: Salesforce dominates the enterprise CRM space.
  • API-Only Access: Salesforce exposes its data only through its own APIs (REST, SOAP, Bulk). There is no native JDBC endpoint, so the existing JDBC Source cannot be used.
  • Multi-Object Integration: Enterprises need to sync multiple Salesforce objects (Account, Contact, Opportunity, Case, etc.) simultaneously.

Proposed Solution

Implement a dedicated Salesforce Source connector using Salesforce REST API and Bulk API 2.0 with multi-object support.

This connector will follow SeaTunnel's standard multi-table configuration (aligned with JDBC Source) using table_list and table_path.

Core Features

  1. Multi-Object Support (Standardized)

    • Use table_list standard parameter for multi-object definition.
    • Use table_path (format: database.object_name), where database is a free-form logical name (e.g., the Salesforce instance name or org ID) and object_name is the Salesforce object name (e.g., Account).
    • Support specialized configuration per object (fields, filters).
  2. API Support

    • REST API
    • Bulk API 2.0
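
To illustrate how the two APIs differ from the connector's point of view, here is a minimal Python sketch that only builds the requests: a synchronous REST query is a GET against the query resource, while a Bulk API 2.0 extraction starts by POSTing a query job. The helper names rest_query_url and bulk_query_job are hypothetical and not part of SeaTunnel; authentication, result paging, and job polling are left out.

```python
import json
from urllib.parse import quote


def rest_query_url(instance_url, api_version, soql):
    """Return the URL for a synchronous REST API query (sent as GET)."""
    # api_version is expected in the "v59.0" form used in the examples below.
    return f"{instance_url}/services/data/{api_version}/query?q={quote(soql, safe='')}"


def bulk_query_job(instance_url, api_version, soql):
    """Return (url, json_body) for creating a Bulk API 2.0 query job (sent as POST)."""
    url = f"{instance_url}/services/data/{api_version}/jobs/query"
    body = json.dumps({"operation": "query", "query": soql})
    return url, body


if __name__ == "__main__":
    soql = "SELECT Id, Name FROM Account"
    print(rest_query_url("https://yourinstance.salesforce.com", "v59.0", soql))
    print(bulk_query_job("https://yourinstance.salesforce.com", "v59.0", soql))
```

A connector could plausibly choose between the two per object, for example REST for small result sets and Bulk API 2.0 for large batch extractions. That policy is an assumption and is not specified in this issue.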

Configuration Examples

Multi-Object Configuration (Standardized)

env {
  parallelism = 2
  job.mode = "BATCH"
}

source {
  Salesforce {
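    # Authentication
    auth_type = "oauth2_jwt"
    client_id = "your_connected_app_client_id"
    # Note: the JWT bearer flow signs the request with private_key_file;
    # client_secret would only be needed for other OAuth flows
    client_secret = "your_client_secret"
    username = "user@company.com"
    private_key_file = "/path/to/private-key.pem"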
    
    instance_url = "https://yourinstance.salesforce.com"
    api_version = "v59.0"
    
    # Multi-object standard configuration
    table_list = [
      {
        # Standard table_path format: database.object
        # database implies the logical source, object is the Salesforce Object
        table_path = "salesforce.Account"
        
        # Object specific settings
        extraction_mode = "incremental"
        incremental_field = "LastModifiedDate"
        start_date = "2024-01-01T00:00:00Z"
        
        # Field selection
        fields = ["Id", "Name", "Industry", "AnnualRevenue", "CreatedDate", "LastModifiedDate"]
        
        # Filtering
        filter = "Industry != null AND AnnualRevenue > 1000000"
      },
      {
        table_path = "salesforce.Contact"
        extraction_mode = "incremental"
        incremental_field = "LastModifiedDate"
        fetch_all_fields = true
      },
      {
        table_path = "salesforce.Opportunity"
        extraction_mode = "full"
        
        # Custom SOQL query (optional override)
        soql_query = "SELECT Id, Name, Amount, StageName FROM Opportunity WHERE StageName = 'Closed Won'"
      }
    ]
    
    # Global settings
    max_retries = 3
    request_timeout_ms = 60000
    enable_parallel_extraction = true
  }
}

sink {
  Console {}
}
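
The per-object options above imply a translation from each table_list entry to a SOQL query. The following Python sketch shows one possible mapping, assuming that soql_query overrides everything else, that filter is appended verbatim, and that incremental extraction adds a lower bound on incremental_field. The function build_soql is hypothetical; it does not handle fetch_all_fields, which would need the field names resolved first (for example from the object's describe metadata).

```python
def build_soql(entry):
    """Derive a SOQL query from one table_list entry (illustrative mapping only)."""
    # An explicit query takes precedence over fields/filter settings.
    if "soql_query" in entry:
        return entry["soql_query"]

    # table_path is "database.object_name"; only the object name appears in SOQL.
    object_name = entry["table_path"].split(".", 1)[1]

    conditions = []
    if entry.get("filter"):
        conditions.append(f"({entry['filter']})")
    if entry.get("extraction_mode") == "incremental" and entry.get("start_date"):
        # SOQL datetime literals are written without quotes.
        conditions.append(f"{entry['incremental_field']} >= {entry['start_date']}")

    soql = f"SELECT {', '.join(entry['fields'])} FROM {object_name}"
    if conditions:
        soql += " WHERE " + " AND ".join(conditions)
    return soql


if __name__ == "__main__":
    account = {
        "table_path": "salesforce.Account",
        "extraction_mode": "incremental",
        "incremental_field": "LastModifiedDate",
        "start_date": "2024-01-01T00:00:00Z",
        "fields": ["Id", "Name"],
        "filter": "Industry != null",
    }
    print(build_soql(account))
```

On later runs, an incremental job would presumably replace start_date with the highest incremental_field value already read. How that state is checkpointed is not described in this issue.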

CDC Multi-Object Configuration

source {
  Salesforce {
    # ... auth config ...
    
    extraction_mode = "cdc"
    cdc_type = "change_data_capture"
    
    table_list = [
      {
        table_path = "salesforce.Account"
      },
      {
        table_path = "salesforce.Contact"
      }
    ]
    
    # -1 = only events published after subscribing; -2 = all retained events
    replay_id = -1
  }
}
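
In CDC mode, each table_path would have to be mapped to a Salesforce change event channel. The Python sketch below assumes the standard Change Data Capture naming: /data/<Object>ChangeEvent for standard objects and /data/<Name>__ChangeEvent for custom objects ending in __c. The function name and the two replay constants are hypothetical; the -1 and -2 values follow the Streaming API replay convention.

```python
# Replay options as used by the Salesforce Streaming API.
REPLAY_NEW_EVENTS = -1    # only events published after the subscription starts
REPLAY_ALL_RETAINED = -2  # all events still inside the retention window


def change_event_channel(table_path):
    """Map "database.object_name" to the object's change event channel."""
    object_name = table_path.split(".", 1)[1]
    if object_name.endswith("__c"):
        # Custom object: Custom_Object__c -> Custom_Object__ChangeEvent
        event_name = object_name[: -len("__c")] + "__ChangeEvent"
    else:
        # Standard object: Account -> AccountChangeEvent
        event_name = object_name + "ChangeEvent"
    return f"/data/{event_name}"


if __name__ == "__main__":
    for path in ("salesforce.Account", "salesforce.Custom_Object__c"):
        print(path, "->", change_event_channel(path))
```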

Technical Considerations

Multi-Object Configuration Standardization

  • Parameter Alignment: Adopt table_list and table_path to align with SeaTunnel's JDBC multi-table standard. The earlier object-configs parameter is replaced by table_list.
  • Table Path: The table_path will be used to identify the specific Salesforce object (e.g. Account, Custom_Object__c).
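
As a minimal sketch of the table_path convention, the Python snippet below splits a value at the first dot into its database and object_name parts and rejects values that lack either part. TablePath and parse_table_path are hypothetical names used only for illustration; the actual connector would be written in Java against SeaTunnel's own table path handling.

```python
from typing import NamedTuple


class TablePath(NamedTuple):
    database: str     # free-form logical source name, e.g. "salesforce"
    object_name: str  # Salesforce object, e.g. "Account" or "Custom_Object__c"


def parse_table_path(table_path):
    """Split "database.object_name" at the first dot and validate both parts."""
    database, separator, object_name = table_path.partition(".")
    if not separator or not database or not object_name:
        raise ValueError(f"expected 'database.object_name', got {table_path!r}")
    return TablePath(database, object_name)


if __name__ == "__main__":
    print(parse_table_path("salesforce.Account"))
    print(parse_table_path("salesforce.Custom_Object__c"))
```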

Dependencies

  • Salesforce REST API client or standard HTTP Client
  • OAuth 2.0 library
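
To make the OAuth dependency concrete: the oauth2_jwt setting in the examples presumably refers to the OAuth 2.0 JWT bearer flow, in which the client signs a short-lived JWT with its private key and exchanges it at the token endpoint. The Python sketch below builds only the unsigned part of that JWT and the form fields of the token request. The RS256 signature step is omitted because it needs a cryptography library, and all function names, the default audience, and the 180-second lifetime are assumptions for illustration.

```python
import base64
import json
import time


def b64url(data):
    """Base64url-encode bytes without padding, as required for JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def jwt_signing_input(client_id, username, audience="https://login.salesforce.com",
                      now=None, lifetime_s=180):
    """Return "<header>.<claims>", the string that must then be signed with RS256."""
    issued = int(time.time()) if now is None else now
    header = {"alg": "RS256"}
    claims = {
        "iss": client_id,             # connected app consumer key
        "sub": username,              # Salesforce username to act as
        "aud": audience,              # login URL of the target environment
        "exp": issued + lifetime_s,   # keep the assertion short-lived
    }
    return b64url(json.dumps(header).encode()) + "." + b64url(json.dumps(claims).encode())


def token_request_form(assertion):
    """Form fields to POST to <login URL>/services/oauth2/token."""
    return {
        "grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
        "assertion": assertion,  # signing input + "." + base64url(signature)
    }


if __name__ == "__main__":
    print(jwt_signing_input("your_connected_app_client_id", "user@company.com"))
```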
