SDK Examples¶
Practical recipes and patterns for common tasks with the Flow SDK.
Authentication and Setup¶
Coming Soon
CLI authentication and CLIClient will be publicly available in an upcoming release. Inside platform Code Blocks and Hosted Services, authentication is automatic.
Authenticate from the CLI¶
Load Credentials in a Script¶
from flow_sdk.config import load_config
from flow_sdk.cli_client import CLIClient
config = load_config() # reads ~/.flow/config.json
client = CLIClient(config)
# Verify the credentials are still valid
client.verify_credentials()
Calling a Hosted Service from a Script¶
Coming Soon
CLIClient for invoking services from external scripts will be publicly available in an upcoming release. Inside platform runtimes, use flow.services via FlowSDK.
The most common use case for CLIClient -- invoke a deployed service endpoint from outside the platform.
from flow_sdk.config import load_config
from flow_sdk.cli_client import CLIClient, CLIClientError
config = load_config()
client = CLIClient(config)
try:
result = client.invoke_service(
service_id="<service-uuid>",
method="POST",
path="/analyze",
body={
"text": "Quarterly revenue increased by 15% year-over-year.",
"analysis_type": "sentiment",
},
)
print(f"Sentiment: {result['sentiment']}")
print(f"Confidence: {result['confidence']}")
except CLIClientError as e:
print(f"Service call failed: {e}")
Streaming Chat Completions¶
Standalone Script with GatewayClient¶
Coming Soon
Standalone GatewayClient usage will be publicly available in an upcoming release. Inside platform runtimes, use flow.gateway via FlowSDK.
import asyncio
from flow_sdk import GatewayClient
async def main():
async with GatewayClient(
base_url="https://api.flow.marut.cloud",
auth_token="your-jwt-token",
) as gateway:
print("Assistant: ", end="")
async for chunk in gateway.stream_chat_completion(
model="openai/gpt-4o",
messages=[
{"role": "system", "content": "You are a concise assistant."},
{"role": "user", "content": "What are the benefits of RAG?"},
],
temperature=0.5,
max_tokens=300,
):
content = chunk["choices"][0]["delta"].get("content", "")
print(content, end="", flush=True)
print()
asyncio.run(main())
Inside a Hosted Service with FlowSDK¶
from flow_sdk import FlowSDK
flow = FlowSDK()
async def stream_response(user_message: str):
"""Stream a response from the AI Gateway."""
collected = []
async for chunk in await flow.gateway.stream_chat_completion(
model="anthropic/claude-sonnet-4-20250514",
messages=[{"role": "user", "content": user_message}],
):
token = chunk["choices"][0]["delta"].get("content", "")
collected.append(token)
yield token # forward to caller
full_response = "".join(collected)
return full_response
Building an Agent with FlowSDK¶
A hosted service that acts as a simple RAG agent -- queries a dataset for context, then generates an answer.
from flow_sdk import FlowSDK
flow = FlowSDK()
async def answer_question(question: str) -> dict:
# Step 1: Generate an embedding for the question
embed_result = await flow.gateway.embedding(
model="openai/text-embedding-3-small",
input=question,
)
query_vector = embed_result["data"][0]["embedding"]
# Step 2: Search the knowledge base
search_results = await flow.datasets.vector_query(
dataset_id="knowledge-base",
embedding=query_vector,
vector_field="content_embedding",
top_k=5,
fields=["title", "content"],
)
# Step 3: Build context from search results
context_parts = []
for doc in search_results.get("results", []):
context_parts.append(f"## {doc['title']}\n{doc['content']}")
context = "\n\n".join(context_parts)
# Step 4: Generate an answer using the context
response = await flow.gateway.chat_completion(
model="openai/gpt-4o",
messages=[
{
"role": "system",
"content": (
"Answer the user's question using the provided context. "
"If the context does not contain relevant information, say so."
),
},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
],
temperature=0.3,
max_tokens=1000,
)
answer = response["choices"][0]["message"]["content"]
# Step 5: Log the interaction for audit
await flow.audit.log(
action="question.answered",
resource_type="knowledge-base",
resource_id="knowledge-base",
details={
"question": question,
"sources": len(search_results.get("results", [])),
},
)
return {
"answer": answer,
"sources": [doc["title"] for doc in search_results.get("results", [])],
"tokens_used": response["usage"]["total_tokens"],
}
Working with Datasets¶
Query with Filters¶
from flow_sdk import FlowSDK
flow = FlowSDK()
# Structured query with filters
active_users = await flow.datasets.query(
dataset_id="users",
request={
"fields": ["id", "name", "email", "plan", "last_active"],
"filters": [
{"field": "plan", "op": "eq", "value": "enterprise"},
{"field": "last_active", "op": "gte", "value": "2025-01-01"},
],
"limit": 100,
"offset": 0,
},
)
for user in active_users.get("results", []):
print(f"{user['name']} ({user['email']}) - {user['plan']}")
Vector Similarity Search¶
# Get embedding for the search query
embed = await flow.gateway.embedding(
model="openai/text-embedding-3-small",
input="machine learning best practices",
)
query_vector = embed["data"][0]["embedding"]
# Vector search with metadata filters
results = await flow.datasets.vector_query(
dataset_id="articles",
embedding=query_vector,
vector_field="content_embedding",
top_k=10,
distance_metric="cosine",
fields=["title", "author", "content"],
filters=[{"field": "category", "op": "eq", "value": "engineering"}],
)
Orchestrating Code Blocks¶
Run a Code Block and Extract Output¶
from flow_sdk import FlowSDK
flow = FlowSDK()
# Run and get just the output (unwrapped from the execution envelope)
output = await flow.code_blocks.run(
code_block_id="data-processor-uuid",
input_parameters={
"source_url": "https://example.com/data.csv",
"transform": "normalize",
},
result_mode="output",
)
print(f"Processed {output['row_count']} rows")
Chain Multiple Code Blocks¶
# Step 1: Fetch data
fetch_result = await flow.code_blocks.run(
code_block_id="data-fetcher-uuid",
input_parameters={"url": "https://api.example.com/data"},
result_mode="output",
)
# Step 2: Transform data
transform_result = await flow.code_blocks.run(
code_block_id="data-transformer-uuid",
input_parameters={"data": fetch_result},
result_mode="output",
)
# Step 3: Load into dataset
load_result = await flow.code_blocks.run(
code_block_id="data-loader-uuid",
input_parameters={
"data": transform_result,
"dataset_id": "processed-data",
},
result_mode="output",
)
print(f"Loaded {load_result['records_written']} records")
Persistent State¶
Track state across invocations of a hosted service.
from flow_sdk import FlowSDK
flow = FlowSDK()
async def incremental_sync():
"""Sync only new records since the last run."""
# Retrieve the last sync cursor
last_cursor = await flow.state.get("sync_cursor")
# Fetch new data from the connector
new_records = await flow.connectors.execute_operation(
connector_instance_id="salesforce-instance-uuid",
operation="read_query",
params={
"query": f"SELECT * FROM Contact WHERE LastModifiedDate > '{last_cursor}'",
},
)
if new_records.get("rows"):
# Process new records...
process_records(new_records["rows"])
# Update the cursor to the latest timestamp
latest = max(r["LastModifiedDate"] for r in new_records["rows"])
await flow.state.set("sync_cursor", latest)
print(f"Synced {len(new_records['rows'])} records, cursor now: {latest}")
else:
print("No new records to sync.")
Service-to-Service Calls¶
Call another hosted service and pass through authentication.
from flow_sdk import FlowSDK
flow = FlowSDK()
async def enrich_and_store(document: dict) -> dict:
# Call the enrichment service (reuse caller's token for auth chain)
enriched = await flow.services.post(
"/enrich",
body={"text": document["content"]},
service_id="enrichment-service-uuid",
reuse_service_token=True,
)
# Call the storage service
stored = await flow.services.post(
"/documents",
body={
"title": document["title"],
"content": document["content"],
"entities": enriched["entities"],
"summary": enriched["summary"],
},
service_id="storage-service-uuid",
reuse_service_token=True,
)
return {"document_id": stored["id"], "entities_found": len(enriched["entities"])}
Error Handling Patterns¶
Retry with Backoff¶
import asyncio
from flow_sdk import FlowSDK
from flow_sdk.gateway_client import GatewayRateLimitError, GatewayProviderError
flow = FlowSDK()
async def chat_with_retry(messages: list, max_attempts: int = 3) -> dict:
"""Call the gateway with exponential backoff on transient errors."""
for attempt in range(max_attempts):
try:
return await flow.gateway.chat_completion(
model="openai/gpt-4o",
messages=messages,
)
except GatewayRateLimitError:
if attempt == max_attempts - 1:
raise
wait = 2 ** attempt # 1s, 2s, 4s
print(f"Rate limited, retrying in {wait}s...")
await asyncio.sleep(wait)
except GatewayProviderError:
if attempt == max_attempts - 1:
raise
wait = 2 ** attempt
print(f"Provider error, retrying in {wait}s...")
await asyncio.sleep(wait)
Graceful Fallback¶
from flow_sdk.gateway_client import GatewayModelNotFoundError
async def chat_with_fallback(messages: list) -> dict:
"""Try the primary model, fall back to a secondary model."""
try:
return await flow.gateway.chat_completion(
model="anthropic/claude-sonnet-4-20250514",
messages=messages,
)
except GatewayModelNotFoundError:
print("Primary model unavailable, falling back...")
return await flow.gateway.chat_completion(
model="openai/gpt-4o-mini",
messages=messages,
)
Comprehensive Error Handling¶
from flow_sdk import FlowSDK, FlowSDKError
from flow_sdk.platform_client import (
PlatformAuthenticationError,
PlatformPermissionError,
PlatformNotFoundError,
PlatformExecutionError,
)
flow = FlowSDK()
async def safe_execute(code_block_id: str, params: dict) -> dict | None:
"""Execute a code block with comprehensive error handling."""
try:
return await flow.code_blocks.run(
code_block_id=code_block_id,
input_parameters=params,
result_mode="output",
)
except PlatformAuthenticationError:
await flow.audit.log(
action="execution.auth_failure",
resource_type="code_block",
resource_id=code_block_id,
details={"error": "authentication_failed"},
)
return None
except PlatformNotFoundError:
print(f"Code block {code_block_id} not found")
return None
except PlatformPermissionError:
print(f"No permission to execute {code_block_id}")
return None
except PlatformExecutionError as e:
print(f"Execution failed: {e}")
return None
except TimeoutError:
print(f"Execution of {code_block_id} timed out")
return None
except FlowSDKError as e:
print(f"Unexpected SDK error: {e}")
return None
Scaffold, Develop, Deploy Workflow¶
Coming Soon
The flow-sdk CLI for local scaffolding, testing, and deployment will be publicly available in an upcoming release.
End-to-end workflow for creating and deploying a new component.
# my-processor/component.py
from flow_sdk import BaseComponent, ComponentInput, ComponentOutput
class MyProcessor(BaseComponent):
async def execute(self, input: ComponentInput) -> ComponentOutput:
data = input.parameters.get("data", "")
processed = data.upper() # your logic here
return ComponentOutput(output={"result": processed})