Transform Components

A transform component defines a data transformation step that reshapes, converts, or maps data between components. Transforms ensure that the output of one component matches the expected input of the next.


Creating a Transform

Coming Soon

The Python SDK for local development is not yet publicly available.

from flow_sdk.cli_client import CLIClient

client = CLIClient(config)

transform = client.transforms.create({
    "name": "Flatten Nested Response",
    "slug": "flatten-nested-response",
    "description": "Flattens nested API response into a flat record for downstream processing",
    "function_ref": "app.transforms.flatten_response",
    "input_schema_ref": nested_schema.id,
    "output_schema_ref": flat_schema.id,
    "timeout_seconds": 30,
    "tags": ["data-processing"],
    "examples": [
        {
            "input": {"user": {"name": "Alice", "email": "alice@example.com"}, "status": "active"},
            "output": {"user_name": "Alice", "user_email": "alice@example.com", "status": "active"},
        },
    ],
})

To create a transform in the UI instead, navigate to Components > Transforms > New, select a function reference, configure the input and output schemas, and click Create.


Configuration

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| name | string | Yes | Display name |
| slug | string | Yes | URL-safe identifier |
| function_ref | string | Yes | Python function path (e.g., app.transforms.flatten_response) |
| codeblock_ref | UUID | No | Reference to a Code Block containing the transform logic |
| input_schema_ref | UUID | No | Reference to a Schema defining expected input |
| output_schema_ref | UUID | No | Reference to a Schema defining expected output |
| timeout_seconds | int | No | Execution timeout (1-300 seconds) |
| requires_context | bool | No | Whether the transform needs execution context (default: false) |
| examples | list | No | Input/output examples for documentation and testing |
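The constraints in the table can be checked locally before a configuration is submitted. The sketch below is a minimal illustration, not part of the SDK: `check_transform_config` and `SLUG_PATTERN` are hypothetical names, and the exact slug format (lowercase letters, digits, and single hyphens) is an assumption about what "URL-safe" means here.

```python
import re

# Assumed slug format: lowercase alphanumerics separated by single hyphens.
SLUG_PATTERN = re.compile(r"^[a-z0-9]+(?:-[a-z0-9]+)*$")


def check_transform_config(config: dict) -> list[str]:
    """Return a list of problems found in a transform configuration dict."""
    problems = []

    # name, slug, and function_ref are marked as required in the table.
    for field in ("name", "slug", "function_ref"):
        value = config.get(field)
        if not isinstance(value, str) or not value:
            problems.append(f"{field} is required and must be a non-empty string")

    slug = config.get("slug")
    if isinstance(slug, str) and slug and not SLUG_PATTERN.match(slug):
        problems.append("slug must be URL-safe (assumed: lowercase, digits, hyphens)")

    # timeout_seconds is optional, but must fall within 1-300 when present.
    timeout = config.get("timeout_seconds")
    if timeout is not None:
        is_int = isinstance(timeout, int) and not isinstance(timeout, bool)
        if not is_int or not 1 <= timeout <= 300:
            problems.append("timeout_seconds must be an integer from 1 to 300")

    return problems


good = {
    "name": "Flatten Nested Response",
    "slug": "flatten-nested-response",
    "function_ref": "app.transforms.flatten_response",
    "timeout_seconds": 30,
}
bad = {"name": "Broken", "slug": "Bad Slug", "function_ref": "app.x", "timeout_seconds": 301}

print(check_transform_config(good))
print(check_transform_config(bad))
```

The platform presumably applies its own validation on create; a local check like this only shortens the feedback loop.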

Implementation

Transforms are implemented as Python functions. The function receives the input data and returns the transformed output.

Using a Function Reference

# In your codebase: app/transforms/flatten.py (exposed as app.transforms.flatten_response)

def flatten_response(data: dict) -> dict:
    """Flatten nested API response into a flat record."""
    result = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for nested_key, nested_value in value.items():
                result[f"{key}_{nested_key}"] = nested_value
        else:
            result[key] = value
    return result
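As a quick local check, the function can be run against the example record from the Creating a Transform section. The sketch repeats the function so that it is self-contained. It flattens only one level of nesting, so deeper structures are left as nested dictionaries.

```python
def flatten_response(data: dict) -> dict:
    """Flatten one level of nesting, joining parent and child keys with '_'."""
    result = {}
    for key, value in data.items():
        if isinstance(value, dict):
            for nested_key, nested_value in value.items():
                result[f"{key}_{nested_key}"] = nested_value
        else:
            result[key] = value
    return result


record = {"user": {"name": "Alice", "email": "alice@example.com"}, "status": "active"}
flat = flatten_response(record)
print(flat)

# A second level of nesting is not flattened further.
deep = flatten_response({"a": {"b": {"c": 1}}})
print(deep)
```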

Using a Code Block

Alternatively, reference a Code Block for transforms that need managed dependencies or Git-backed versioning:

# Create the code block with transform logic
transform_code = client.code_blocks.create({
    "name": "Currency Converter",
    "slug": "currency-converter",
    "language": "python",
    "code": """
import decimal

EXCHANGE_RATES = {
    "USD_EUR": decimal.Decimal("0.92"),
    "USD_GBP": decimal.Decimal("0.79"),
    "EUR_USD": decimal.Decimal("1.09"),
}

def main(amount: float, from_currency: str, to_currency: str) -> dict:
    key = f"{from_currency}_{to_currency}"
    rate = EXCHANGE_RATES.get(key)
    if rate is None:
        return {"error": f"Unsupported conversion: {key}"}
    converted = float(decimal.Decimal(str(amount)) * rate)
    return {"amount": converted, "currency": to_currency, "rate": float(rate)}
""",
    "entry_point": "main",
})

# Reference it from the transform
transform = client.transforms.create({
    "name": "Currency Converter",
    "slug": "currency-converter-transform",
    "function_ref": "main",
    "codeblock_ref": transform_code.id,
    "input_schema_ref": currency_input_schema.id,
    "output_schema_ref": currency_output_schema.id,
})
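Because the Code Block body is ordinary Python, its entry point can be exercised as a plain function before it is registered. The sketch below mirrors the logic of `main` from the code string above and calls it directly. How the platform invokes the entry point and passes arguments is not shown here.

```python
import decimal

EXCHANGE_RATES = {
    "USD_EUR": decimal.Decimal("0.92"),
    "USD_GBP": decimal.Decimal("0.79"),
    "EUR_USD": decimal.Decimal("1.09"),
}


def main(amount: float, from_currency: str, to_currency: str) -> dict:
    key = f"{from_currency}_{to_currency}"
    rate = EXCHANGE_RATES.get(key)
    if rate is None:
        # Unknown currency pair: report an error instead of raising.
        return {"error": f"Unsupported conversion: {key}"}
    # Convert via str() so the float's binary representation does not leak in.
    converted = float(decimal.Decimal(str(amount)) * rate)
    return {"amount": converted, "currency": to_currency, "rate": float(rate)}


supported = main(100.0, "USD", "EUR")
unsupported = main(100.0, "USD", "JPY")
print(supported)
print(unsupported)
```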

Input and Output Mapping

Transforms sit between components and reshape data to match expected interfaces. The input_schema_ref and output_schema_ref fields document the data contract.

graph LR
    A["Agent Output<br/>(nested JSON)"]
    T["Transform<br/><i>flatten-nested-response</i>"]
    W["Workflow Node<br/>(expects flat records)"]

    A -->|"input_schema"| T
    T -->|"output_schema"| W
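To make the idea of a data contract concrete, the sketch below checks a record against a list of expected fields and types. It is illustrative only: the platform's Schema format is not described in this section, so `FLAT_CONTRACT` (a plain mapping of field name to Python type) and `contract_violations` are hypothetical stand-ins.

```python
# Hypothetical stand-in for an output schema: field name -> expected Python type.
FLAT_CONTRACT = {"user_name": str, "user_email": str, "status": str}


def contract_violations(record: dict, contract: dict) -> list[str]:
    """List the ways a record fails to match the expected fields and types."""
    issues = []
    for field, expected_type in contract.items():
        if field not in record:
            issues.append(f"missing field: {field}")
        elif not isinstance(record[field], expected_type):
            actual = type(record[field]).__name__
            issues.append(f"{field}: expected {expected_type.__name__}, got {actual}")
    return issues


nested = {"user": {"name": "Alice", "email": "alice@example.com"}, "status": "active"}
flat = {"user_name": "Alice", "user_email": "alice@example.com", "status": "active"}

# The nested agent output does not satisfy the flat contract; the transformed record does.
print(contract_violations(nested, FLAT_CONTRACT))
print(contract_violations(flat, FLAT_CONTRACT))
```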

Transform Chains

Multiple transforms can be chained together in a workflow to progressively reshape data:

workflow = client.workflows.create({
    "name": "Data Enrichment Pipeline",
    "slug": "data-enrichment",
    "trigger_type": "manual",
    "nodes": [
        {"id": "extract", "type": "codeblock",
         "component_ref": extract_block.id, "label": "Extract Raw Data"},
        {"id": "normalize", "type": "transform",
         "component_ref": normalize_transform.id, "label": "Normalize Fields"},
        {"id": "enrich", "type": "transform",
         "component_ref": enrich_transform.id, "label": "Add Derived Fields"},
        {"id": "validate", "type": "validation",
         "component_ref": quality_check.id, "label": "Quality Check"},
        {"id": "load", "type": "codeblock",
         "component_ref": load_block.id, "label": "Load to Warehouse"},
    ],
    "edges": [
        {"source": "extract", "target": "normalize"},
        {"source": "normalize", "target": "enrich"},
        {"source": "enrich", "target": "validate"},
        {"source": "validate", "target": "load"},
    ],
})
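The `edges` list determines the order in which the nodes run. As a local sanity check that a chain is acyclic and runs in the intended order, the node IDs can be sorted topologically with the standard-library `graphlib` module (Python 3.9+). This sketch says nothing about how the platform schedules nodes; `execution_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

nodes = ["extract", "normalize", "enrich", "validate", "load"]
edges = [
    {"source": "extract", "target": "normalize"},
    {"source": "normalize", "target": "enrich"},
    {"source": "enrich", "target": "validate"},
    {"source": "validate", "target": "load"},
]


def execution_order(node_ids: list[str], edge_list: list[dict]) -> list[str]:
    """Return node IDs in dependency order; raises graphlib.CycleError on a cycle."""
    sorter = TopologicalSorter()
    for node_id in node_ids:
        sorter.add(node_id)  # register nodes even if they have no edges
    for edge in edge_list:
        # The target depends on the source, so the source must run first.
        sorter.add(edge["target"], edge["source"])
    return list(sorter.static_order())


order = execution_order(nodes, edges)
print(order)
```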

Context-Aware Transforms

Set requires_context to True when the transform needs access to execution context such as tenant information, user identity, or workspace configuration:

transform = client.transforms.create({
    "name": "Tenant-Scoped Enrichment",
    "slug": "tenant-enrichment",
    "function_ref": "app.transforms.enrich_with_tenant_data",
    "requires_context": True,
})

The function receives the execution context as an additional parameter:

def enrich_with_tenant_data(data: dict, context: dict) -> dict:
    """Add tenant-specific metadata to the record."""
    data["org_id"] = context["org_id"]
    data["workspace_id"] = context["workspace_id"]
    data["processed_by"] = context["user_id"]
    return data
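The difference between the two calling conventions can be sketched with a small dispatcher. This is an assumption about how a runner might honor `requires_context`, not the platform's actual implementation; `invoke_transform` and the context values are hypothetical.

```python
from typing import Callable, Optional


def invoke_transform(
    fn: Callable[..., dict],
    data: dict,
    context: Optional[dict] = None,
    requires_context: bool = False,
) -> dict:
    """Call fn(data, context) when context is required, otherwise fn(data)."""
    if requires_context:
        if context is None:
            raise ValueError("transform requires context, but none was provided")
        return fn(data, context)
    return fn(data)


def enrich_with_tenant_data(data: dict, context: dict) -> dict:
    """Add tenant-specific metadata to the record."""
    data["org_id"] = context["org_id"]
    data["workspace_id"] = context["workspace_id"]
    data["processed_by"] = context["user_id"]
    return data


# Hypothetical context values for illustration.
context = {"org_id": "org-1", "workspace_id": "ws-1", "user_id": "user-1"}
enriched = invoke_transform(
    enrich_with_tenant_data, {"status": "active"}, context, requires_context=True
)
print(enriched)
```

Note that `enrich_with_tenant_data` modifies the input dictionary in place. Pass a copy if the caller needs the original record unchanged.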

Examples

Attach input/output examples for documentation and validation:

transform = client.transforms.create({
    "name": "Extract Email Domain",
    "slug": "extract-email-domain",
    "function_ref": "app.transforms.extract_domain",
    "examples": [
        {
            "input": {"email": "alice@example.com"},
            "output": {"email": "alice@example.com", "domain": "example.com"},
        },
        {
            "input": {"email": "bob@company.co.uk"},
            "output": {"email": "bob@company.co.uk", "domain": "company.co.uk"},
        },
    ],
})
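Examples in this shape can double as lightweight tests. The sketch below runs each example's input through the function and compares the result with the stated output. The body of `extract_domain` is not shown in this document, so the implementation here is a hypothetical one that satisfies the two examples above.

```python
def extract_domain(data: dict) -> dict:
    """Hypothetical implementation: copy the record and add the email's domain."""
    # Split on the last "@" so the domain part is everything after it.
    domain = data["email"].rsplit("@", 1)[1]
    return {**data, "domain": domain}


EXAMPLES = [
    {
        "input": {"email": "alice@example.com"},
        "output": {"email": "alice@example.com", "domain": "example.com"},
    },
    {
        "input": {"email": "bob@company.co.uk"},
        "output": {"email": "bob@company.co.uk", "domain": "company.co.uk"},
    },
]

# Collect every example whose actual output differs from the documented one.
failures = [ex for ex in EXAMPLES if extract_domain(ex["input"]) != ex["output"]]
print(f"{len(EXAMPLES) - len(failures)} of {len(EXAMPLES)} examples passed")
```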