FlowSDK¶
FlowSDK is the primary interface for code running inside the Manifest Platform -- hosted services, code blocks, agents, and workflows. It provides a unified facade with typed namespaces for every platform capability: AI inference, datasets, execution orchestration, state management, secrets, storage, and more.
Available Now
FlowSDK is available today inside all Manifest Platform runtimes — Code Blocks, Hosted Services, Agents, and Workflows. No installation required.
Initialization¶
Inside a hosted service or code block, FlowSDK auto-configures from environment variables injected by the platform runtime:
For local development or testing, pass configuration explicitly:
flow = FlowSDK(
org_id="your-org-uuid",
workspace_id="your-workspace-uuid",
auth_token="your-jwt-token",
platform_url="https://api.flow.marut.cloud",
gateway_url="https://api.flow.marut.cloud",
)
Constructor Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
org_id |
str \| UUID |
ORG_ID env var |
Organization ID |
workspace_id |
str \| UUID |
WORKSPACE_ID env var |
Workspace ID |
auth_token |
str |
AUTH_TOKEN env var |
JWT authentication token |
platform_url |
str |
PLATFORM_API_URL env var |
Platform API base URL |
gateway_url |
str |
GATEWAY_URL env var |
AI Gateway URL (falls back to platform_url) |
service_id |
str \| UUID |
SERVICE_ID env var |
Hosted service ID (for service context) |
ring_id |
str \| UUID |
RING_ID env var |
Deployment ring ID |
Environment Auto-Configuration
When your code runs inside the platform, all required environment variables are automatically set. You never need to pass constructor arguments in production.
Namespaces¶
FlowSDK organizes its capabilities into namespaces accessed as properties on the flow instance:
graph LR
FlowSDK --> gateway["flow.gateway"]
FlowSDK --> datasets["flow.datasets"]
FlowSDK --> code_blocks["flow.code_blocks"]
FlowSDK --> agents["flow.agents"]
FlowSDK --> workflows["flow.workflows"]
FlowSDK --> services["flow.services"]
FlowSDK --> connectors["flow.connectors"]
FlowSDK --> state["flow.state"]
FlowSDK --> secrets["flow.secrets"]
FlowSDK --> storage["flow.storage"]
FlowSDK --> vector["flow.vector"]
FlowSDK --> memory["flow.memory"]
FlowSDK --> auth["flow.auth"]
FlowSDK --> audit["flow.audit"]
FlowSDK --> jobs["flow.jobs"]
FlowSDK --> service_tokens["flow.service_tokens"]
FlowSDK --> conversations["flow.conversations"]
FlowSDK --> service["flow.service"]
Gateway¶
Call the AI Gateway for chat completions and embeddings.
# Non-streaming chat completion
response = await flow.gateway.chat_completion(
model="openai/gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "What is the capital of France?"},
],
temperature=0.7,
max_tokens=256,
)
print(response["choices"][0]["message"]["content"])
# Streaming chat completion
async for chunk in await flow.gateway.stream_chat_completion(
model="anthropic/claude-sonnet-4-20250514",
messages=[{"role": "user", "content": "Write a haiku about code."}],
):
delta = chunk["choices"][0]["delta"]
print(delta.get("content", ""), end="", flush=True)
# Generate embeddings
result = await flow.gateway.embedding(
model="openai/text-embedding-3-small",
input="Manifest Platform is an enterprise AI operating system.",
)
vector = result["data"][0]["embedding"]
Datasets¶
Query platform datasets with structured filters or vector search.
# Structured query
rows = await flow.datasets.query(
dataset_id="customer-records",
request={
"fields": ["name", "email", "plan"],
"filters": [{"field": "plan", "op": "eq", "value": "enterprise"}],
"limit": 50,
},
)
# Vector similarity search
results = await flow.datasets.vector_query(
dataset_id="knowledge-base",
embedding=query_vector, # list[float] from an embedding model
vector_field="content_embedding",
top_k=5,
distance_metric="cosine",
fields=["title", "content"],
)
Code Blocks¶
Submit and manage durable code block executions.
# Submit asynchronously
handle = await flow.code_blocks.submit(
code_block_id="cb-uuid-here",
input_parameters={"url": "https://example.com"},
ring_id="dev-ring-uuid",
)
execution_id = handle["execution_id"]
print(f"Submitted: {execution_id}")
# Poll for the result
result = await flow.code_blocks.await_result(
execution_id,
timeout_seconds=120,
)
Code Block Methods¶
| Method | Description |
|---|---|
run(code_block_id, input_parameters, result_mode) |
Submit and wait for result |
submit(code_block_id, input_parameters) |
Submit execution, return handle |
get_status(execution_id) |
Check execution status |
await_result(execution_id, timeout_seconds) |
Poll until terminal state |
cancel(execution_id) |
Best-effort cancellation |
Agents¶
Submit and manage durable agent executions.
# Run an agent and wait for the result
result = await flow.agents.run(
agent_id="agent-uuid-here",
input_data={"question": "What were last quarter's revenue trends?"},
ring_id="dev-ring-uuid",
)
print(result["output"])
# Submit asynchronously and poll later
handle = await flow.agents.submit(
agent_id="agent-uuid-here",
input_data={"question": "Summarize the dataset."},
ring_id="dev-ring-uuid",
)
status = await flow.agents.get_status(handle["execution_id"])
print(f"Status: {status['status']}")
Workflows¶
Submit and manage durable workflow executions.
result = await flow.workflows.run(
workflow_id="workflow-uuid-here",
input_data={"document_url": "https://example.com/report.pdf"},
ring_id="dev-ring-uuid",
)
Ring ID Required
All execution methods (code_blocks, agents, workflows) require a ring_id. Inside a hosted service this is automatically read from the RING_ID environment variable.
Services¶
Call other hosted service endpoints from within a running service.
# Call another service's endpoint
response = await flow.services.post(
"/summarize",
body={"text": "Long document content here..."},
service_id="target-service-uuid",
)
# Call with version pinning
response = await flow.services.call(
"GET",
"/health",
service_id="target-service-uuid",
version=2,
)
# Reuse the caller's service token for downstream calls
response = await flow.services.post(
"/process",
body={"data": payload},
service_id="downstream-service-uuid",
reuse_service_token=True,
)
# Strict caller-token forwarding (enforced auth header precedence)
response = await flow.services.post(
"/process",
body={"data": payload},
service_id="downstream-service-uuid",
auth="caller_service_token",
)
Caller Token Precedence
With auth="caller_service_token", runtime inbound auth headers are enforced and caller-supplied headers cannot override them.
Convenience Methods¶
| Method | Description |
|---|---|
call(method, path, body, service_id, version) |
Generic HTTP call |
get(path, **kwargs) |
GET request |
post(path, **kwargs) |
POST request |
put(path, **kwargs) |
PUT request |
delete(path, **kwargs) |
DELETE request |
skills(service_id, version) |
Fetch HSL skills artifact |
Service Tokens¶
Mint and manage HSL service tokens from runtime code:
minted = await flow.service_tokens.mint(
{"tenant_id": "t-123", "role": "reader"},
ttl_minutes=60,
service_id="target-service-uuid",
)
Canonical signature:
flow.service_tokens.mint(claims, *, ttl_minutes=60, service_id=None)
Connectors¶
Execute operations against connector instances.
# Execute a read operation
result = await flow.connectors.execute_operation(
connector_instance_id="instance-uuid",
operation="read_query",
params={"query": "SELECT * FROM users LIMIT 10"},
)
# Execute DDL (preview mode)
preview = await flow.connectors.execute_ddl(
connector_instance_id="instance-uuid",
sql="ALTER TABLE users ADD COLUMN active BOOLEAN DEFAULT true",
confirm=False, # preview only -- set True to execute
)
Large Writes
Write operations with more than 100 rows or 256 KB of data are automatically routed through the async write job system. The SDK handles submission, polling, and result retrieval transparently.
State¶
Persistent key-value state scoped to the running component.
# Store a value
await flow.state.set("last_sync_cursor", "2025-01-15T10:30:00Z")
# Retrieve a value
cursor = await flow.state.get("last_sync_cursor")
# Delete a value
await flow.state.delete("last_sync_cursor")
# Initialize state from component spec defaults
await flow.state.initialize()
Runtime Binding Required
flow.state is runtime-only and fail-closed. It requires a bound hosted-service request context with service.component_id and valid runtime auth/platform context.
Secrets¶
Access secrets stored in the platform's secret manager.
Read-Only at Runtime
flow.secrets is read-only in runtime code. Create/update secrets through platform management APIs/CLI.
Breaking Changes¶
flow.service_tokens.mint(api_key, claims, ...)was removed.
Useflow.service_tokens.mint(claims, *, ttl_minutes=..., service_id=...).
Storage¶
Upload and download objects from platform storage.
# Upload a file
await flow.storage.upload(
key="reports/q4-summary.pdf",
data=pdf_bytes,
content_type="application/pdf",
)
# Download a file
content = await flow.storage.download("reports/q4-summary.pdf")
# Generate pre-signed URLs
put_url = await flow.storage.presign_put("uploads/image.png", ttl_seconds=600)
get_url = await flow.storage.presign_get("uploads/image.png", ttl_seconds=3600)
Vector Search¶
Create and query vector indexes directly.
# Create an index
index = await flow.vector.create_index(
name="product-embeddings",
dimensions=1536,
metric="cosine",
)
# Upsert vectors
await flow.vector.upsert(
index_id=index["id"],
vectors=[
{"id": "doc-1", "values": embedding_1, "metadata": {"title": "Doc 1"}},
{"id": "doc-2", "values": embedding_2, "metadata": {"title": "Doc 2"}},
],
)
# Search
results = await flow.vector.search(
index_id=index["id"],
query_embedding=query_vector,
top_k=5,
)
Memory¶
Semantic memory with automatic embedding -- store and recall content without managing vectors directly.
# Store a memory
await flow.memory.store(
content="The customer prefers email communication.",
metadata={"customer_id": "cust-123"},
namespace="customer-prefs",
)
# Recall relevant memories
memories = await flow.memory.recall(
query="How does the customer prefer to be contacted?",
top_k=3,
namespace="customer-prefs",
)
# Forget specific memories
await flow.memory.forget(
vector_ids=["mem-uuid-1", "mem-uuid-2"],
namespace="customer-prefs",
)
Auth¶
Access the caller's identity context inside a hosted service.
# Get the caller's subject ID
subject = flow.auth.subject_id
# Check scopes
if "admin" in flow.auth.scopes:
# perform privileged operation
...
# Access custom claims from service tokens
tenant_id = flow.auth.custom_claims.get("tenant_id")
# or use the shorthand:
tenant_id = flow.auth.claim("tenant_id")
Service Auth
For more advanced service-to-service authentication patterns (token reuse, header forwarding), see flow.service.auth:
Conversations¶
List and retrieve agent conversations stored by the platform.
# List conversations in the current workspace
convos = await flow.conversations.list(limit=20, offset=0)
# Retrieve a specific conversation
convo = await flow.conversations.get("conversation-uuid")
Audit¶
Write audit log entries for compliance tracking.
await flow.audit.log(
action="document.reviewed",
resource_type="document",
resource_id="doc-uuid-here",
details={"reviewer": flow.auth.subject_id, "decision": "approved"},
)
Async Jobs¶
Submit long-running work to the async job system.
# Submit an async job
job = await flow.jobs.submit(
endpoint_path="/heavy-compute",
body={"dataset_id": "large-dataset"},
lane="standard",
)
job_id = job["job_id"]
# Check status
status = await flow.jobs.get_status(job_id)
# Cancel if needed
await flow.jobs.cancel(job_id)
Error Handling¶
All FlowSDK operations raise FlowSDKError or its subclasses for SDK-level issues. Underlying platform and gateway errors propagate from PlatformClientError and GatewayClientError hierarchies.
from flow_sdk import FlowSDK, FlowSDKError
from flow_sdk.platform_client import PlatformNotFoundError, PlatformAuthenticationError
from flow_sdk.gateway_client import GatewayRateLimitError
flow = FlowSDK()
try:
result = await flow.code_blocks.run(
code_block_id="nonexistent-uuid",
input_parameters={},
)
except PlatformNotFoundError:
print("Code block not found")
except PlatformAuthenticationError:
print("Authentication expired -- token refresh needed")
except GatewayRateLimitError:
print("Rate limited -- back off and retry")
except FlowSDKError as e:
print(f"SDK error: {e}")
Error Hierarchy¶
FlowSDKError
PlatformClientError
+-- PlatformAuthenticationError (401)
+-- PlatformPermissionError (403)
+-- PlatformNotFoundError (404)
+-- PlatformExecutionError (500)
GatewayClientError
+-- GatewayAuthError (401/403)
+-- GatewayModelNotFoundError (404)
+-- GatewayRateLimitError (429)
+-- GatewayProviderError (502)
Request Context Binding¶
In hosted services that handle concurrent requests, bind per-request context so that flow.auth and execution metadata are correct for each request:
token = flow.bind_request_context(
auth={
"subject_id": "user-123",
"scopes": ["read", "write"],
"token_type": "user",
},
execution={
"execution_id": "exec-uuid",
"request_id": "req-uuid",
},
)
try:
# flow.auth.subject_id is now "user-123" in this async context
await handle_request()
finally:
flow.reset_request_context(token)
Framework Integration
The platform runtime typically calls bind_request_context and reset_request_context automatically for each inbound request. You only need to call these manually if you are building custom request handling logic.