
Understanding Pipeline Execution

Pipelines are where your business logic lives in Data Estuary. They define how data moves, transforms, and evolves across your system. Understanding pipeline execution is key to building efficient, reliable data workflows.

Step-Based Execution

Pipelines consist of ordered steps that execute sequentially. Each step can query, transform, or create entities. Pipelines can be triggered by events, schedules, or manual invocation.

definePipeline("ProcessOrder", {
  triggers: {
    entityCreated: "Order"  // Run when a new Order is created
  },
  steps: [
    {
      // Fetch the inventory records for every SKU on the order
      name: "validate_inventory",
      action: "query",
      entityType: "InventoryItem",
      filter: "sku IN order.items[].sku"
    },
    {
      // Fail the pipeline if any line item exceeds available stock
      name: "check_availability",
      action: "validate",
      logic: "all(inventory.quantityAvailable >= order.items.quantity)"
    },
    {
      // Reserve stock by decrementing available quantities
      name: "reserve_inventory",
      action: "update",
      entityType: "InventoryItem",
      updates: "quantityAvailable -= order.items.quantity"
    },
    {
      // Mark the order as confirmed
      name: "confirm_order",
      action: "update",
      entityType: "Order",
      updates: { status: "confirmed" }
    },
    {
      // Generate a Shipment entity from the confirmed order
      name: "create_shipment",
      action: "create",
      entityType: "Shipment",
      data: "fromOrder(order)"
    }
  ],
  runOn: ["primary-cluster"],
  onError: "rollback"  // Transactional semantics: undo all changes on failure
});

Action Types

Pipelines can operate on multiple entities and entity types, enabling complex workflows that coordinate across different parts of your system.

Query Actions

Fetch entities based on filters and conditions

Transform Actions

Aggregate, map, filter, and reshape data

Update Actions

Modify existing entities with validation

Create Actions

Generate new entities from pipeline logic
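
The ProcessOrder example above uses query, validate, update, and create steps but no transform step. As a sketch, a transform step that aggregates data might look like this; the `groupBy` and `compute` keys are hypothetical, shown only to illustrate the shape of an aggregation step:

```javascript
// Hypothetical transform step: aggregate raw sensor readings by machine.
// The "groupBy" and "compute" keys are illustrative, not confirmed API.
{
  name: "aggregate_readings",
  action: "transform",
  entityType: "SensorReading",
  groupBy: "machineId",
  compute: {
    avgTemperature: "avg(reading.temperature)",
    maxVibration: "max(reading.vibration)"
  }
}
```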

Cluster Affinity

You control where pipelines execute with the runOn parameter. This is crucial for edge processing, data locality, and bandwidth optimization.

Example: A pipeline that aggregates machine logs can run on-premise where the data is generated, then sync only summaries to cloud clusters—saving bandwidth and reducing latency.

runOn: ["on-prem-factory-cluster"]

Error Handling

Pipelines support transactional semantics. If a step fails, you can configure automatic rollback to maintain data consistency.

  • Rollback: Automatically undo all changes if any step fails
  • Continue: Log the error but continue execution
  • Retry: Automatically retry failed steps with backoff
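
The rollback strategy can be pictured with the following standalone sketch (plain JavaScript, not the Data Estuary API): each completed step registers an undo action, and a failure replays the undos in reverse order before surfacing the error.

```javascript
// Minimal sketch of rollback semantics (illustrative, not the platform API).
// Each step's run() returns an "undo" closure; on failure, undos run in reverse.
function runPipeline(steps) {
  const undos = [];
  for (const step of steps) {
    try {
      undos.push(step.run());
    } catch (err) {
      // Roll back completed steps in reverse order, then rethrow
      for (const undo of undos.reverse()) undo();
      throw err;
    }
  }
  return "committed";
}

// Example: the second step fails, so the first step's change is undone.
const state = { reserved: 0 };
const steps = [
  { run: () => { state.reserved += 5; return () => { state.reserved -= 5; }; } },
  { run: () => { throw new Error("validation failed"); } }
];
try { runPipeline(steps); } catch (e) { /* pipeline aborted */ }
// state.reserved is back to 0 after rollback
```

The same loop extends naturally to the other strategies: continue-on-error catches and logs instead of rethrowing, and retry wraps `step.run()` in a backoff loop before giving up.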

Triggers

Pipelines can be triggered in multiple ways:

  • Entity Events: Trigger when entities are created, updated, or reach a specific state
  • Schedules: Run on a cron schedule (daily, hourly, custom)
  • Manual Invocation: Call via API or admin interface
  • External Webhooks: Respond to external system events
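
Only the entityCreated trigger appears in the documented example above; as a sketch, the other trigger kinds might be declared like this (the `schedule` and `webhook` keys are hypothetical guesses at the syntax):

```javascript
// Hypothetical trigger declarations; only entityCreated is confirmed above.
triggers: {
  entityCreated: "Order",          // entity event
  schedule: "0 * * * *",           // cron expression: top of every hour
  webhook: "/hooks/payment-status" // external system callback
}
```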

Best Practices

Keep Steps Focused

Each step should do one thing well. This makes pipelines easier to debug, test, and maintain. It also enables better performance optimization by the platform.

Consider Data Locality

Run pipelines close to where the data lives. If you're processing data that exists primarily in an edge cluster, run the pipeline there rather than pulling data to a central location.

Use Appropriate Error Handling

Critical workflows (like order processing) should use rollback on error. Analytics pipelines might use continue-on-error to avoid blocking on data quality issues.

Real-World Example

A manufacturing company uses pipelines to aggregate sensor data from factory machines. The pipeline runs on-premise every 5 minutes, aggregates logs by machine, calculates statistics, and creates summary entities that automatically replicate to their cloud analytics cluster.

This approach reduced their cloud data transfer costs by 90% while maintaining near-real-time visibility into factory operations.
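
A pipeline along those lines might look like the following sketch. The `schedule` trigger key, the transform step's aggregation keys, and the entity names are hypothetical, assembled from the behavior described above:

```javascript
// Hypothetical sketch of the factory aggregation pipeline described above.
definePipeline("AggregateMachineLogs", {
  triggers: {
    schedule: "*/5 * * * *"  // every 5 minutes (hypothetical trigger key)
  },
  steps: [
    {
      name: "fetch_recent_logs",
      action: "query",
      entityType: "MachineLog",
      filter: "timestamp > now() - 5m"
    },
    {
      name: "summarize_by_machine",
      action: "transform",
      groupBy: "machineId",  // illustrative aggregation keys
      compute: { avgLoad: "avg(log.load)", errorCount: "count(log.errors)" }
    },
    {
      name: "store_summary",
      action: "create",
      entityType: "MachineSummary"  // summaries replicate to the cloud cluster
    }
  ],
  runOn: ["on-prem-factory-cluster"],  // keep raw logs on-premise
  onError: "continue"  // analytics workload: log errors, don't block
});
```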