Transform Components¶
A transform component defines a data transformation step that reshapes, converts, or maps data between components. Transforms ensure that the output of one component matches the expected input of the next.
Creating a Transform¶
Coming Soon
The Python SDK for local development is not yet publicly available.
from flow_sdk.cli_client import CLIClient

# `config`, `nested_schema`, and `flat_schema` are assumed to be defined earlier.
client = CLIClient(config)

transform = client.transforms.create({
    "name": "Flatten Nested Response",
    "slug": "flatten-nested-response",
    "description": "Flattens nested API response into a flat record for downstream processing",
    "function_ref": "app.transforms.flatten_response",
    "input_schema_ref": nested_schema.id,
    "output_schema_ref": flat_schema.id,
    "timeout_seconds": 30,
    "tags": ["data-processing"],
    "examples": [
        {
            "input": {"user": {"name": "Alice", "email": "alice@example.com"}, "status": "active"},
            "output": {"user_name": "Alice", "user_email": "alice@example.com", "status": "active"},
        },
    ],
})
To create a transform without the SDK, navigate to Components > Transforms > New, select a function reference, configure the input and output schemas, and click Create.
Configuration¶
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Display name |
| slug | string | Yes | URL-safe identifier |
| function_ref | string | Yes | Python function path (e.g., app.transforms.flatten_response) |
| codeblock_ref | UUID | No | Reference to a Code Block containing the transform logic |
| input_schema_ref | UUID | No | Reference to a Schema defining expected input |
| output_schema_ref | UUID | No | Reference to a Schema defining expected output |
| timeout_seconds | int | No | Execution timeout (1-300 seconds) |
| requires_context | bool | No | Whether the transform needs execution context (default: false) |
| examples | list | No | Input/output examples for documentation and testing |
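The constraints in the table can be checked locally before a transform is submitted. The sketch below is a hypothetical helper and not part of the SDK: the name validate_transform_config, the non-empty string check, and the error messages are assumptions. Only the list of required fields and the 1-300 second timeout range come from the table.

```python
# Hypothetical local check; not part of flow_sdk.
REQUIRED_FIELDS = ("name", "slug", "function_ref")


def validate_transform_config(config: dict) -> list[str]:
    """Return a list of problems found in a transform config (empty if none)."""
    problems = []
    for field in REQUIRED_FIELDS:
        value = config.get(field)
        # Assumption: required string fields must also be non-empty.
        if not isinstance(value, str) or not value:
            problems.append(f"{field} is required and must be a non-empty string")

    timeout = config.get("timeout_seconds")
    if timeout is not None:
        # bool is a subclass of int in Python, so reject it explicitly.
        is_int = isinstance(timeout, int) and not isinstance(timeout, bool)
        if not is_int or not 1 <= timeout <= 300:
            problems.append("timeout_seconds must be an integer between 1 and 300")

    if not isinstance(config.get("requires_context", False), bool):
        problems.append("requires_context must be a boolean")

    return problems


ok = validate_transform_config({
    "name": "Flatten Nested Response",
    "slug": "flatten-nested-response",
    "function_ref": "app.transforms.flatten_response",
    "timeout_seconds": 30,
})
bad = validate_transform_config({"name": "No Slug", "timeout_seconds": 900})

print(ok)        # []
print(len(bad))  # 3 (missing slug, missing function_ref, timeout out of range)
```

Returning a list of problems rather than raising on the first one lets a caller report every issue in a single pass.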
Implementation¶
Transforms are implemented as Python functions. The function receives the input data and returns the transformed output.
Using a Function Reference¶
# In your codebase: app/transforms/flatten.py
def flatten_response(data: dict) -> dict:
    """Flatten nested API response into a flat record."""
    result = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for nested_key, nested_value in value.items():
                result[f"{key}_{nested_key}"] = nested_value
        else:
            result[key] = value
    return result
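As a quick local check, the function above can be run against the example input from Creating a Transform. The snippet repeats the function so that it runs on its own; the second call is an added illustration showing that only one level of nesting is flattened.

```python
def flatten_response(data: dict) -> dict:
    """Flatten nested API response into a flat record."""
    result = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for nested_key, nested_value in value.items():
                result[f"{key}_{nested_key}"] = nested_value
        else:
            result[key] = value
    return result


nested = {"user": {"name": "Alice", "email": "alice@example.com"}, "status": "active"}
flat = flatten_response(nested)
print(flat)
# {'user_name': 'Alice', 'user_email': 'alice@example.com', 'status': 'active'}

# Only the first level is flattened: deeper dictionaries are kept as values.
deep = flatten_response({"user": {"address": {"city": "Paris"}}})
print(deep)
# {'user_address': {'city': 'Paris'}}
```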
Using a Code Block¶
Alternatively, reference a Code Block for transforms that need managed dependencies or Git-backed versioning:
# Create the code block with transform logic
transform_code = client.code_blocks.create({
    "name": "Currency Converter",
    "slug": "currency-converter",
    "language": "python",
    "code": """
import decimal

EXCHANGE_RATES = {
    "USD_EUR": decimal.Decimal("0.92"),
    "USD_GBP": decimal.Decimal("0.79"),
    "EUR_USD": decimal.Decimal("1.09"),
}

def main(amount: float, from_currency: str, to_currency: str) -> dict:
    key = f"{from_currency}_{to_currency}"
    rate = EXCHANGE_RATES.get(key)
    # Compare against None so that only a missing pair counts as unsupported.
    if rate is None:
        return {"error": f"Unsupported conversion: {key}"}
    converted = float(decimal.Decimal(str(amount)) * rate)
    return {"amount": converted, "currency": to_currency, "rate": float(rate)}
""",
    "entry_point": "main",
})

# Reference it from the transform
transform = client.transforms.create({
    "name": "Currency Converter",
    "slug": "currency-converter-transform",
    "function_ref": "main",
    "codeblock_ref": transform_code.id,
    "input_schema_ref": currency_input_schema.id,
    "output_schema_ref": currency_output_schema.id,
})
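The body of the code block is plain Python, so it can be exercised locally before it is registered. The sketch below mirrors the main function from the code string above and calls it directly; it makes no assumption about how the platform invokes the entry point.

```python
import decimal

EXCHANGE_RATES = {
    "USD_EUR": decimal.Decimal("0.92"),
    "USD_GBP": decimal.Decimal("0.79"),
    "EUR_USD": decimal.Decimal("1.09"),
}


def main(amount: float, from_currency: str, to_currency: str) -> dict:
    key = f"{from_currency}_{to_currency}"
    rate = EXCHANGE_RATES.get(key)
    if rate is None:
        return {"error": f"Unsupported conversion: {key}"}
    # Going through str() avoids carrying binary float noise into the Decimal.
    converted = float(decimal.Decimal(str(amount)) * rate)
    return {"amount": converted, "currency": to_currency, "rate": float(rate)}


supported = main(100.0, "USD", "EUR")
unsupported = main(10.0, "GBP", "USD")

print(supported)
# {'amount': 92.0, 'currency': 'EUR', 'rate': 0.92}
print(unsupported)
# {'error': 'Unsupported conversion: GBP_USD'}
```

Note that an unsupported pair is reported through an error key in the returned dictionary rather than through an exception, so downstream components need to check for that key.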
Input and Output Mapping¶
Transforms sit between components and reshape data to match expected interfaces. The input_schema_ref and output_schema_ref fields document the data contract.
graph LR
    A["Agent Output<br/>(nested JSON)"]
    T["Transform<br/><i>flatten-response</i>"]
    W["Workflow Node<br/>(expects flat records)"]
    A -->|"input_schema"| T
    T -->|"output_schema"| W
Transform Chains¶
Multiple transforms can be chained together in a workflow to progressively reshape data:
workflow = client.workflows.create({
    "name": "Data Enrichment Pipeline",
    "slug": "data-enrichment",
    "trigger_type": "manual",
    "nodes": [
        {"id": "extract", "type": "codeblock",
         "component_ref": extract_block.id, "label": "Extract Raw Data"},
        {"id": "normalize", "type": "transform",
         "component_ref": normalize_transform.id, "label": "Normalize Fields"},
        {"id": "enrich", "type": "transform",
         "component_ref": enrich_transform.id, "label": "Add Derived Fields"},
        {"id": "validate", "type": "validation",
         "component_ref": quality_check.id, "label": "Quality Check"},
        {"id": "load", "type": "codeblock",
         "component_ref": load_block.id, "label": "Load to Warehouse"},
    ],
    "edges": [
        {"source": "extract", "target": "normalize"},
        {"source": "normalize", "target": "enrich"},
        {"source": "enrich", "target": "validate"},
        {"source": "validate", "target": "load"},
    ],
})
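Conceptually, a linear chain of transforms is function composition: the output of each node becomes the input of the next. The local sketch below illustrates only that idea. The names normalize_fields, add_derived_fields, and run_chain are hypothetical, and the sketch does not describe how the workflow engine actually executes nodes.

```python
from functools import reduce
from typing import Callable

Transform = Callable[[dict], dict]


def normalize_fields(record: dict) -> dict:
    """Hypothetical transform: lower-case keys and strip whitespace from strings."""
    return {
        key.lower(): value.strip() if isinstance(value, str) else value
        for key, value in record.items()
    }


def add_derived_fields(record: dict) -> dict:
    """Hypothetical transform: derive full_name from first_name and last_name."""
    full_name = f"{record['first_name']} {record['last_name']}"
    return {**record, "full_name": full_name}


def run_chain(record: dict, steps: list[Transform]) -> dict:
    """Apply each transform in order, feeding each output into the next step."""
    return reduce(lambda current, step: step(current), steps, record)


raw = {"First_Name": " Alice ", "Last_Name": "Smith", "Age": 30}
result = run_chain(raw, [normalize_fields, add_derived_fields])
print(result)
# {'first_name': 'Alice', 'last_name': 'Smith', 'age': 30, 'full_name': 'Alice Smith'}
```

Order matters here: add_derived_fields relies on the lower-cased keys produced by normalize_fields, which is the same dependency the edges in the workflow definition express.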
Context-Aware Transforms¶
Set requires_context to True when the transform needs access to the execution context, such as tenant information, user identity, or workspace configuration:
transform = client.transforms.create({
    "name": "Tenant-Scoped Enrichment",
    "slug": "tenant-enrichment",
    "function_ref": "app.transforms.enrich_with_tenant_data",
    "requires_context": True,
})
The function receives the execution context as an additional parameter:
def enrich_with_tenant_data(data: dict, context: dict) -> dict:
    """Add tenant-specific metadata to the record."""
    data["org_id"] = context["org_id"]
    data["workspace_id"] = context["workspace_id"]
    data["processed_by"] = context["user_id"]
    return data
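To see what the function does with a context, it can be called locally with a hand-built dictionary. The context values below are made up for illustration; in a real run the context is supplied by the platform, and its full set of keys is not documented here beyond the three the function reads.

```python
def enrich_with_tenant_data(data: dict, context: dict) -> dict:
    """Add tenant-specific metadata to the record."""
    data["org_id"] = context["org_id"]
    data["workspace_id"] = context["workspace_id"]
    data["processed_by"] = context["user_id"]
    return data


# Hypothetical context values, for local testing only.
context = {"org_id": "org-123", "workspace_id": "ws-456", "user_id": "user-789"}
record = {"email": "alice@example.com"}

enriched = enrich_with_tenant_data(record, context)
print(enriched)
# {'email': 'alice@example.com', 'org_id': 'org-123', 'workspace_id': 'ws-456', 'processed_by': 'user-789'}

# The function modifies and returns the same dictionary it was given.
print(enriched is record)  # True
```

Because the input dictionary is modified in place, a caller that still needs the original record can pass a copy, for example dict(record).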
Examples¶
Attach input/output examples for documentation and validation:
transform = client.transforms.create({
    "name": "Extract Email Domain",
    "slug": "extract-email-domain",
    "function_ref": "app.transforms.extract_domain",
    "examples": [
        {
            "input": {"email": "alice@example.com"},
            "output": {"email": "alice@example.com", "domain": "example.com"},
        },
        {
            "input": {"email": "bob@company.co.uk"},
            "output": {"email": "bob@company.co.uk", "domain": "company.co.uk"},
        },
    ],
})
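The same examples can double as local test cases. The sketch below assumes one possible implementation of extract_domain (the page references the function but does not show it) and a hypothetical check_examples helper that reports every example whose actual output differs from the expected one.

```python
def extract_domain(data: dict) -> dict:
    """Assumed implementation: add the part of the email after the last '@'."""
    domain = data["email"].rsplit("@", 1)[1]
    return {**data, "domain": domain}


EXAMPLES = [
    {
        "input": {"email": "alice@example.com"},
        "output": {"email": "alice@example.com", "domain": "example.com"},
    },
    {
        "input": {"email": "bob@company.co.uk"},
        "output": {"email": "bob@company.co.uk", "domain": "company.co.uk"},
    },
]


def check_examples(func, examples: list[dict]) -> list[dict]:
    """Return the examples whose actual output does not match the expected output."""
    return [ex for ex in examples if func(ex["input"]) != ex["output"]]


failures = check_examples(extract_domain, EXAMPLES)
print(failures)  # []
```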