more streams and persist records to external systems.
The connectivity between plugins is managed through two critical configuration keys: plugin_output and plugin_input. These function as logical ports in the DAG.
plugin_output: Stream Identification
When a source or transform emits data, it labels the resulting stream with an identifier: its plugin_output. The ID must be unique within the job. It decouples the data producer from its consumers, so multiple downstream plugins can reference the same stream.
plugin_input: Stream Subscription
Sinks and transforms declare which streams they consume via plugin_input. This field accepts a single stream ID or a list of IDs. When a sink subscribes to multiple streams, SeaTunnel merges them at the stream level.
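The wiring rules can be checked mechanically. The following is a hypothetical Python model, not SeaTunnel code. Each plugin is a dict carrying the same plugin_output and plugin_input keys as the config file, and build_edges resolves subscriptions into producer-to-consumer edges. It accepts a single ID or a list for plugin_input and rejects duplicate or unknown stream IDs. The plugin names in the demo are only labels.

```python
from typing import Any, Dict, List, Optional, Tuple, Union

# Hypothetical model: each plugin is a dict with a "name" plus the optional
# plugin_output / plugin_input keys that appear in the SeaTunnel config.
Plugin = Dict[str, Any]


def normalize_inputs(value: Optional[Union[str, List[str]]]) -> List[str]:
    """plugin_input accepts one stream ID or a list of IDs; return a list."""
    if value is None:
        return []
    if isinstance(value, str):
        return [value]
    return list(value)


def build_edges(plugins: List[Plugin]) -> List[Tuple[str, str, str]]:
    """Resolve subscriptions into (producer, consumer, stream_id) edges.

    Raises ValueError if two plugins claim the same plugin_output or if a
    plugin subscribes to a stream ID that no plugin produces.
    """
    producers: Dict[str, str] = {}
    for plugin in plugins:
        out = plugin.get("plugin_output")
        if out is None:
            continue
        if out in producers:
            raise ValueError(f"duplicate plugin_output {out!r}")
        producers[out] = plugin["name"]

    edges: List[Tuple[str, str, str]] = []
    for plugin in plugins:
        for stream in normalize_inputs(plugin.get("plugin_input")):
            if stream not in producers:
                raise ValueError(f"{plugin['name']} subscribes to unknown stream {stream!r}")
            edges.append((producers[stream], plugin["name"], stream))
    return edges


if __name__ == "__main__":
    job = [
        {"name": "Kafka", "plugin_output": "kafka_events_stream"},
        {"name": "Jdbc", "plugin_output": "mysql_audit_stream"},
        # Fan-in: one sink subscribes to two streams.
        {"name": "Clickhouse", "plugin_input": ["kafka_events_stream", "mysql_audit_stream"]},
        # Fan-out: a second consumer reuses kafka_events_stream.
        {"name": "Console", "plugin_input": "kafka_events_stream"},
    ]
    for edge in build_edges(job):
        print(edge)
```

Both shapes come from the same two keys. Fan-out is several consumers naming one plugin_output. Fan-in is one consumer listing several.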
Architecture Decision: Fan-In Merging
When a sink receives multiple input streams, SeaTunnel performs a stream merge, which is an append operation. Records from Stream A and Stream B are interleaved or batched sequentially based on the execution engine's scheduling. This is not a relational join. The engine does not match records by key; it simply appends records from all subscribed streams into the sink's write buffer.
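The difference between an append merge and a join is easiest to see side by side. This plain-Python model illustrates the two behaviors and is not SeaTunnel internals. append_merge keeps every record from every stream. key_join pairs records by key, which the fan-in never does. The round-robin order in append_merge is just one possible interleaving.

```python
from itertools import zip_longest
from typing import Dict, List, Tuple

Record = Dict[str, str]


def append_merge(*streams: List[Record]) -> List[Record]:
    """Fan-in model: every record from every stream is kept; none are matched.

    Round-robin is just one possible interleaving. A real engine's order
    depends on scheduling, so downstream logic must not rely on it.
    """
    merged: List[Record] = []
    for batch in zip_longest(*streams):
        merged.extend(rec for rec in batch if rec is not None)
    return merged


def key_join(left: List[Record], right: List[Record], key: str) -> List[Tuple[Record, Record]]:
    """For contrast: an inner join pairs left/right records that share `key`."""
    index: Dict[str, List[Record]] = {}
    for rec in right:
        index.setdefault(rec[key], []).append(rec)
    return [(lrec, rrec) for lrec in left for rrec in index.get(lrec[key], [])]


if __name__ == "__main__":
    kafka = [{"event_id": "k1", "user_id": "u1"}, {"event_id": "k2", "user_id": "u2"}]
    mysql = [{"event_id": "m1", "user_id": "u1"}]
    print([r["event_id"] for r in append_merge(kafka, mysql)])  # ['k1', 'm1', 'k2']
    print(len(key_join(kafka, mysql, "user_id")))  # 1
```

With the two sample streams, the merge yields three rows and the join yields one pair. A sink subscribed to both streams behaves like the merge.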
Prerequisite: Schema Alignment
For a merge to succeed, all input streams must be schema-compatible. SeaTunnel validates this at runtime. The streams must have:
- The same number of fields.
- Compatible data types for corresponding fields.
- Aligned field names, or a mapping strategy defined via Transform.
If schemas diverge, the pipeline fails with a runtime exception. This enforces strict data contracts in the DAG.
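A pre-merge check for the three prerequisites might look like the following sketch. It assumes each stream's schema is available as an ordered field-to-type mapping; how you obtain that mapping is up to your tooling. It treats "compatible" as "identical type name", which is a deliberate simplification.

```python
from typing import Dict, List

# Field name -> type name, in declaration order, e.g. {"user_id": "string"}.
Schema = Dict[str, str]


def schema_problems(schemas: Dict[str, Schema]) -> List[str]:
    """Check every stream against the first one before merging them.

    Mirrors the three prerequisites: same field count, same names position
    by position, and compatible types. "Compatible" is approximated here as
    "identical type name", which may be stricter than the engine requires.
    """
    stream_ids = list(schemas)
    if len(stream_ids) < 2:
        return []
    ref_id = stream_ids[0]
    ref_fields = list(schemas[ref_id].items())
    problems: List[str] = []
    for other_id in stream_ids[1:]:
        fields = list(schemas[other_id].items())
        if len(fields) != len(ref_fields):
            problems.append(f"{other_id}: {len(fields)} fields, {ref_id} has {len(ref_fields)}")
            continue
        for (ref_name, ref_type), (name, ftype) in zip(ref_fields, fields):
            if name != ref_name:
                problems.append(f"{other_id}: field {name!r} where {ref_id} has {ref_name!r}")
            elif ftype != ref_type:
                problems.append(f"{other_id}.{name}: type {ftype} vs {ref_type} in {ref_id}")
    return problems


if __name__ == "__main__":
    # Type drift: user_id is string in one stream and int in the other.
    print(schema_problems({
        "kafka_events_stream": {"user_id": "string", "ts": "timestamp"},
        "mysql_audit_stream": {"user_id": "int", "ts": "timestamp"},
    }))
```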
Code Example: Unified Event Aggregation
The following example demonstrates a fan-in topology where application logs from Kafka and audit trails from MySQL are merged into a ClickHouse analytics table. Note the use of explicit stream naming and schema alignment via Transform.
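```hocon
env {
  parallelism = 4
  job.mode = "STREAMING"
}

source {
  Kafka {
    bootstrap.servers = "kafka-broker:9092"
    topic = "app-events"
    plugin_output = "kafka_events_stream"
    schema {
      fields {
        event_id = "string"
        user_id = "string"
        payload = "string"
        ts = "timestamp"
      }
    }
  }

  Jdbc {
    url = "jdbc:mysql://db-host:3306/audit"
    driver = "com.mysql.cj.jdbc.Driver"
    # Placeholder credentials; replace with your own.
    user = "seatunnel"
    password = "change-me"
    # The Jdbc source derives its schema from the columns this query returns.
    query = "SELECT event_id, user_id, payload, ts FROM audit_logs"
    plugin_output = "mysql_audit_stream"
  }
}

transform {
  Sql {
    plugin_input = ["kafka_events_stream", "mysql_audit_stream"]
    plugin_output = "unified_stream"
    # FROM must name an input stream, never this transform's own plugin_output.
    # Whether one query is applied to every stream in plugin_input depends on
    # the SeaTunnel version; if it is not, give each source its own Sql
    # transform and let the sink subscribe to both outputs.
    query = """
      SELECT event_id, user_id, payload, ts
      FROM kafka_events_stream
    """
  }
}

sink {
  Clickhouse {
    host = "clickhouse:8123"
    database = "analytics"
    table = "unified_events"
    # Placeholder credentials; replace with your own.
    username = "default"
    password = "change-me"
    plugin_input = ["unified_stream"]
    # The sink takes its schema from unified_stream; the target table needs
    # matching columns: event_id, user_id, payload, ts.
  }
}
```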
Rationale for Design Choices
- Explicit Stream Naming: Using descriptive IDs like kafka_events_stream improves pipeline observability and allows platform tools to visualize the DAG accurately.
- Transform for Alignment: Even if schemas appear identical, introducing a Sql transform with a unified plugin_output ensures that field order and types are strictly enforced before the sink. This acts as a schema gatekeeper.
- List-Based Input: The sink uses plugin_input = ["unified_stream"]. A single stream can be passed as a plain string, but using array syntax consistently lets you add inputs later without changing the syntax.
- Stream Mode: Setting job.mode = "STREAMING" runs the job continuously instead of as a bounded batch, which real-time aggregation topologies require.
Pitfall Guide
1. Schema Drift During Merge
- Explanation: Engineers assume that if two sources have the same field names, they can merge. However, if one source defines user_id as string and another as int, the merge fails at runtime.
- Fix: Use Transform plugins to cast and normalize types across all streams before merging. Define schemas explicitly in sources to catch mismatches early.
2. Expecting Join Semantics in Multi-Input Sinks
- Explanation: A common mistake is configuring a sink with two inputs and expecting records to be matched on a key. SeaTunnel performs an append merge, so records that share a key arrive as separate, interleaved rows rather than as joined records.
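- Fix: If relational logic is required, perform the join upstream in the source systems (for example, in the query of a Jdbc source) or in a transform that supports joins. SeaTunnel's Sql transform documentation lists multi-table JOINs as unsupported, so confirm what your version allows before designing around it. Never rely on sink-level merging for joins.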
3. Omitting plugin_output in Complex Graphs
- Explanation: In simple 1:1 pipelines, plugin_output may be optional. In DAGs with fan-out or fan-in, omitting this field causes the engine to generate implicit IDs, making the graph opaque and difficult to debug or manage programmatically.
- Fix: Always define plugin_output explicitly. If you build a platform on top of SeaTunnel, auto-generate a unique ID whenever a user omits the field, so the graph stays well-defined.
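For platform builders, the auto-generation advice could be implemented as a pre-processing pass like the hypothetical helper below; it is not part of SeaTunnel. The kind and type keys, and the `<type>_<position>_stream` naming scheme, are assumptions. They are chosen so that generated IDs stay stable across re-renders of the same config.

```python
from typing import Any, Dict, List, Set

Plugin = Dict[str, Any]


def assign_missing_outputs(plugins: List[Plugin]) -> List[Plugin]:
    """Give every source/transform without a plugin_output a unique one.

    Hypothetical platform-side helper (not part of SeaTunnel). IDs are derived
    from the plugin type and its position, so re-rendering the same config
    yields the same IDs; a numeric suffix resolves clashes with user IDs.
    Returns new dicts and leaves the input list untouched.
    """
    taken: Set[str] = {p["plugin_output"] for p in plugins if p.get("plugin_output")}
    result: List[Plugin] = []
    for position, plugin in enumerate(plugins):
        plugin = dict(plugin)
        if plugin.get("kind") in ("source", "transform") and not plugin.get("plugin_output"):
            base = f"{plugin['type'].lower()}_{position}_stream"
            candidate, suffix = base, 1
            while candidate in taken:
                candidate = f"{base}_{suffix}"
                suffix += 1
            plugin["plugin_output"] = candidate
            taken.add(candidate)
        result.append(plugin)
    return result
```

Sinks are skipped because they only consume streams. A user-supplied ID always wins over a generated one.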
4. Ignoring Transform's Role in Schema Governance
- Explanation: Teams often try to handle schema alignment in the sink configuration or assume sources will adapt automatically. This leads to brittle pipelines that break on source schema changes.
- Fix: Treat Transform as the schema enforcement layer. Use it to project, rename, and cast fields, ensuring that sinks receive a stable contract regardless of source variations.
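The gatekeeper role can be illustrated one record at a time. In a Sql transform, these steps correspond to a SELECT list with aliases and CAST expressions. The Python below only models that behavior, and the caster table and type names are hypothetical.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Optional

# Hypothetical casters keyed by contract type name.
CASTERS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "bigint": int,
    "timestamp": lambda v: v if isinstance(v, datetime) else datetime.fromisoformat(v),
}


def to_contract(
    record: Dict[str, Any],
    contract: Dict[str, str],
    renames: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Rename, project, and cast one record so it matches `contract` exactly.

    Fields not in the contract are dropped and output follows the contract's
    field order. A missing field raises KeyError, so the violation surfaces
    at the gatekeeper instead of at the sink.
    """
    renames = renames or {}
    renamed = {renames.get(name, name): value for name, value in record.items()}
    return {field: CASTERS[ftype](renamed[field]) for field, ftype in contract.items()}


if __name__ == "__main__":
    contract = {"user_id": "string", "ts": "timestamp"}
    raw = {"uid": 42, "ts": "2024-01-01T00:00:00", "debug": "x"}
    print(to_contract(raw, contract, renames={"uid": "user_id"}))
    # {'user_id': '42', 'ts': datetime.datetime(2024, 1, 1, 0, 0)}
```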
5. Linear Configuration Thinking
- Explanation: Writing configurations that read like sequential scripts rather than graph definitions. This limits the ability to reuse streams or create efficient topologies.
- Fix: Design the pipeline as a graph first. Identify stream IDs, map connections, and then write the configuration. Visualize the DAG to ensure all paths are valid and acyclic.
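Checking that a configuration forms a valid acyclic graph amounts to a topological sort over the plugin_output/plugin_input edges. The sketch below runs Kahn's algorithm on a hypothetical dict model of plugins and assumes plugin names are unique. It is a pre-deployment check you would run yourself, not a SeaTunnel API.

```python
from collections import deque
from typing import Any, Dict, List

Plugin = Dict[str, Any]


def topo_order(plugins: List[Plugin]) -> List[str]:
    """Return plugin names in a valid execution order (Kahn's algorithm).

    Raises ValueError on an unknown stream ID or on a cycle.
    """
    producer = {p["plugin_output"]: p["name"] for p in plugins if p.get("plugin_output")}
    consumers: Dict[str, List[str]] = {p["name"]: [] for p in plugins}
    indegree: Dict[str, int] = {p["name"]: 0 for p in plugins}
    for p in plugins:
        inputs = p.get("plugin_input") or []
        if isinstance(inputs, str):
            inputs = [inputs]
        for stream in inputs:
            if stream not in producer:
                raise ValueError(f"{p['name']}: unknown stream {stream!r}")
            consumers[producer[stream]].append(p["name"])
            indegree[p["name"]] += 1

    # Start from plugins with no inputs (sources) and peel the graph layer by layer.
    ready = deque(name for name, deg in indegree.items() if deg == 0)
    order: List[str] = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for nxt in consumers[name]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                ready.append(nxt)

    # Anything never reaching indegree 0 sits on a cycle or depends on one.
    if len(order) != len(plugins):
        stuck = sorted(name for name, deg in indegree.items() if deg > 0)
        raise ValueError("cycle among: " + ", ".join(stuck))
    return order
```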
6. Misunderstanding Sink Subscription Limits
- Explanation: Assuming a sink can only consume one stream because the DSL often shows a single input. This prevents leveraging fan-in capabilities.
- Fix: Configure plugin_input as a list to subscribe to multiple streams, after confirming in the connector's documentation that the sink supports multiple inputs.
7. Runtime Schema Validation Surprises
- Explanation: Developers accustomed to SQL's compile-time checks are surprised when schema errors occur during job execution.
- Fix: Implement pre-flight schema validation in CI/CD pipelines. Use SeaTunnel's schema discovery features to verify compatibility before deployment. Monitor runtime logs for schema mismatch exceptions.
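One way to wire pre-flight validation into CI is a script that fails the build on a contract mismatch. It assumes a hypothetical JSON export of each stream's fields and each sink's expected fields. SeaTunnel does not produce this file, so your tooling would have to generate it. Field order is compared too, because a positional mismatch also breaks the contract.

```python
import json
import sys
from typing import Any, Dict, List


def check_contracts(spec: Dict[str, Any]) -> List[str]:
    """Compare each sink's expected fields with every stream it subscribes to.

    `spec` is a hypothetical document exported by your own tooling:
      {"streams": {stream_id: {field: type, ...}},
       "sinks":   {sink_name: {"plugin_input": [...], "fields": {field: type}}}}
    """
    errors: List[str] = []
    streams = spec["streams"]
    for sink, sink_spec in spec["sinks"].items():
        expected = list(sink_spec["fields"].items())
        inputs = sink_spec["plugin_input"]
        if isinstance(inputs, str):
            inputs = [inputs]
        for stream_id in inputs:
            if stream_id not in streams:
                errors.append(f"{sink}: unknown stream {stream_id!r}")
            elif list(streams[stream_id].items()) != expected:
                errors.append(f"{sink}: stream {stream_id!r} does not match sink fields")
    return errors


def main(argv: List[str]) -> int:
    """CI entry point: exit status 1 (failing the build) on any mismatch."""
    with open(argv[1], encoding="utf-8") as fh:
        errors = check_contracts(json.load(fh))
    for err in errors:
        print(f"schema check failed: {err}", file=sys.stderr)
    return 1 if errors else 0


if __name__ == "__main__" and len(sys.argv) == 2:
    sys.exit(main(sys.argv))
```

In a CI step this would run as, for example, `python check_contracts.py pipeline-schemas.json`. Both file names are hypothetical. A non-zero exit status stops the deployment.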
Production Bundle
Action Checklist
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Merge multiple sources to one sink | Fan-in DAG with schema-aligned streams | Reduces sink connections and simplifies write path | Low infrastructure cost; higher CPU for merge |
| Fan-out single source to multiple sinks | Fan-out DAG with shared plugin_output | Decouples consumers; allows different formats | Minimal overhead; scales with sink count |
| Require relational join logic | Join upstream (for example in the source query) or in a transform that supports joins | SeaTunnel merge is append-only; a join requires key matching | Higher compute cost in the join stage |
| Schema mismatch between sources | Transform plugin for casting/mapping | Ensures runtime compatibility; prevents pipeline failure | Moderate transform overhead |
| Real-time aggregation needs | Streaming mode with parallel sinks | Low latency; continuous processing | Higher resource usage; requires cluster tuning |
Configuration Template
This template provides a starting point for a fan-in pipeline with schema governance. It includes environment settings, explicit stream naming, transform-based merging and alignment, and a sink that subscribes through list-form plugin_input.
```hocon
env {
  parallelism = 8
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  Kafka {
    bootstrap.servers = "kafka-cluster:9092"
    topic = "user_activity"
    plugin_output = "kafka_activity_stream"
    schema {
      fields {
        user_id = "string"
        action = "string"
        timestamp = "timestamp"
        metadata = "string"
      }
    }
  }

  Jdbc {
    url = "jdbc:postgresql://postgres:5432/events"
    driver = "org.postgresql.Driver"
    # Placeholder credentials; replace with your own.
    user = "seatunnel"
    password = "change-me"
    # The Jdbc source derives its schema from the columns this query returns.
    query = "SELECT user_id, action, timestamp, metadata FROM system_events"
    plugin_output = "pg_events_stream"
  }
}

transform {
  Sql {
    plugin_input = ["kafka_activity_stream", "pg_events_stream"]
    plugin_output = "normalized_event_stream"
    # FROM must name an input stream, never this transform's own plugin_output.
    # Check how your SeaTunnel version applies one query to several inputs.
    query = """
      SELECT user_id, action, timestamp, metadata
      FROM kafka_activity_stream
      WHERE user_id IS NOT NULL
    """
  }
}

sink {
  Elasticsearch {
    hosts = ["es-node:9200"]
    index = "events-index"
    plugin_input = ["normalized_event_stream"]
    # Documents take their fields from normalized_event_stream.
  }
}
```
Quick Start Guide
- Install SeaTunnel: Deploy the SeaTunnel engine and connectors for your sources and sinks. Ensure the cluster is configured for streaming execution.
- Define Sources with Outputs: Create source configurations for each data origin. Assign unique plugin_output IDs to each source to generate named streams.
- Configure Sink Subscriptions: Set up sink configurations with plugin_input listing the stream IDs to consume. Verify schema compatibility between streams.
- Add Transform if Needed: If schemas require alignment, insert a Transform plugin between sources and sinks to normalize fields and enforce contracts.
- Execute and Validate: Run the pipeline in a test environment. Monitor logs for schema validation errors and verify that data from all sources appears in the sink. Adjust parallelism and checkpoint settings based on throughput requirements.
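For the validation step, a simple reconciliation helps confirm that every source actually reached the sink. The sketch assumes a bounded test run and record counts you collect yourself, from the source systems and from the sink. It checks the invariant that an append merge implies and does not depend on any SeaTunnel API.

```python
from typing import Any, Dict


def reconcile(source_counts: Dict[str, int], sink_count: int, filtered_out: int = 0) -> Dict[str, Any]:
    """Post-run check of the append-merge invariant for a test environment.

    With a pure append merge (no join, no dedup) every record that survives
    the transform's filter reaches the sink once, so the sink should hold
    sum(source_counts) - filtered_out rows. A negative "missing" value means
    extra rows, for example replays under at-least-once delivery.
    """
    expected = sum(source_counts.values()) - filtered_out
    return {
        "expected": expected,
        "actual": sink_count,
        "missing": expected - sink_count,
        # Streams that contributed nothing usually point to a wiring mistake.
        "empty_streams": sorted(s for s, n in source_counts.items() if n == 0),
    }
```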