Skip to content

SDK Examples

Practical recipes and patterns for common tasks with the Flow SDK.


Authentication and Setup

Coming Soon

CLI authentication and CLIClient will be publicly available in an upcoming release. Inside platform Code Blocks and Hosted Services, authentication is automatic.

Authenticate from the CLI

# Log the CLI in with your API key (replace fl_your_api_key) against the Flow API base URL.
flow-sdk login --api-key fl_your_api_key --url https://api.flow.marut.cloud

Load Credentials in a Script

from flow_sdk.config import load_config
from flow_sdk.cli_client import CLIClient

# Build a client from the locally stored configuration
# (load_config reads ~/.flow/config.json).
config = load_config()
client = CLIClient(config)

# Check that the stored credentials are still accepted by the platform.
client.verify_credentials()

Calling a Hosted Service from a Script

Coming Soon

CLIClient for invoking services from external scripts will be publicly available in an upcoming release. Inside platform runtimes, use flow.services via FlowSDK.

The most common use case for CLIClient is invoking a deployed service endpoint from outside the platform.

from flow_sdk.config import load_config
from flow_sdk.cli_client import CLIClient, CLIClientError

config = load_config()
client = CLIClient(config)

# Keep the try body to the one call that can raise CLIClientError;
# the result is only read in `else`, after the call has succeeded.
try:
    result = client.invoke_service(
        service_id="<service-uuid>",  # replace with the deployed service's ID
        method="POST",
        path="/analyze",
        body={
            "text": "Quarterly revenue increased by 15% year-over-year.",
            "analysis_type": "sentiment",
        },
    )
except CLIClientError as e:
    print(f"Service call failed: {e}")
else:
    # The keys read here depend on the /analyze endpoint's response schema.
    print(f"Sentiment: {result['sentiment']}")
    print(f"Confidence: {result['confidence']}")

Streaming Chat Completions

Standalone Script with GatewayClient

Coming Soon

Standalone GatewayClient usage will be publicly available in an upcoming release. Inside platform runtimes, use flow.gateway via FlowSDK.

import asyncio
from flow_sdk import GatewayClient

def delta_text(chunk: dict) -> str:
    """Return the text carried by one streamed chat-completion chunk.

    Returns ``""`` for chunks that carry no text: an empty or missing
    ``choices`` list, a missing ``delta``, or a delta whose ``content`` is
    absent or ``None``.
    """
    choices = chunk.get("choices") or []
    if not choices:
        return ""
    delta = choices[0].get("delta") or {}
    return delta.get("content") or ""


async def main():
    """Stream a chat completion from the gateway and print it as it arrives."""
    async with GatewayClient(
        base_url="https://api.flow.marut.cloud",
        auth_token="your-jwt-token",  # placeholder: supply a real JWT
    ) as gateway:
        print("Assistant: ", end="")
        async for chunk in gateway.stream_chat_completion(
            model="openai/gpt-4o",
            messages=[
                {"role": "system", "content": "You are a concise assistant."},
                {"role": "user", "content": "What are the benefits of RAG?"},
            ],
            temperature=0.5,
            max_tokens=300,
        ):
            # Flush each fragment so the answer appears incrementally.
            print(delta_text(chunk), end="", flush=True)
        print()


# Guard the entry point so importing this module does not start a stream.
if __name__ == "__main__":
    asyncio.run(main())

Inside a Hosted Service with FlowSDK

from flow_sdk import FlowSDK

flow = FlowSDK()

async def stream_response(user_message: str):
    """Stream a response from the AI Gateway, yielding text tokens as they arrive.

    This is an async generator, so it cannot also ``return`` the assembled
    text (``return <value>`` inside an async generator is a SyntaxError).
    Callers that need the full response can collect the tokens themselves,
    e.g. ``"".join([t async for t in stream_response(msg)])``.

    Args:
        user_message: The user's message to send to the model.

    Yields:
        Non-empty text fragments taken from each streamed chunk's delta.
    """
    # NOTE(review): the standalone GatewayClient example iterates
    # stream_chat_completion(...) directly, without `await`; confirm which
    # form flow.gateway expects.
    async for chunk in await flow.gateway.stream_chat_completion(
        model="anthropic/claude-sonnet-4-20250514",
        messages=[{"role": "user", "content": user_message}],
    ):
        # Skip chunks without text: an empty `choices` list, or a delta
        # whose `content` is missing or None.
        choices = chunk.get("choices") or []
        if not choices:
            continue
        token = (choices[0].get("delta") or {}).get("content") or ""
        if token:
            yield token  # forward to caller

Building an Agent with FlowSDK

A hosted service that acts as a simple retrieval-augmented generation (RAG) agent: it queries a dataset for relevant context, then generates an answer from that context.

from flow_sdk import FlowSDK

flow = FlowSDK()

async def answer_question(question: str) -> dict:
    """Answer ``question`` with retrieval-augmented generation over "knowledge-base".

    Embeds the question, fetches the five most similar documents from the
    "knowledge-base" dataset, asks the model to answer from that context,
    and records the interaction in the audit log.

    Returns:
        A dict with the model's ``answer``, the titles of the retrieved
        documents as ``sources``, and ``tokens_used`` taken from the
        completion's usage data.
    """
    # 1. Embed the question so it can be compared against stored vectors.
    embedding_response = await flow.gateway.embedding(
        model="openai/text-embedding-3-small",
        input=question,
    )
    question_vector = embedding_response["data"][0]["embedding"]

    # 2. Retrieve the closest documents from the knowledge base.
    search_results = await flow.datasets.vector_query(
        dataset_id="knowledge-base",
        embedding=question_vector,
        vector_field="content_embedding",
        top_k=5,
        fields=["title", "content"],
    )
    hits = search_results.get("results", [])

    # 3. Render each hit as a "## title" section and stitch them together.
    context = "\n\n".join(f"## {hit['title']}\n{hit['content']}" for hit in hits)

    # 4. Ask the model to answer using only the retrieved context.
    system_prompt = (
        "Answer the user's question using the provided context. "
        "If the context does not contain relevant information, say so."
    )
    completion = await flow.gateway.chat_completion(
        model="openai/gpt-4o",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
        ],
        temperature=0.3,
        max_tokens=1000,
    )
    answer = completion["choices"][0]["message"]["content"]

    # 5. Record the question and how many sources backed the answer.
    await flow.audit.log(
        action="question.answered",
        resource_type="knowledge-base",
        resource_id="knowledge-base",
        details={"question": question, "sources": len(hits)},
    )

    return {
        "answer": answer,
        "sources": [hit["title"] for hit in hits],
        "tokens_used": completion["usage"]["total_tokens"],
    }

Working with Datasets

Query with Filters

from flow_sdk import FlowSDK

flow = FlowSDK()

async def list_active_enterprise_users() -> dict:
    """Print users matching a structured, filtered dataset query.

    Queries the "users" dataset with two filters (plan ``eq`` "enterprise",
    last_active ``gte`` "2025-01-01"), returning at most 100 rows.

    The statements live in a coroutine because a bare top-level ``await`` is
    a SyntaxError in a regular ``.py`` module. Await it from async code, or
    run it with ``asyncio.run(list_active_enterprise_users())``.

    Returns:
        The raw query response, so callers can inspect ``results`` further.
    """
    # Structured query with filters
    active_users = await flow.datasets.query(
        dataset_id="users",
        request={
            "fields": ["id", "name", "email", "plan", "last_active"],
            "filters": [
                {"field": "plan", "op": "eq", "value": "enterprise"},
                {"field": "last_active", "op": "gte", "value": "2025-01-01"},
            ],
            "limit": 100,
            "offset": 0,
        },
    )

    for user in active_users.get("results", []):
        print(f"{user['name']} ({user['email']}) - {user['plan']}")
    return active_users
# --- Vector search with metadata filters ---
async def search_engineering_articles(
    query: str = "machine learning best practices",
    top_k: int = 10,
) -> dict:
    """Find "engineering" articles whose content embedding is closest to ``query``.

    Wrapped in a coroutine because a bare top-level ``await`` is a
    SyntaxError in a regular ``.py`` module. Await it from async code, or run
    it with ``asyncio.run(search_engineering_articles())``.

    Args:
        query: Free-text search query to embed.
        top_k: Maximum number of matches to return.

    Returns:
        The raw ``vector_query`` response for the "articles" dataset.
    """
    # Get embedding for the search query
    embed = await flow.gateway.embedding(
        model="openai/text-embedding-3-small",
        input=query,
    )
    query_vector = embed["data"][0]["embedding"]

    # Vector search restricted by a metadata filter on `category`
    return await flow.datasets.vector_query(
        dataset_id="articles",
        embedding=query_vector,
        vector_field="content_embedding",
        top_k=top_k,
        distance_metric="cosine",
        fields=["title", "author", "content"],
        filters=[{"field": "category", "op": "eq", "value": "engineering"}],
    )

Orchestrating Code Blocks

Run a Code Block and Extract Output

from flow_sdk import FlowSDK

flow = FlowSDK()

async def run_data_processor() -> dict:
    """Run the data-processor Code Block and report how many rows it processed.

    Wrapped in a coroutine because a bare top-level ``await`` is a
    SyntaxError in a regular ``.py`` module. Await it from async code, or run
    it with ``asyncio.run(run_data_processor())``.

    Returns:
        The block's output; ``result_mode="output"`` unwraps it from the
        execution envelope.
    """
    # Run and get just the output (unwrapped from the execution envelope)
    output = await flow.code_blocks.run(
        code_block_id="data-processor-uuid",
        input_parameters={
            "source_url": "https://example.com/data.csv",
            "transform": "normalize",
        },
        result_mode="output",
    )
    print(f"Processed {output['row_count']} rows")
    return output

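Chain Multiple Code Blocks

Pass each block's output as the next block's input. This snippet continues the previous example and reuses its flow instance.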

async def run_etl_pipeline() -> dict:
    """Fetch, transform, and load data by chaining three Code Blocks.

    Every step runs with ``result_mode="output"``, so each block's unwrapped
    output is passed straight in as the next block's ``"data"`` parameter.

    Wrapped in a coroutine because a bare top-level ``await`` is a
    SyntaxError in a regular ``.py`` module. Await it from async code, or run
    it with ``asyncio.run(run_etl_pipeline())``.

    Returns:
        The loader block's output.
    """
    # Step 1: Fetch data
    fetch_result = await flow.code_blocks.run(
        code_block_id="data-fetcher-uuid",
        input_parameters={"url": "https://api.example.com/data"},
        result_mode="output",
    )

    # Step 2: Transform data
    transform_result = await flow.code_blocks.run(
        code_block_id="data-transformer-uuid",
        input_parameters={"data": fetch_result},
        result_mode="output",
    )

    # Step 3: Load into dataset
    load_result = await flow.code_blocks.run(
        code_block_id="data-loader-uuid",
        input_parameters={
            "data": transform_result,
            "dataset_id": "processed-data",
        },
        result_mode="output",
    )
    print(f"Loaded {load_result['records_written']} records")
    return load_result

Persistent State

Track state across invocations of a hosted service.

from flow_sdk import FlowSDK

flow = FlowSDK()

async def incremental_sync():
    """Sync only the Salesforce contacts modified since the last run.

    The high-water mark is kept in persistent state under "sync_cursor".
    When no cursor is stored yet (first run), the date filter is omitted so
    every contact is fetched and the cursor is initialised from that batch.
    """
    # Retrieve the last sync cursor. NOTE(review): presumably None when the
    # key has never been set; confirm flow.state.get's missing-key behaviour.
    last_cursor = await flow.state.get("sync_cursor")

    # Without a cursor, interpolating it would filter on the literal 'None'.
    query = "SELECT * FROM Contact"
    if last_cursor:
        query += f" WHERE LastModifiedDate > '{last_cursor}'"
    # NOTE(review): if read_query passes this to Salesforce verbatim as SOQL,
    # SOQL has no `SELECT *` and datetime literals must be unquoted; confirm
    # the query dialect the connector expects.

    # Fetch new data from the connector
    new_records = await flow.connectors.execute_operation(
        connector_instance_id="salesforce-instance-uuid",
        operation="read_query",
        params={"query": query},
    )

    rows = new_records.get("rows")
    if not rows:
        print("No new records to sync.")
        return

    # Process new records... (process_records is a placeholder for your logic)
    process_records(rows)

    # Advance the cursor to the newest timestamp seen. Taking max() of the raw
    # values assumes they share one sortable format (e.g. uniform ISO-8601).
    latest = max(r["LastModifiedDate"] for r in rows)
    await flow.state.set("sync_cursor", latest)
    print(f"Synced {len(rows)} records, cursor now: {latest}")

Service-to-Service Calls

Call another hosted service and pass through authentication.

from flow_sdk import FlowSDK

flow = FlowSDK()

async def enrich_and_store(document: dict) -> dict:
    """Enrich a document with one hosted service, then persist it with another.

    Both calls pass ``reuse_service_token=True`` so the caller's token is
    reused along the auth chain.

    Returns:
        ``document_id`` (the storage service's ``id``) and ``entities_found``
        (how many entities the enrichment service returned).
    """
    enrichment = await flow.services.post(
        "/enrich",
        body={"text": document["content"]},
        service_id="enrichment-service-uuid",
        reuse_service_token=True,
    )

    payload = {
        "title": document["title"],
        "content": document["content"],
        "entities": enrichment["entities"],
        "summary": enrichment["summary"],
    }
    saved = await flow.services.post(
        "/documents",
        body=payload,
        service_id="storage-service-uuid",
        reuse_service_token=True,
    )

    return {
        "document_id": saved["id"],
        "entities_found": len(enrichment["entities"]),
    }

Error Handling Patterns

Retry with Backoff

import asyncio
from flow_sdk import FlowSDK
from flow_sdk.gateway_client import GatewayRateLimitError, GatewayProviderError

flow = FlowSDK()

async def chat_with_retry(messages: list, max_attempts: int = 3) -> dict:
    """Call the gateway, retrying transient errors with exponential backoff.

    Rate-limit and provider errors are retried. The wait before retrying
    after 0-based attempt ``n`` is ``2 ** n`` seconds, so the default three
    attempts wait 1s and then 2s. Any other exception propagates immediately.

    Args:
        messages: Chat messages to send to the model.
        max_attempts: Total number of calls to make; must be at least 1.

    Returns:
        The gateway's chat-completion response.

    Raises:
        ValueError: If ``max_attempts`` is less than 1.
        GatewayRateLimitError, GatewayProviderError: Re-raised from the final
            attempt once retries are exhausted.
    """
    # Without this check, range(0) would skip the loop and return None.
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be at least 1, got {max_attempts}")

    for attempt in range(max_attempts):
        try:
            return await flow.gateway.chat_completion(
                model="openai/gpt-4o",
                messages=messages,
            )
        except (GatewayRateLimitError, GatewayProviderError) as exc:
            if attempt == max_attempts - 1:
                raise
            reason = (
                "Rate limited"
                if isinstance(exc, GatewayRateLimitError)
                else "Provider error"
            )
            wait = 2 ** attempt
            print(f"{reason}, retrying in {wait}s...")
            await asyncio.sleep(wait)

Graceful Fallback

This snippet continues the retry example above and reuses its flow instance.

from flow_sdk.gateway_client import GatewayModelNotFoundError

async def chat_with_fallback(messages: list) -> dict:
    """Send ``messages`` to the primary model, switching to a backup if it is not found.

    Only ``GatewayModelNotFoundError`` from the primary call triggers the
    switch; any other error, and any error from the backup call, propagates.
    """
    primary_model = "anthropic/claude-sonnet-4-20250514"
    backup_model = "openai/gpt-4o-mini"
    try:
        return await flow.gateway.chat_completion(model=primary_model, messages=messages)
    except GatewayModelNotFoundError:
        print("Primary model unavailable, falling back...")
        return await flow.gateway.chat_completion(model=backup_model, messages=messages)

Comprehensive Error Handling

from flow_sdk import FlowSDK, FlowSDKError
from flow_sdk.platform_client import (
    PlatformAuthenticationError,
    PlatformPermissionError,
    PlatformNotFoundError,
    PlatformExecutionError,
)

flow = FlowSDK()

async def safe_execute(code_block_id: str, params: dict) -> dict | None:
    """Run a Code Block and return its output, or ``None`` on a handled failure.

    Each anticipated failure below is reported and converted to ``None``.
    An authentication failure is also recorded in the audit log on a
    best-effort basis. Exceptions not listed here still propagate.

    Args:
        code_block_id: ID of the Code Block to execute.
        params: Input parameters passed to the block.

    Returns:
        The block's output (``result_mode="output"``), or ``None``.
    """
    try:
        return await flow.code_blocks.run(
            code_block_id=code_block_id,
            input_parameters=params,
            result_mode="output",
        )
    except PlatformAuthenticationError:
        print(f"Authentication failed while executing {code_block_id}")
        # The audit call can fail too (possibly for the same credential
        # problem; confirm). A logging failure must not make this "safe"
        # wrapper raise.
        try:
            await flow.audit.log(
                action="execution.auth_failure",
                resource_type="code_block",
                resource_id=code_block_id,
                details={"error": "authentication_failed"},
            )
        except (PlatformAuthenticationError, FlowSDKError) as log_err:
            print(f"Could not record auth failure in audit log: {log_err}")
        return None
    except PlatformNotFoundError:
        print(f"Code block {code_block_id} not found")
        return None
    except PlatformPermissionError:
        print(f"No permission to execute {code_block_id}")
        return None
    except PlatformExecutionError as e:
        print(f"Execution failed: {e}")
        return None
    except TimeoutError:
        # NOTE(review): asyncio.TimeoutError is an alias of TimeoutError only
        # from Python 3.11; on 3.10 also catch asyncio.TimeoutError if the SDK
        # surfaces asyncio timeouts.
        print(f"Execution of {code_block_id} timed out")
        return None
    except FlowSDKError as e:
        print(f"Unexpected SDK error: {e}")
        return None

Scaffold, Develop, Deploy Workflow

Coming Soon

The flow-sdk CLI for local scaffolding, testing, and deployment will be publicly available in an upcoming release.

End-to-end workflow for creating and deploying a new component.

# Create the component scaffold from the "basic" template.
# Stay in this (parent) directory: the commands that follow reference the
# component as ./my-processor, which would not resolve from inside it.
flow-sdk scaffold my-processor --template basic
# my-processor/component.py
from flow_sdk import BaseComponent, ComponentInput, ComponentOutput

class MyProcessor(BaseComponent):
    """Example component that upper-cases its ``data`` input parameter."""

    async def execute(self, input: ComponentInput) -> ComponentOutput:
        """Return ``{"result": data.upper()}``, where ``data`` defaults to ``""``."""
        # Replace the transformation below with your own logic.
        text = input.parameters.get("data", "")
        return ComponentOutput(output={"result": text.upper()})
# Run these from the directory that contains my-processor/ (paths below are ./my-processor).
# Validate structure
flow-sdk validate ./my-processor

# Run with test input (writes a sample input file to the current directory)
echo '{"data": "hello world"}' > input.json
flow-sdk test ./my-processor --input input.json
# Deploy to dev ring
flow-sdk deploy ./my-processor --ring dev

# Promote to staging when ready (same deploy command, different --ring)
flow-sdk deploy ./my-processor --ring staging
# Publish to the component catalog
flow-sdk publish ./my-processor