Pipelines are where your business logic lives in Data Estuary. They define how data moves, transforms, and evolves across your system. Understanding pipeline execution is key to building efficient, reliable data workflows.
Step-Based Execution
Pipelines consist of ordered steps that execute sequentially. Each step can query, transform, or create entities. Pipelines can be triggered by events, schedules, or manual invocation.
definePipeline("ProcessOrder", {
  triggers: {
    entityCreated: "Order" // Run when a new Order is created
  },
  steps: [
    {
      name: "validate_inventory",
      action: "query",
      entityType: "InventoryItem",
      filter: "sku IN order.items[].sku"
    },
    {
      name: "check_availability",
      action: "validate",
      logic: "all(inventory.quantityAvailable > order.items.quantity)"
    },
    {
      name: "reserve_inventory",
      action: "update",
      entityType: "InventoryItem",
      updates: "quantityAvailable -= order.items.quantity"
    },
    {
      name: "confirm_order",
      action: "update",
      entityType: "Order",
      updates: { status: "confirmed" }
    },
    {
      name: "create_shipment",
      action: "create",
      entityType: "Shipment",
      data: "fromOrder(order)"
    }
  ],
  runOn: ["primary-cluster"],
  onError: "rollback" // Transactional semantics
});

Action Types
Pipelines can operate on multiple entities and entity types, enabling complex workflows that coordinate across different parts of your system.
Query Actions
Fetch entities based on filters and conditions
Transform Actions
Aggregate, map, filter, and reshape data
Update Actions
Modify existing entities with validation
Create Actions
Generate new entities from pipeline logic
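For illustration, a transform step might look like the following in the same definePipeline step syntax. Note that the field names shown here (groupBy, aggregate) are assumptions for the sketch, not confirmed API:

```javascript
// Hypothetical transform step; groupBy and aggregate are illustrative
// assumptions, not documented Data Estuary fields.
{
  name: "summarize_machine_logs",
  action: "transform",
  entityType: "MachineLog",
  groupBy: "machineId",                       // assumed grouping syntax
  aggregate: { avgTemp: "avg(temperature)" }  // assumed aggregation syntax
}
```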
Cluster Affinity
You control where pipelines execute with the runOn parameter. This is crucial for edge processing, data locality, and bandwidth optimization.
Example: A pipeline that aggregates machine logs can run on-premise where the data is generated, then sync only summaries to cloud clusters—saving bandwidth and reducing latency.
runOn: ["on-prem-factory-cluster"]

Error Handling
Pipelines support transactional semantics. If a step fails, you can configure automatic rollback to maintain data consistency.
- Rollback: Automatically undo all changes if any step fails
- Continue: Log the error but continue execution
- Retry: Automatically retry failed steps with backoff
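The rollback strategy above can be sketched as compensating actions applied in reverse order. This is an illustrative model of transactional step semantics, not the Data Estuary runtime itself; the apply/undo shape is an assumption made for the sketch:

```javascript
// Sketch of rollback semantics: each step supplies apply() and undo().
// If any step throws, previously applied steps are undone in reverse order.
function runWithRollback(steps) {
  const applied = [];
  try {
    for (const step of steps) {
      step.apply();
      applied.push(step);
    }
    return { status: "completed" };
  } catch (err) {
    // Compensate in reverse order so later effects are undone first
    for (const step of applied.reverse()) {
      step.undo();
    }
    return { status: "rolled_back", error: err.message };
  }
}
```

Undoing in reverse order matters when later steps depend on earlier ones, for example releasing an inventory reservation before un-confirming the order would leave a window where the order is confirmed but unbacked.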
Triggers
Pipelines can be triggered in multiple ways:
- Entity Events: Trigger when entities are created, updated, or reach a specific state
- Schedules: Run on a cron schedule (daily, hourly, custom)
- Manual Invocation: Call via API or admin interface
- External Webhooks: Respond to external system events
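As a sketch, a scheduled trigger might be declared like this in the same DSL. The schedule key and cron string syntax are assumptions for illustration, not confirmed API:

```javascript
// Hypothetical scheduled trigger; the "schedule" key and cron syntax
// are illustrative assumptions, not documented Data Estuary API.
definePipeline("NightlyInventoryAudit", {
  triggers: {
    schedule: "0 2 * * *" // assumed cron syntax: every day at 02:00
  },
  steps: [ /* ... */ ]
});
```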
Best Practices
Keep Steps Focused
Each step should do one thing well. This makes pipelines easier to debug, test, and maintain. It also enables better performance optimization by the platform.
Consider Data Locality
Run pipelines close to where the data lives. If you're processing data that exists primarily in an edge cluster, run the pipeline there rather than pulling data to a central location.
Use Appropriate Error Handling
Critical workflows (like order processing) should use rollback on error. Analytics pipelines might use continue-on-error to avoid blocking on data quality issues.
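The retry strategy can be sketched as exponential backoff: each failed attempt waits twice as long as the previous one before trying again. This is an illustrative model, not the platform's actual retry implementation, and the option names are assumptions:

```javascript
// Sketch of retry-with-backoff: re-invoke a flaky step, doubling the
// delay after each failure, until it succeeds or maxAttempts is reached.
async function retryStep(fn, { maxAttempts = 3, baseDelayMs = 100 } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt === maxAttempts) throw err; // out of attempts: surface error
      const delay = baseDelayMs * 2 ** (attempt - 1); // 100, 200, 400, ...
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

Backoff like this suits analytics pipelines hitting transient failures (a flaky sensor feed, a briefly unavailable cluster), whereas order processing is usually better served by rollback so no partial state survives.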
Real-World Example
A manufacturing company uses pipelines to aggregate sensor data from factory machines. The pipeline runs on-premise every 5 minutes, aggregates logs by machine, calculates statistics, and creates summary entities that automatically replicate to their cloud analytics cluster.
This approach reduced their cloud data transfer costs by 90% while maintaining near-real-time visibility into factory operations.