PlatformClient

Coming Soon — Standalone Usage

Standalone PlatformClient usage (outside the platform) is not yet publicly available; it is planned for an upcoming release. For code running inside platform runtimes, use FlowSDK.

PlatformClient is a low-level async HTTP client for making authenticated requests directly to the Manifest Platform API. It handles JWT authentication, error mapping, and execution orchestration.

Advanced Use Only

Most users should use FlowSDK (inside services) or CLIClient (from scripts). PlatformClient is intended for advanced scenarios where you need direct API access or are building custom platform integrations.

When to Use PlatformClient

| Scenario | Recommended Client |
| --- | --- |
| Code inside a hosted service | FlowSDK |
| Invoking services from scripts | CLIClient |
| AI model inference | GatewayClient |
| Custom API endpoints not covered by FlowSDK | PlatformClient |
| Building custom execution runners | PlatformClient |
| Direct access to execution transport | PlatformClient |

Initialization

from flow_sdk import PlatformClient

# From environment variables (PLATFORM_API_URL, AUTH_TOKEN)
client = PlatformClient()

# Explicit configuration
client = PlatformClient(
    base_url="https://api.flow.marut.cloud",
    auth_token="your-jwt-token",
    timeout=300.0,
)

Constructor Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| base_url | str | PLATFORM_API_URL env var | Platform API base URL |
| auth_token | str | AUTH_TOKEN env var | JWT authentication token |
| timeout | float | 300.0 | Request timeout in seconds |
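
The fallback order in the table can be pictured with a minimal sketch. The `resolve_settings` helper below is hypothetical and not part of flow_sdk; it only illustrates explicit arguments taking precedence over the PLATFORM_API_URL and AUTH_TOKEN environment variables. Raising when a value is missing is an assumption, since the behavior of the real constructor in that case is not documented here.

```python
import os
from typing import Any, Dict, Mapping, Optional


def resolve_settings(
    base_url: Optional[str] = None,
    auth_token: Optional[str] = None,
    timeout: float = 300.0,
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: explicit arguments win, env vars are the fallback."""
    env = os.environ if env is None else env
    resolved_url = base_url or env.get("PLATFORM_API_URL")
    resolved_token = auth_token or env.get("AUTH_TOKEN")
    if not resolved_url or not resolved_token:
        # Assumption: the real client may handle missing values differently.
        raise ValueError("base_url and auth_token must be passed or set in the environment")
    return {"base_url": resolved_url, "auth_token": resolved_token, "timeout": timeout}
```

Passing `env` as a plain dict keeps the helper easy to exercise without touching the process environment.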

Context Manager

async with PlatformClient() as client:
    result = await client._request("GET", "/api/v1/tenants/orgs")
# Connections are automatically closed

Making Authenticated Requests

The _request method is the foundation for all platform API calls. It adds authentication headers, handles error mapping, and parses responses:

# GET request
orgs = await client._request("GET", "/api/v1/tenants/orgs")

# POST with JSON body
result = await client._request(
    "POST",
    # f-string so that org_id, ws_id, and dataset_id are interpolated into the path
    f"/api/v1/orgs/{org_id}/workspaces/{ws_id}/datasets/{dataset_id}/query",
    json={"fields": ["name", "email"], "limit": 50},
)

# GET with query parameters
result = await client._request(
    "GET",
    "/api/v1/gateway/v1/models",
    params={"scope": "organization"},
)

# PUT with raw bytes
await client._request(
    "PUT",
    f"/api/v1/orgs/{org_id}/storage/objects/my-file.pdf",
    content=file_bytes,
    headers={"Content-Type": "application/pdf"},
)

_request() Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| method | str | HTTP method (GET, POST, PUT, DELETE) |
| path | str | API path (e.g., /api/v1/tenants/orgs) |
| params | dict \| None | URL query parameters |
| json | Any \| None | JSON request body (auto-serialized) |
| content | bytes \| None | Raw request body |
| headers | dict \| None | Additional HTTP headers |

The method returns parsed JSON for application/json responses, or raw bytes otherwise.
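
Because the return type depends on the response's content type, callers may want to branch on it. The sketch below mirrors the documented rule with a hypothetical `parse_body` helper; it is not the SDK's own code, and ignoring media-type parameters such as `charset` is an assumption.

```python
import json
from typing import Any


def parse_body(content_type: str, body: bytes) -> Any:
    """Hypothetical helper: parsed JSON for application/json, raw bytes otherwise."""
    # Drop parameters such as "; charset=utf-8" before comparing media types.
    media_type = content_type.split(";", 1)[0].strip().lower()
    if media_type == "application/json":
        return json.loads(body)
    return body
```

In calling code, an `isinstance(result, bytes)` check is one way to tell a binary download apart from a JSON payload.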


Component Execution

PlatformClient provides the execution primitives that FlowSDK's namespaces build upon.

Code Blocks

from uuid import UUID

# Synchronous execution (submit + poll)
result = await client.execute_code_block(
    code_block_id=UUID("..."),
    input_parameters={"text": "Process this."},
    context={"org_id": org_id, "workspace_id": ws_id},
    ring_id="ring-uuid",
    timeout_seconds=120,
)

Agents

# Isolated agent execution
result = await client.execute_agent(
    agent_id=UUID("..."),
    input_data={"question": "What are the trends?"},
    context={},
    execution_mode="isolated",
    ring_id="ring-uuid",
)

Workflows

result = await client.execute_workflow(
    workflow_id=UUID("..."),
    input_data={"document_url": "https://example.com/report.pdf"},
    ring_id="ring-uuid",
    timeout_seconds=300,
)

Durable Execution API

For finer control over execution lifecycle, use the submit/poll/cancel primitives directly:

# 1. Submit an execution
handle = await client.submit_code_block_execution(
    code_block_id=UUID("..."),
    input_parameters={"data": "value"},
    ring_id="ring-uuid",
    timeout_seconds=300,
)
execution_id = handle["execution_id"]

# 2. Poll for status
status = await client.get_execution_status(execution_id)
print(f"Status: {status['status']}")

# 3. Wait for terminal state
result = await client.await_execution_result(
    execution_id,
    timeout_seconds=300,
    poll_interval_seconds=1.0,
)

# 4. Cancel if needed
await client.cancel_execution(execution_id)
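
One way to combine these primitives is to cancel an execution when the caller stops waiting for it. The sketch below uses only the methods shown above; the `run_with_deadline` name is hypothetical, and wrapping the wait in `asyncio.wait_for` is a choice made here because the exception that `await_execution_result` raises on timeout is not documented in this section.

```python
import asyncio
from typing import Any, Dict


async def run_with_deadline(
    client: Any,
    code_block_id: Any,
    input_parameters: Dict[str, Any],
    ring_id: str,
    timeout_seconds: float,
) -> Any:
    """Submit a code block, wait for its result, and cancel it if the wait times out."""
    handle = await client.submit_code_block_execution(
        code_block_id=code_block_id,
        input_parameters=input_parameters,
        ring_id=ring_id,
        timeout_seconds=timeout_seconds,
    )
    execution_id = handle["execution_id"]
    try:
        # Local deadline: stop waiting even if the client-side poll would continue.
        return await asyncio.wait_for(
            client.await_execution_result(execution_id, timeout_seconds=timeout_seconds),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        # Ask the platform to stop the work so it does not keep running unobserved.
        await client.cancel_execution(execution_id)
        raise
```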

Execution Lifecycle

stateDiagram-v2
    [*] --> pending: submit
    pending --> running: worker picks up
    running --> completed: success
    running --> failed: error
    running --> timeout: deadline exceeded
    running --> cancelled: cancel requested
    pending --> cancelled: cancel requested
    completed --> [*]
    failed --> [*]
    timeout --> [*]
    cancelled --> [*]
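
The diagram implies that polling can stop as soon as one of four terminal states is reached. As a rough illustration, assuming the status payload is a dict with a `status` key as in the example above, a polling loop might look like the following. `wait_for_terminal` is a hypothetical helper, not the implementation of `await_execution_result`.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Terminal states taken from the state diagram.
TERMINAL_STATES = frozenset({"completed", "failed", "timeout", "cancelled"})


async def wait_for_terminal(
    get_status: Callable[[str], Awaitable[Dict[str, Any]]],
    execution_id: str,
    timeout_seconds: float = 300.0,
    poll_interval_seconds: float = 1.0,
) -> Dict[str, Any]:
    """Poll get_status until the execution reaches a terminal state."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while True:
        status = await get_status(execution_id)
        if status["status"] in TERMINAL_STATES:
            return status
        if loop.time() >= deadline:
            raise TimeoutError(f"execution {execution_id} is still {status['status']}")
        await asyncio.sleep(poll_interval_seconds)
```

With a real client, the call would be `await wait_for_terminal(client.get_execution_status, execution_id)`.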

HSL Skills

Fetch the runtime-neutral skills artifact for a hosted service:

skills = await client.get_service_skills(
    service_id="service-uuid",
    version=2,  # optional -- latest if omitted
)
print(skills["endpoints"])

Error Handling

PlatformClient maps HTTP status codes to typed exceptions:

from flow_sdk.platform_client import (
    PlatformClient,
    PlatformClientError,
    PlatformAuthenticationError,
    PlatformPermissionError,
    PlatformNotFoundError,
    PlatformExecutionError,
)

try:
    result = await client._request("GET", "/api/v1/some/endpoint")
except PlatformAuthenticationError:
    print("401 -- Token expired or invalid")
except PlatformPermissionError:
    print("403 -- Insufficient permissions")
except PlatformNotFoundError:
    print("404 -- Resource not found")
except PlatformExecutionError:
    print("500 -- Server-side execution failure")
except PlatformClientError as e:
    print(f"Other error: {e}")

Exception Reference

| Exception | HTTP Status | Description |
| --- | --- | --- |
| PlatformAuthenticationError | 401 | Invalid or expired JWT token |
| PlatformPermissionError | 403 | Token lacks required permissions |
| PlatformNotFoundError | 404 | Resource does not exist |
| PlatformExecutionError | 500 | Server-side execution failure |
| PlatformClientError | Other | Base class for all platform errors |
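
The table can be read as a lookup from status code to exception class, with the base class as the fallback. The sketch below uses local stand-in classes that borrow the SDK's names purely for illustration; the `error_for_status` helper and the treatment of every other status of 400 or above as the base class are assumptions, not the SDK's confirmed logic.

```python
from typing import Dict, Optional, Type


class PlatformClientError(Exception):
    """Local stand-in for the SDK base class."""


class PlatformAuthenticationError(PlatformClientError):
    pass


class PlatformPermissionError(PlatformClientError):
    pass


class PlatformNotFoundError(PlatformClientError):
    pass


class PlatformExecutionError(PlatformClientError):
    pass


STATUS_TO_ERROR: Dict[int, Type[PlatformClientError]] = {
    401: PlatformAuthenticationError,
    403: PlatformPermissionError,
    404: PlatformNotFoundError,
    500: PlatformExecutionError,
}


def error_for_status(status_code: int, message: str = "") -> Optional[PlatformClientError]:
    """Return the exception matching a status code, or None for non-error responses."""
    if status_code < 400:
        return None
    # Assumption: any unlisted error status falls back to the base class.
    error_cls = STATUS_TO_ERROR.get(status_code, PlatformClientError)
    return error_cls(message or f"HTTP {status_code}")
```

Since the specific exceptions derive from `PlatformClientError`, an `except PlatformClientError` clause has to come after the specific ones, as in the example above.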

Billing Context

PlatformClient automatically forwards billing context headers when present in the environment:

| Header | Environment Variable | Purpose |
| --- | --- | --- |
| X-Parent-Billing-Txn-Id | PARENT_BILLING_TXN_ID | Hierarchical cost roll-up for nested executions |
| X-Lifecycle-Event | LIFECYCLE_EVENT | Billing discount propagation (test, workflow, agent_run) |

These headers enable the platform to attribute costs accurately when components invoke other components.
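
For custom runners that need to reproduce this forwarding, the mapping can be sketched as below. The `billing_headers` helper is hypothetical; skipping variables that are unset or empty is an assumption based on the phrase "when present in the environment".

```python
import os
from typing import Dict, Mapping, Optional

# Header name -> environment variable, as listed in the table.
BILLING_HEADER_ENV = {
    "X-Parent-Billing-Txn-Id": "PARENT_BILLING_TXN_ID",
    "X-Lifecycle-Event": "LIFECYCLE_EVENT",
}


def billing_headers(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build billing headers from whichever variables are set and non-empty."""
    env = os.environ if env is None else env
    return {
        header: env[var]
        for header, var in BILLING_HEADER_ENV.items()
        if env.get(var)
    }
```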


Resource Cleanup

Always close the client to release connections:

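# Preferred: the context manager closes the client on exit
async with PlatformClient() as client:
    result = await client._request("GET", "/api/v1/tenants/orgs")

# Alternative: close the client explicitly in a finally block
client = PlatformClient()
try:
    result = await client._request("GET", "/api/v1/tenants/orgs")
finally:
    await client.close()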