SDK
frank-sdk is the Python contract for custom transform authors. It gives Python-runner patterns a stable way to read runtime config, query data, emit metrics and lineage, and return structured results to the platform.
Install
From the shared utilities package:
```bash
cd utils

# Core SDK
pip install -e ".[sdk]"

# With Trino support
pip install -e ".[sdk-trino]"

# With pandas and pyarrow
pip install -e ".[sdk-pandas]"

# Full local data stack
pip install -e ".[sdk-full]"
```

In packaged environments:

```bash
pip install "frank-shared[sdk-full]"
```

Runtime contract
Python-runner transforms receive configuration through the TRANSFORM_CONFIG environment variable:
```json
{
  "artifact_id": "uuid",
  "run_id": "uuid",
  "tenant_id": "uuid",
  "source_table": "iceberg.bronze.orders",
  "source_tables": ["iceberg.bronze.orders"],
  "target_table": "iceberg.silver.orders_clean",
  "params": {
    "filter_expression": "status != 'deleted'"
  },
  "cursors": {
    "input_cursors": {
      "iceberg.bronze.orders": {
        "mode": "delta",
        "cursor_field": "_extracted_at",
        "cursor_value": "2026-02-01T00:00:00Z"
      }
    },
    "cutoff_cursors": {
      "iceberg.bronze.orders": {
        "cursor_value": "2026-02-04T12:00:00Z"
      }
    }
  }
}
```

The transform writes one final FrankResult JSON object to stdout. Logs, metrics, progress, and lineage should go through SDK emitters.
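The SDK handles this for you, but when debugging it can help to look at the raw contract directly. A minimal sketch, assuming the JSON above arrives in the TRANSFORM_CONFIG environment variable; in real transforms, read it through FrankContext.from_env() instead:

```python
import json
import os
import sys

# Debugging sketch only: inspect the raw runtime contract.
config = json.loads(os.environ["TRANSFORM_CONFIG"])

# Keep stdout reserved for the final FrankResult; write anything else to stderr.
print(config["target_table"], file=sys.stderr)
print(list(config["cursors"]["input_cursors"]), file=sys.stderr)
```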
Minimal transform
```python
from frank_sdk import FrankContext, FrankResult


def main():
    ctx = FrankContext.from_env()
    result = FrankResult.success(
        output_row_count=0,
        metrics={
            "mode": ctx.get_param("mode", "default"),
            "target_table": ctx.target_table,
        },
    )
    result.write_to_stdout()


if __name__ == "__main__":
    main()
```

Querying with Trino
```python
from frank_sdk import FrankContext, FrankResult, emit_metric, emit_lineage


def main():
    ctx = FrankContext.from_env()
    conn = ctx.get_trino_connection()
    cur = conn.cursor()

    where = ctx.build_incremental_filter(ctx.source_table)
    cur.execute(f"""
        CREATE TABLE {ctx.target_table} AS
        SELECT *
        FROM {ctx.source_table}
        WHERE {where}
    """)

    cur.execute(f"SELECT COUNT(*) FROM {ctx.target_table}")
    count = cur.fetchone()[0]

    emit_metric("rows_processed", count)
    emit_lineage(
        source=ctx.source_table,
        target=ctx.target_table,
        operation="copy",
    )

    FrankResult.success(output_row_count=count).write_to_stdout()


if __name__ == "__main__":
    main()
```

Connection environment variables:
| Variable | Default |
|---|---|
| TRINO_HOST | localhost |
| TRINO_PORT | 8080 |
| TRINO_USER | frank-transform |
| TRINO_CATALOG | iceberg |
| TRINO_SCHEMA | unset |
| TRINO_PASSWORD | unset |
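For local runs it is usually enough to set the non-default values before the transform starts. A sketch, assuming these variables are read from the process environment when get_trino_connection() is called (the schema value is hypothetical):

```python
import os

from frank_sdk import FrankContext

# Assumption: connection settings are picked up from the environment at call time.
os.environ.setdefault("TRINO_HOST", "localhost")
os.environ.setdefault("TRINO_PORT", "8080")
os.environ.setdefault("TRINO_CATALOG", "iceberg")
os.environ.setdefault("TRINO_SCHEMA", "bronze")  # hypothetical schema for local tests

ctx = FrankContext.from_env()
conn = ctx.get_trino_connection()
```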
FrankContext
Important properties:
| Property | Meaning |
|---|---|
| artifact_id | Transform artifact UUID. |
| run_id | Transform run UUID. |
| tenant_id | Tenant UUID. |
| source_table | Primary input table. |
| source_tables | All input tables. |
| target_table | Output table. |
| params | Pattern or transform parameters. |
| cursors | Input and cutoff cursor state. |
| raw_config | Full runtime config. |
Useful methods:
```python
ctx.get_param("key", default=None)
ctx.require_param("key")
ctx.get_trino_connection()
ctx.get_input_cursor(table)
ctx.get_cutoff_cursor(table)
ctx.build_incremental_filter(table)
```
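The cursor helpers pair with the cursor state in the runtime contract above. A sketch of planning a delta read per input table, using only the documented methods (the exact return shapes of the cursor helpers are not specified here, so they are only logged):

```python
from frank_sdk import FrankContext, emit_log

ctx = FrankContext.from_env()

for table in ctx.source_tables:
    cursor = ctx.get_input_cursor(table)    # cursor state from "input_cursors", if any
    cutoff = ctx.get_cutoff_cursor(table)   # upper bound from "cutoff_cursors", if any
    where = ctx.build_incremental_filter(table)

    emit_log(
        "Planning incremental read",
        context={"table": table, "cursor": cursor, "cutoff": cutoff, "filter": where},
    )
```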
FrankResult

Use factory helpers:
```python
FrankResult.success(output_row_count=100, input_row_count=120)
FrankResult.failure("Invalid source data", {"column": "id"})
FrankResult.partial(output_row_count=80, error_message="20 rows skipped")
```

Add metrics and data quality:
```python
from frank_sdk import FrankResult, DataQualityResult

result = FrankResult.success(output_row_count=100)
result.add_metric("duplicates_removed", 5)
result.add_data_quality_result(
    DataQualityResult(
        rule_name="non_null_id",
        passed=True,
        total_rows=100,
        failed_rows=0,
    )
)
result.write_to_stdout()
```
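Failures should still produce a result on stdout so the platform can record why the run stopped. A sketch of a top-level error handler built only from the factory helpers above; the exception-to-error_details mapping is illustrative, not a platform convention:

```python
from frank_sdk import FrankContext, FrankResult, emit_log


def main():
    ctx = FrankContext.from_env()
    try:
        # ... transform work ...
        FrankResult.success(output_row_count=0).write_to_stdout()
    except Exception as exc:  # illustrative catch-all; narrow this in real transforms
        emit_log("Transform failed", context={"error": str(exc)})
        FrankResult.failure(
            "Transform raised an unhandled exception",
            {"exception_type": type(exc).__name__, "target_table": ctx.target_table},
        ).write_to_stdout()


if __name__ == "__main__":
    main()
```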
Output fields:

| Field | Meaning |
|---|---|
| status | success, failure, or partial. |
| output_row_count | Rows written. |
| input_row_count | Rows read. |
| rows_affected | Rows changed by update/delete style operations. |
| metrics | Custom JSON-serializable metrics. |
| data_quality | Data quality check results. |
| error_message | Human-readable error. |
| error_details | Structured error context. |
| output_snapshot_id | Iceberg snapshot ID. |
Structured logging, metrics, and lineage
```python
from frank_sdk import emit_log, emit_warning, emit_metric, emit_progress, emit_lineage

emit_log("Starting transform", context={"table": "orders"})
emit_metric("rows_processed", 1000, tags={"stage": "clean"})
emit_progress(current=5, total=10, message="Processed batch 5")
emit_warning("Skipped rows with invalid dates", {"count": 3})

emit_lineage(
    source=["iceberg.bronze.orders", "iceberg.bronze.customers"],
    target="iceberg.gold.customer_orders",
    operation="join",
    columns={"customer_id": ["orders.customer_id", "customers.id"]},
)
```

Emitters write structured records to stderr for platform capture while keeping stdout reserved for the final FrankResult.
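A common pattern is to report progress and per-batch metrics as work completes. A sketch using only the emitters above; the batching itself is illustrative and not part of the SDK:

```python
from frank_sdk import emit_metric, emit_progress

# Illustrative batch loop: the SDK does not impose any batching model.
batches = ["2026-02-01", "2026-02-02", "2026-02-03"]

for i, batch in enumerate(batches, start=1):
    rows = 0  # replace with real per-batch work
    emit_metric("rows_processed", rows, tags={"batch": batch})
    emit_progress(current=i, total=len(batches), message=f"Processed batch {batch}")
```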
Local testing
Pair the SDK with the Python pattern CLI:
```bash
frank init my-pattern --template python
cd patterns/my-pattern
frank validate .
frank test . --config tests/sample_config.json --env-file .env.test
```

Use test fixtures in tests/sample_config.json to create Trino tables, run assertions, and clean up after the transform.