Pipelines
Pipelines compose transforms into versioned DAGs. They are the product layer for repeatable multi-step data work.
What a pipeline stores
A pipeline owns:
- Metadata: tenant, name, description, created by.
- Status: draft, active, paused, error, archived.
- Source tables and target hints.
- Current version pointer.
- Schedule config.
Each pipeline version stores:
- Immutable DAG spec.
- Content hash.
- Created timestamp and author.
- Optional AI session ID.
- Steps and edges.
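As a rough illustration of these fields, a pipeline and one of its versions can be pictured as follows. The field names are approximations for illustration, not Frank's exact schema.

```python
# Illustrative sketch only; field names approximate the stored shape,
# not Frank's exact schema.
pipeline = {
    "tenant": "acme",
    "name": "daily_sales",
    "description": "Orders staging to daily sales mart",
    "created_by": "analyst@acme.example",
    "status": "draft",                 # draft | active | paused | error | archived
    "source_tables": ["raw.orders", "raw.products"],
    "current_version": 3,              # pointer to the current version
    "schedule": {"cron": "0 6 * * *"},
}

pipeline_version = {
    "version": 3,
    "content_hash": "<sha256-of-dag-content>",  # hash of sorted steps and edges
    "created_at": "2025-01-01T00:00:00Z",
    "created_by": "analyst@acme.example",
    "ai_session_id": None,             # optional
    "steps": [],                       # immutable DAG spec
    "edges": [],
}
```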
Pipeline status
Allowed transitions:
- draft -> active or archived
- active -> paused, error, or archived
- paused -> active or error
- error -> draft, active, or archived

Archived is terminal.
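A minimal sketch of these rules as a transition table (illustrative only, not Frank's implementation):

```python
# Allowed status transitions described above; illustrative only.
ALLOWED_TRANSITIONS = {
    "draft":    {"active", "archived"},
    "active":   {"paused", "error", "archived"},
    "paused":   {"active", "error"},
    "error":    {"draft", "active", "archived"},
    "archived": set(),  # terminal
}

def can_transition(current: str, target: str) -> bool:
    """Return True if the status change is allowed."""
    return target in ALLOWED_TRANSITIONS.get(current, set())
```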
Steps
A step can reference:
- An existing transform.
- A transform pattern and params.
- Custom SQL.
- Custom Python content.
- Generated code that is being published to an external pattern repository.
Important step fields:
| Field | Meaning |
|---|---|
| step_name | Stable name inside the version. |
| transform_id | Existing transform to execute. |
| pattern_id | Pattern to materialize as a transform during activation. |
| pattern_params | Params for the pattern. |
| step_type | catalog_pattern, field_mapping, custom_sql, or custom_python. |
| is_intermediate | Whether this step is an internal stage. |
| is_terminal | Whether this step is an output of the DAG. |
| error_policy | Step failure policy, including halt, skip, and retry:N. |
| config.output_layer | Usually silver for intermediate and gold for curated outputs. |
| pattern_status | Tracks external pattern publishing, such as available or building. |
| pr_url | Pull request for generated/published custom patterns. |
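For example, a single step might be expressed along these lines. The pattern ID and params are hypothetical, and the exact payload shape may differ:

```python
# Hypothetical step definition using the fields above.
step = {
    "step_name": "stg_orders",
    "step_type": "catalog_pattern",
    "pattern_id": "dedupe_latest",        # hypothetical pattern
    "pattern_params": {"key": "order_id", "order_by": "updated_at"},
    "is_intermediate": True,
    "is_terminal": False,
    "error_policy": "retry:3",
    "config": {"output_layer": "silver"},
}
```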
Edges and validation
Edges are directed dependencies between steps. Frank validates every DAG before sandboxing or activation:
- Every edge endpoint must reference an existing step.
- Self-edges are rejected.
- Cycles are rejected with a cycle path.
- Topological order is computed deterministically.
- Roots, terminal steps, intermediate steps, and fan-in steps are classified.
This lets the UI and CLI catch broken structures before a runtime job starts.
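These checks amount to standard DAG validation. A compact sketch (not Frank's code) that enforces the same rules and produces a deterministic topological order:

```python
def validate_dag(steps: list[str], edges: list[tuple[str, str]]) -> list[str]:
    """Reject bad edges, self-edges, and cycles; return a deterministic topo order."""
    names = set(steps)
    for src, dst in edges:
        if src not in names or dst not in names:
            raise ValueError(f"edge references unknown step: {src} -> {dst}")
        if src == dst:
            raise ValueError(f"self-edge rejected: {src}")
    # Remaining unmet dependencies per step.
    deps = {s: {src for src, dst in edges if dst == s} for s in steps}
    order: list[str] = []
    while deps:
        # Steps whose dependencies are all satisfied, sorted for determinism.
        ready = sorted(s for s, d in deps.items() if not d)
        if not ready:
            raise ValueError(f"cycle detected among: {sorted(deps)}")
        for s in ready:
            order.append(s)
            del deps[s]
        for d in deps.values():
            d.difference_update(ready)
    return order
```

For instance, `validate_dag(["stg_orders", "fct_daily_sales"], [("stg_orders", "fct_daily_sales")])` returns `["stg_orders", "fct_daily_sales"]`.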
Versioning
Pipeline versions are immutable snapshots. Frank computes a SHA-256 content hash from sorted step content and edge pairs. If the DAG content has not changed, duplicate versions are avoided.
This gives you:
- A clean audit trail.
- Safe roll-forward through new versions.
- Stable run history tied to the actual DAG that ran.
- A clear boundary between editing and activation.
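A sketch of how such a hash can be derived; Frank's exact canonicalization is not documented here, so treat the serialization below as an assumption:

```python
import hashlib
import json

def content_hash(steps: list[dict], edges: list[tuple[str, str]]) -> str:
    """SHA-256 over sorted step content and edge pairs; canonicalization is illustrative."""
    canonical = json.dumps(
        {
            "steps": sorted(steps, key=lambda s: s["step_name"]),
            "edges": sorted(edges),
        },
        sort_keys=True,
        default=str,
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Two drafts with identical steps and edges produce the same hash, which is how duplicate versions are avoided.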
Sandbox validation
Sandbox runs are the pre-activation safety gate:
frankctl pipelines validate <pipeline-id> --sample-limit 1000 --timeout 600
The CLI starts:
POST /api/v1/pipelines/{pipeline_id}/sandbox
Then polls:
GET /api/v1/pipelines/{pipeline_id}/sandbox/{workflow_id}/status
Step badges stream to stderr and the final JSON is emitted to stdout. A completed run exits 0; a failed or partially failed run exits 5.
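What the CLI does can be approximated with a small polling loop. The base URL, auth header, and response field names (`workflow_id`, `status`) below are assumptions, not documented payload shapes:

```python
import time
import requests

# Assumed base URL and auth scheme; adjust for your deployment.
BASE = "https://frank.example.com/api/v1"
HEADERS = {"Authorization": "Bearer <token>"}

def sandbox_validate(pipeline_id: str, timeout: int = 600) -> dict:
    """Start a sandbox run and poll until it reaches a terminal state."""
    resp = requests.post(f"{BASE}/pipelines/{pipeline_id}/sandbox", headers=HEADERS)
    resp.raise_for_status()
    workflow_id = resp.json()["workflow_id"]  # assumed response field

    deadline = time.time() + timeout
    while time.time() < deadline:
        status = requests.get(
            f"{BASE}/pipelines/{pipeline_id}/sandbox/{workflow_id}/status",
            headers=HEADERS,
        ).json()
        if status.get("status") in ("completed", "failed", "partial_failure"):
            return status
        time.sleep(5)
    raise TimeoutError(f"sandbox run {workflow_id} did not finish in {timeout}s")
```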
Activation
Activation turns a draft version into runnable transforms:
- Validate the DAG.
- Create or link transforms for each step.
- Compute output table names through Frank naming helpers.
- Set the pipeline current version.
- Move pipeline status to active.
- Trigger downstream synchronization with Dagster where needed.
Step output table names follow the pipeline + step naming convention and land in the Silver or Gold layer.
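As a purely hypothetical illustration of that convention (Frank's real naming helpers may format names differently):

```python
# Hypothetical naming helper; the real convention may differ.
def output_table_name(pipeline_name: str, step_name: str, is_terminal: bool) -> str:
    layer = "gold" if is_terminal else "silver"
    return f"{layer}.{pipeline_name}__{step_name}"

# e.g. output_table_name("daily_sales", "fct_daily_sales", True)
#      -> "gold.daily_sales__fct_daily_sales"
```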
AI composition
The pipeline composer calls the Martha workflow frank_compose_pipeline. Inputs:
- Source tables.
- Target description.
- Optional target schema or SDM ID.
- Pipeline context.
- Pipeline name.
Output can include proposed steps, pattern choices, params, dependencies, reasoning, and confidence. The UI keeps the human in control: AI composes a draft; users review, edit, sandbox, and activate.
CLI:
frankctl ai compose-pipeline -f pipeline-intent.yaml --timeout 600
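The intent file carries the inputs listed above. Shown here as a Python dict for illustration; the real file is YAML, and the field names are assumptions:

```python
# Hypothetical intent payload; field names are illustrative.
intent = {
    "pipeline_name": "customer_360",
    "source_tables": [
        "raw.postgres_customers",
        "raw.salesforce_contacts",
        "raw.stripe_customers",
    ],
    "target_description": "Single deduplicated customer dimension",
    "target_sdm_id": None,              # optional target schema or SDM ID
    "pipeline_context": "Customer 360 rollout",
}
```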
Common pipeline shapes
Staging to mart
raw.orders -> stg_orders -> fct_daily_sales
raw.products -> stg_products -/
Customer 360
raw.postgres_customers \
raw.salesforce_contacts -> dim_customer_360
raw.stripe_customers /
Geospatial enrichment
raw.events -> geo_parse_wkt -> h3_enrich -> h3_aggregate
Semantic publication
raw.source -> stg_clean -> dim_entity -> backing dataset -> ontology sync
Related APIs
POST /api/v1/pipelines
GET /api/v1/pipelines
GET /api/v1/pipelines/{pipeline_id}
PUT /api/v1/pipelines/{pipeline_id}
POST /api/v1/pipelines/validate-dag
POST /api/v1/pipelines/{pipeline_id}/versions
POST /api/v1/pipelines/{pipeline_id}/activate
POST /api/v1/pipelines/{pipeline_id}/pause
POST /api/v1/pipelines/{pipeline_id}/sandbox
GET /api/v1/pipelines/{pipeline_id}/runs
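A minimal client call against these endpoints might look like the following; the base URL, auth, and response shapes are assumptions:

```python
import requests

BASE = "https://frank.example.com/api/v1"      # assumed base URL
HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

# List pipelines, then fetch run history for the first one
# (assumes the list response is an array of objects with an "id" field).
pipelines = requests.get(f"{BASE}/pipelines", headers=HEADERS).json()
runs = requests.get(
    f"{BASE}/pipelines/{pipelines[0]['id']}/runs", headers=HEADERS
).json()
```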