FlowSDK

FlowSDK is the primary interface for code running inside the Manifest Platform -- hosted services, code blocks, agents, and workflows. It provides a unified facade with typed namespaces for every platform capability: AI inference, datasets, execution orchestration, state management, secrets, storage, and more.

Available Now

FlowSDK is available today inside all Manifest Platform runtimes: Code Blocks, Hosted Services, Agents, and Workflows. No installation is required.

Initialization

Inside a hosted service or code block, FlowSDK auto-configures from environment variables injected by the platform runtime:

from flow_sdk import FlowSDK

flow = FlowSDK()

For local development or testing, pass configuration explicitly:

flow = FlowSDK(
    org_id="your-org-uuid",
    workspace_id="your-workspace-uuid",
    auth_token="your-jwt-token",
    platform_url="https://api.flow.marut.cloud",
    gateway_url="https://api.flow.marut.cloud",
)

Constructor Parameters

Parameter     Type        Default                   Description
org_id        str | UUID  ORG_ID env var            Organization ID
workspace_id  str | UUID  WORKSPACE_ID env var      Workspace ID
auth_token    str         AUTH_TOKEN env var        JWT authentication token
platform_url  str         PLATFORM_API_URL env var  Platform API base URL
gateway_url   str         GATEWAY_URL env var       AI Gateway URL (falls back to platform_url)
service_id    str | UUID  SERVICE_ID env var        Hosted service ID (for service context)
ring_id       str | UUID  RING_ID env var           Deployment ring ID

Environment Auto-Configuration

When your code runs inside the platform, all required environment variables are automatically set. You never need to pass constructor arguments in production.
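
For local development, it can help to check which variables are missing before calling FlowSDK() with no arguments. The following is a minimal sketch, not part of FlowSDK: the helper name missing_flow_env is hypothetical, and treating exactly these four variables as required is an assumption based on the constructor table above.

```python
import os
from typing import Mapping

# Names come from the constructor table. Which of them are strictly required
# is an assumption; the remaining variables are treated as optional here.
REQUIRED_ENV_VARS = ("ORG_ID", "WORKSPACE_ID", "AUTH_TOKEN", "PLATFORM_API_URL")


def missing_flow_env(env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in REQUIRED_ENV_VARS if not env.get(name)]
```

If the returned list is non-empty, pass the corresponding constructor arguments explicitly instead of relying on auto-configuration.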

Namespaces

FlowSDK organizes its capabilities into namespaces accessed as properties on the flow instance:

graph LR
    FlowSDK --> gateway["flow.gateway"]
    FlowSDK --> datasets["flow.datasets"]
    FlowSDK --> code_blocks["flow.code_blocks"]
    FlowSDK --> agents["flow.agents"]
    FlowSDK --> workflows["flow.workflows"]
    FlowSDK --> services["flow.services"]
    FlowSDK --> connectors["flow.connectors"]
    FlowSDK --> state["flow.state"]
    FlowSDK --> secrets["flow.secrets"]
    FlowSDK --> storage["flow.storage"]
    FlowSDK --> vector["flow.vector"]
    FlowSDK --> memory["flow.memory"]
    FlowSDK --> auth["flow.auth"]
    FlowSDK --> audit["flow.audit"]
    FlowSDK --> jobs["flow.jobs"]
    FlowSDK --> service_tokens["flow.service_tokens"]
    FlowSDK --> conversations["flow.conversations"]
    FlowSDK --> service["flow.service"]

Gateway

Call the AI Gateway for chat completions and embeddings.

# Non-streaming chat completion
response = await flow.gateway.chat_completion(
    model="openai/gpt-4o",
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "What is the capital of France?"},
    ],
    temperature=0.7,
    max_tokens=256,
)
print(response["choices"][0]["message"]["content"])

# Streaming chat completion
async for chunk in await flow.gateway.stream_chat_completion(
    model="anthropic/claude-sonnet-4-20250514",
    messages=[{"role": "user", "content": "Write a haiku about code."}],
):
    delta = chunk["choices"][0]["delta"]
    print(delta.get("content", ""), end="", flush=True)

# Generate embeddings
result = await flow.gateway.embedding(
    model="openai/text-embedding-3-small",
    input="Manifest Platform is an enterprise AI operating system.",
)
vector = result["data"][0]["embedding"]
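
When the full text is needed rather than incremental output, the streamed deltas can be joined into one string. This is a minimal sketch that assumes every chunk has the shape shown in the streaming example above (a "choices" list whose first entry carries a "delta" dict). The helper collect_stream is hypothetical and accepts any async iterator of such chunks.

```python
from typing import Any, AsyncIterator


async def collect_stream(chunks: AsyncIterator[dict[str, Any]]) -> str:
    """Concatenate the text deltas from a stream of chat-completion chunks."""
    parts: list[str] = []
    async for chunk in chunks:
        choices = chunk.get("choices") or []
        if not choices:
            continue  # skip chunks that carry no choices
        content = choices[0].get("delta", {}).get("content")
        if content:  # deltas without text (e.g. role-only) are ignored
            parts.append(content)
    return "".join(parts)
```

The helper would be called with the iterator returned by the streaming call, for example `text = await collect_stream(stream)`.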

Datasets

Query platform datasets with structured filters or vector search.

# Structured query
rows = await flow.datasets.query(
    dataset_id="customer-records",
    request={
        "fields": ["name", "email", "plan"],
        "filters": [{"field": "plan", "op": "eq", "value": "enterprise"}],
        "limit": 50,
    },
)

# Vector similarity search
results = await flow.datasets.vector_query(
    dataset_id="knowledge-base",
    embedding=query_vector,       # list[float] from an embedding model
    vector_field="content_embedding",
    top_k=5,
    distance_metric="cosine",
    fields=["title", "content"],
)

Code Blocks

Submit and manage durable code block executions.

# Submit and wait for the result in one call
result = await flow.code_blocks.run(
    code_block_id="cb-uuid-here",
    input_parameters={"url": "https://example.com"},
    ring_id="dev-ring-uuid",
)
print(result)

# Submit asynchronously
handle = await flow.code_blocks.submit(
    code_block_id="cb-uuid-here",
    input_parameters={"url": "https://example.com"},
    ring_id="dev-ring-uuid",
)

execution_id = handle["execution_id"]
print(f"Submitted: {execution_id}")

# Poll for the result
result = await flow.code_blocks.await_result(
    execution_id,
    timeout_seconds=120,
)

# Use result_mode="output" to unwrap the execution envelope
output = await flow.code_blocks.run(
    code_block_id="cb-uuid-here",
    input_parameters={"text": "Analyze this."},
    result_mode="output",  # returns just the output, not the envelope
)

Code Block Methods

Method                                             Description
run(code_block_id, input_parameters, result_mode)  Submit and wait for result
submit(code_block_id, input_parameters)            Submit execution, return handle
get_status(execution_id)                           Check execution status
await_result(execution_id, timeout_seconds)        Poll until terminal state
cancel(execution_id)                               Best-effort cancellation
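
Because run is awaitable, several executions can be fanned out concurrently with asyncio. The sketch below is one way to do that under stated assumptions: the helper run_many and its max_concurrency parameter are hypothetical, the code_blocks argument is any object with a run method matching the table above (such as flow.code_blocks), and ring_id is omitted on the assumption that the code runs inside a hosted service where it is read from RING_ID.

```python
import asyncio
from typing import Any, Iterable


async def run_many(
    code_blocks: Any,
    code_block_id: str,
    inputs: Iterable[dict[str, Any]],
    max_concurrency: int = 5,
) -> list[Any]:
    """Run one code block over many inputs, a few at a time.

    Results are returned in the same order as the inputs.
    """
    semaphore = asyncio.Semaphore(max_concurrency)

    async def run_one(params: dict[str, Any]) -> Any:
        async with semaphore:  # cap the number of in-flight executions
            return await code_blocks.run(
                code_block_id=code_block_id,
                input_parameters=params,
                result_mode="output",
            )

    return await asyncio.gather(*(run_one(p) for p in inputs))
```

If any single execution raises, asyncio.gather propagates the first exception; pass return_exceptions=True there if partial results are preferable.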

Agents

Submit and manage durable agent executions.

# Run an agent and wait for the result
result = await flow.agents.run(
    agent_id="agent-uuid-here",
    input_data={"question": "What were last quarter's revenue trends?"},
    ring_id="dev-ring-uuid",
)
print(result["output"])

# Submit asynchronously and poll later
handle = await flow.agents.submit(
    agent_id="agent-uuid-here",
    input_data={"question": "Summarize the dataset."},
    ring_id="dev-ring-uuid",
)

status = await flow.agents.get_status(handle["execution_id"])
print(f"Status: {status['status']}")

Workflows

Submit and manage durable workflow executions.

result = await flow.workflows.run(
    workflow_id="workflow-uuid-here",
    input_data={"document_url": "https://example.com/report.pdf"},
    ring_id="dev-ring-uuid",
)

Ring ID Required

All execution methods (code_blocks, agents, workflows) require a ring_id. Inside a hosted service, the SDK reads it automatically from the RING_ID environment variable, so the argument can be omitted there.


Services

Call other hosted service endpoints from within a running service.

# Call another service's endpoint
response = await flow.services.post(
    "/summarize",
    body={"text": "Long document content here..."},
    service_id="target-service-uuid",
)

# Call with version pinning
response = await flow.services.call(
    "GET",
    "/health",
    service_id="target-service-uuid",
    version=2,
)

# Reuse the caller's service token for downstream calls
response = await flow.services.post(
    "/process",
    body={"data": payload},
    service_id="downstream-service-uuid",
    reuse_service_token=True,
)

# Strict caller-token forwarding (enforced auth header precedence)
response = await flow.services.post(
    "/process",
    body={"data": payload},
    service_id="downstream-service-uuid",
    auth="caller_service_token",
)

Caller Token Precedence

With auth="caller_service_token", runtime inbound auth headers are enforced and caller-supplied headers cannot override them.

Convenience Methods

Method                                         Description
call(method, path, body, service_id, version)  Generic HTTP call
get(path, **kwargs)                            GET request
post(path, **kwargs)                           POST request
put(path, **kwargs)                            PUT request
delete(path, **kwargs)                         DELETE request
skills(service_id, version)                    Fetch HSL skills artifact

Service Tokens

Mint and manage HSL service tokens from runtime code:

minted = await flow.service_tokens.mint(
    {"tenant_id": "t-123", "role": "reader"},
    ttl_minutes=60,
    service_id="target-service-uuid",
)

Canonical signature:

  • flow.service_tokens.mint(claims, *, ttl_minutes=60, service_id=None)

Connectors

Execute operations against connector instances.

# Execute a read operation
result = await flow.connectors.execute_operation(
    connector_instance_id="instance-uuid",
    operation="read_query",
    params={"query": "SELECT * FROM users LIMIT 10"},
)

# Execute DDL (preview mode)
preview = await flow.connectors.execute_ddl(
    connector_instance_id="instance-uuid",
    sql="ALTER TABLE users ADD COLUMN active BOOLEAN DEFAULT true",
    confirm=False,  # preview only -- set True to execute
)

Large Writes

Write operations with more than 100 rows or 256 KB of data are automatically routed through the async write job system. The SDK handles submission, polling, and result retrieval transparently.
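
To estimate in advance whether a write will take the async path, the two thresholds can be checked locally. This is a rough sketch, not the SDK's own logic: the helper name is hypothetical, 256 KB is taken to mean 256 * 1024 bytes, and the payload size is approximated by its UTF-8 JSON encoding, which may differ from how the platform measures it.

```python
import json
from typing import Any, Sequence

MAX_INLINE_ROWS = 100          # row threshold stated above
MAX_INLINE_BYTES = 256 * 1024  # assumes 1 KB = 1024 bytes


def exceeds_inline_write_limit(rows: Sequence[dict[str, Any]]) -> bool:
    """Return True if a write is over either documented threshold."""
    if len(rows) > MAX_INLINE_ROWS:
        return True
    # Assumption: size is approximated by the UTF-8 JSON encoding of the rows.
    payload_bytes = len(json.dumps(rows).encode("utf-8"))
    return payload_bytes > MAX_INLINE_BYTES
```

A write of exactly 100 small rows stays under the limit, while 101 rows or a single row carrying a 256 KB string crosses it.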


State

Persistent key-value state scoped to the running component.

# Store a value
await flow.state.set("last_sync_cursor", "2025-01-15T10:30:00Z")

# Retrieve a value
cursor = await flow.state.get("last_sync_cursor")

# Delete a value
await flow.state.delete("last_sync_cursor")

# Initialize state from component spec defaults
await flow.state.initialize()

Runtime Binding Required

flow.state is runtime-only and fail-closed. It requires a bound hosted-service request context with service.component_id and valid runtime auth/platform context.
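
A common use of this kind of state is an incremental sync that remembers where it left off. The sketch below illustrates the pattern with the get and set calls shown above. It assumes that get returns None for a key that has never been set, which the text does not confirm. The function sync_incrementally and the fetch_since callback are hypothetical; state can be flow.state or any object with the same two methods.

```python
from typing import Any, Awaitable, Callable

CURSOR_KEY = "last_sync_cursor"


async def sync_incrementally(
    state: Any,
    fetch_since: Callable[[Any], Awaitable[tuple[list[Any], Any]]],
) -> list[Any]:
    """Fetch records newer than the stored cursor, then advance the cursor.

    fetch_since(cursor) must return (records, next_cursor).
    """
    cursor = await state.get(CURSOR_KEY)  # assumed to be None when unset
    records, next_cursor = await fetch_since(cursor)
    if next_cursor is not None and next_cursor != cursor:
        # Persist the new cursor only after the fetch has succeeded.
        await state.set(CURSOR_KEY, next_cursor)
    return records
```

Writing the cursor after the fetch means a failed fetch leaves the old cursor in place, so the next run retries the same window.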


Secrets

Access secrets stored in the platform's secret manager.

api_key = await flow.secrets.get("OPENAI_API_KEY")

Read-Only at Runtime

flow.secrets is read-only in runtime code. Create and update secrets through the platform management APIs or CLI.

Breaking Changes

  • flow.service_tokens.mint(api_key, claims, ...) was removed.
    Use flow.service_tokens.mint(claims, *, ttl_minutes=..., service_id=...).

Storage

Upload and download objects from platform storage.

# Upload a file
await flow.storage.upload(
    key="reports/q4-summary.pdf",
    data=pdf_bytes,
    content_type="application/pdf",
)

# Download a file
content = await flow.storage.download("reports/q4-summary.pdf")

# Generate pre-signed URLs
put_url = await flow.storage.presign_put("uploads/image.png", ttl_seconds=600)
get_url = await flow.storage.presign_get("uploads/image.png", ttl_seconds=3600)

Vector

Create and query vector indexes directly.

# Create an index
index = await flow.vector.create_index(
    name="product-embeddings",
    dimensions=1536,
    metric="cosine",
)

# Upsert vectors
await flow.vector.upsert(
    index_id=index["id"],
    vectors=[
        {"id": "doc-1", "values": embedding_1, "metadata": {"title": "Doc 1"}},
        {"id": "doc-2", "values": embedding_2, "metadata": {"title": "Doc 2"}},
    ],
)

# Search
results = await flow.vector.search(
    index_id=index["id"],
    query_embedding=query_vector,
    top_k=5,
)
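
For reference, the cosine metric compares the direction of two vectors and ignores their magnitude. The function below is the standard definition in plain Python, offered only as an illustration; whether the platform reports a similarity or a distance (for example, 1 minus the similarity) is not stated here and should not be inferred from this sketch.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return dot(a, b) / (|a| * |b|), a value between -1 and 1."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same number of dimensions")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)
```

Vectors pointing the same way score 1, orthogonal vectors score 0, and opposite vectors score -1, regardless of length.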

Memory

Semantic memory with automatic embedding -- store and recall content without managing vectors directly.

# Store a memory
await flow.memory.store(
    content="The customer prefers email communication.",
    metadata={"customer_id": "cust-123"},
    namespace="customer-prefs",
)

# Recall relevant memories
memories = await flow.memory.recall(
    query="How does the customer prefer to be contacted?",
    top_k=3,
    namespace="customer-prefs",
)

# Forget specific memories
await flow.memory.forget(
    vector_ids=["mem-uuid-1", "mem-uuid-2"],
    namespace="customer-prefs",
)

Auth

Access the caller's identity context inside a hosted service.

# Get the caller's subject ID
subject = flow.auth.subject_id

# Check scopes
if "admin" in flow.auth.scopes:
    # perform privileged operation
    ...

# Access custom claims from service tokens
tenant_id = flow.auth.custom_claims.get("tenant_id")
# or use the shorthand:
tenant_id = flow.auth.claim("tenant_id")
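
When an endpoint needs several scopes at once, the membership check can be wrapped in a small guard. This is a sketch only: require_scopes and MissingScopeError are hypothetical names, not part of FlowSDK, and the auth argument is any object exposing a scopes collection as flow.auth does above.

```python
from typing import Any, Iterable


class MissingScopeError(PermissionError):
    """Raised by this sketch when the caller lacks a required scope."""


def require_scopes(auth: Any, required: Iterable[str]) -> None:
    """Raise MissingScopeError unless every required scope is granted."""
    granted = set(auth.scopes or [])
    missing = sorted(set(required) - granted)
    if missing:
        raise MissingScopeError(f"missing scopes: {', '.join(missing)}")
```

A handler would call `require_scopes(flow.auth, ["read", "write"])` before doing privileged work and let the exception map to a 403 response.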

Service Auth

For more advanced service-to-service authentication patterns (token reuse, header forwarding), see flow.service.auth:

# Get the inbound service token
token = flow.service.auth.token(required=True)

# Build headers for downstream service calls
headers = flow.service.auth.reuse(required=True)

Conversations

List and retrieve agent conversations stored by the platform.

# List conversations in the current workspace
convos = await flow.conversations.list(limit=20, offset=0)

# Retrieve a specific conversation
convo = await flow.conversations.get("conversation-uuid")

Audit

Write audit log entries for compliance tracking.

await flow.audit.log(
    action="document.reviewed",
    resource_type="document",
    resource_id="doc-uuid-here",
    details={"reviewer": flow.auth.subject_id, "decision": "approved"},
)

Async Jobs

Submit long-running work to the async job system.

# Submit an async job
job = await flow.jobs.submit(
    endpoint_path="/heavy-compute",
    body={"dataset_id": "large-dataset"},
    lane="standard",
)
job_id = job["job_id"]

# Check status
status = await flow.jobs.get_status(job_id)

# Cancel if needed
await flow.jobs.cancel(job_id)
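
No blocking wait for jobs is shown above, so a caller that needs the final state has to poll get_status. The loop below is a minimal sketch under loud assumptions: the terminal status names are hypothetical placeholders, and get_status is assumed to return a dict with a "status" key, mirroring the agents example earlier. The helper wait_for_job is not part of FlowSDK.

```python
import asyncio
import time
from typing import Any

# Assumption: placeholder names; substitute the platform's real terminal statuses.
TERMINAL_STATUSES = frozenset({"completed", "failed", "cancelled"})


async def wait_for_job(
    jobs: Any,
    job_id: str,
    *,
    poll_seconds: float = 2.0,
    timeout_seconds: float = 300.0,
    terminal: frozenset = TERMINAL_STATUSES,
) -> dict:
    """Poll jobs.get_status until a terminal status or the timeout is reached."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = await jobs.get_status(job_id)
        if status.get("status") in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} not finished after {timeout_seconds}s")
        await asyncio.sleep(poll_seconds)
```

On timeout the job keeps running; call flow.jobs.cancel(job_id) in the exception handler if it should be stopped.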

Error Handling

All FlowSDK operations raise FlowSDKError or its subclasses for SDK-level issues. Underlying platform and gateway errors propagate from PlatformClientError and GatewayClientError hierarchies.

from flow_sdk import FlowSDK, FlowSDKError
from flow_sdk.platform_client import PlatformNotFoundError, PlatformAuthenticationError
from flow_sdk.gateway_client import GatewayRateLimitError

flow = FlowSDK()

try:
    result = await flow.code_blocks.run(
        code_block_id="nonexistent-uuid",
        input_parameters={},
    )
except PlatformNotFoundError:
    print("Code block not found")
except PlatformAuthenticationError:
    print("Authentication expired -- token refresh needed")
except GatewayRateLimitError:
    print("Rate limited -- back off and retry")
except FlowSDKError as e:
    print(f"SDK error: {e}")

Error Hierarchy

FlowSDKError
PlatformClientError
  +-- PlatformAuthenticationError  (401)
  +-- PlatformPermissionError      (403)
  +-- PlatformNotFoundError        (404)
  +-- PlatformExecutionError       (500)
GatewayClientError
  +-- GatewayAuthError             (401/403)
  +-- GatewayModelNotFoundError    (404)
  +-- GatewayRateLimitError        (429)
  +-- GatewayProviderError         (502)
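
The 429 and 502 cases are usually transient, so a retry with exponential backoff is a natural companion to this hierarchy. The helper below is a generic sketch, not an SDK feature: retry_with_backoff and its parameters are hypothetical, and the exception classes to retry are passed in by the caller rather than imported here.

```python
import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    retry_on: tuple[type[BaseException], ...],
    *,
    max_attempts: int = 4,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Await operation(), retrying on the given exceptions with jittered backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            # Delay ceiling doubles each attempt, capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, delay))  # full jitter
    raise RuntimeError("max_attempts must be at least 1")
```

A call might look like `await retry_with_backoff(lambda: flow.gateway.chat_completion(...), (GatewayRateLimitError,))`. Errors outside retry_on, such as authentication failures, propagate immediately.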

Request Context Binding

In hosted services that handle concurrent requests, bind per-request context so that flow.auth and execution metadata are correct for each request:

token = flow.bind_request_context(
    auth={
        "subject_id": "user-123",
        "scopes": ["read", "write"],
        "token_type": "user",
    },
    execution={
        "execution_id": "exec-uuid",
        "request_id": "req-uuid",
    },
)

try:
    # flow.auth.subject_id is now "user-123" in this async context
    await handle_request()
finally:
    flow.reset_request_context(token)

Framework Integration

The platform runtime typically calls bind_request_context and reset_request_context automatically for each inbound request. You only need to call these manually if you are building custom request handling logic.
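
For custom request handling, the bind and reset pair can be wrapped in an async context manager so the reset cannot be forgotten. This is a sketch built only on the two calls shown above; the wrapper name request_context is hypothetical, and flow is any object that provides bind_request_context and reset_request_context.

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


@asynccontextmanager
async def request_context(
    flow: Any, auth: dict, execution: dict
) -> AsyncIterator[None]:
    """Bind per-request context for the duration of an `async with` block."""
    token = flow.bind_request_context(auth=auth, execution=execution)
    try:
        yield
    finally:
        # Always restore the previous context, even if the handler raised.
        flow.reset_request_context(token)
```

A handler body then becomes `async with request_context(flow, auth, execution): await handle_request()`.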