PlatformClient
flow_sdk.platform_client.PlatformClient(base_url=None, auth_token=None, org_id=None, workspace_id=None, timeout=None, on_auth_token_refresh=None)
Client for calling platform APIs from workflow containers.
This client provides methods for executing components (codeblocks, agents, transforms) via platform APIs. All requests are authenticated with JWT tokens and all component executions run in isolated containers for security.
Attributes:
| Name | Type | Description |
|---|---|---|
| `base_url` | | Platform API base URL (e.g., "https://api.flow.com") |
| `auth_token` | | JWT authentication token |
| `client` | | Async HTTP client instance |
Initialize platform client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `base_url` | `str \| None` | Platform API base URL (defaults to the `PLATFORM_API_URL` env var) | `None` |
| `auth_token` | `str \| None` | JWT token (defaults to the `AUTH_TOKEN` env var) | `None` |
| `org_id` | `str \| None` | Organization ID (defaults to the `ORG_ID` env var) | `None` |
| `workspace_id` | `str \| None` | Workspace ID (defaults to the `WORKSPACE_ID` env var) | `None` |
| `timeout` | `float \| None` | Request timeout in seconds (default: env override or 300) | `None` |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If `base_url` or `auth_token` is not provided and is not set in the environment |
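The fallback order described in the tables above (explicit argument first, then the environment variable, then a `ValueError` for the two required settings) could look roughly like the sketch below. This illustrates the documented behavior and is not the SDK's source: `resolve_setting` and `resolve_client_settings` are hypothetical helper names, and only the environment variable names come from the parameter table.

```python
import os


def resolve_setting(explicit, env_var, required=False):
    """Return the explicit value, else the environment variable, else None.

    Hypothetical helper: mirrors the documented fallback, not the SDK's code.
    """
    value = explicit if explicit is not None else os.environ.get(env_var)
    if required and not value:
        raise ValueError(f"{env_var} not provided and not set in the environment")
    return value


def resolve_client_settings(base_url=None, auth_token=None, org_id=None, workspace_id=None):
    """Resolve constructor arguments the way the parameter table describes."""
    return {
        # base_url and auth_token are the two settings the Raises table covers.
        "base_url": resolve_setting(base_url, "PLATFORM_API_URL", required=True),
        "auth_token": resolve_setting(auth_token, "AUTH_TOKEN", required=True),
        "org_id": resolve_setting(org_id, "ORG_ID"),
        "workspace_id": resolve_setting(workspace_id, "WORKSPACE_ID"),
    }
```

Passing a value explicitly always wins over the environment in this sketch, which keeps tests independent of whatever the container exports.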
Source code in flow_sdk/platform_client.py
__aenter__()
async
__aexit__(exc_type, exc_val, exc_tb)
async
close()
async
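The presence of `__aenter__`, `__aexit__`, and `close()` suggests the client is meant to be used as an async context manager. The sketch below uses a stand-in class (`StubClient`, hypothetical) rather than the real SDK to show the pattern. It assumes, without confirmation from the source, that `__aexit__` calls `close()`.

```python
import asyncio


class StubClient:
    """Stand-in for PlatformClient; only the context-manager shape is modeled."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Assumption: leaving the block releases resources via close().
        await self.close()

    async def close(self):
        self.closed = True


async def main():
    async with StubClient() as client:
        assert not client.closed  # usable inside the block
    return client.closed  # True once the block has exited
```

With the real client the same shape would presumably read `async with PlatformClient() as client:`, with `close()` available for code that cannot use `async with`.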
get_service_skills(service_id, *, version=None)
async
Fetch generated HSL skills artifact for a service/version.
execute_code_block(code_block_id, input_parameters, context, scope='workspace', lifecycle_event='workflow', timeout_seconds=300, caller_deployment_id=None, ring_id=None)
async
Execute code block synchronously via durable execution APIs.
submit_code_block_execution(code_block_id, input_parameters, config=None, lifecycle_event='workflow', ring_id=None, timeout_seconds=300)
async
Submit a durable codeblock execution and return its handle.
execute_agent(agent_id, input_data, context, execution_mode='inline', caller_deployment_id=None, ring_id=None, timeout_seconds=300)
async
Execute agent via platform API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `agent_id` | `UUID` | UUID of agent to execute | required |
| `input_data` | `dict[str, Any]` | Input data for agent | required |
| `context` | `dict[str, Any]` | Execution context | required |
| `execution_mode` | `str` | Execution mode (inline, isolated) | `'inline'` |
| `caller_deployment_id` | `str \| UUID \| None` | Deployment ID of calling component (for ring inheritance) | `None` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Agent execution result with output, usage, and messages |

Raises:
| Type | Description |
|---|---|
| `PlatformClientError` | If execution fails |
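A call to `execute_agent` with error handling might be shaped like the sketch below. Because the SDK is not imported here, `StubClient` and the local `PlatformClientError` are hypothetical stand-ins that copy the documented signature and result keys. The `{"question": ...}` input, the context contents, and the stub's mode check are made up for illustration.

```python
import asyncio
from typing import Any
from uuid import UUID, uuid4


class PlatformClientError(Exception):
    """Local stand-in for the SDK exception of the same name."""


class StubClient:
    """Hypothetical stand-in that mimics the documented execute_agent signature."""

    async def execute_agent(self, agent_id: UUID, input_data: dict[str, Any],
                            context: dict[str, Any], execution_mode: str = "inline",
                            caller_deployment_id=None) -> dict[str, Any]:
        # Assumption: only the two documented modes are accepted.
        if execution_mode not in ("inline", "isolated"):
            raise PlatformClientError(f"unsupported execution_mode: {execution_mode}")
        # Result keys follow the Returns table; the values are invented.
        return {"output": {"echo": input_data}, "usage": {}, "messages": []}


async def run_agent(client, agent_id: UUID, question: str):
    """Call execute_agent and map a failure to None instead of raising."""
    try:
        result = await client.execute_agent(
            agent_id, {"question": question}, context={"source": "example"}
        )
    except PlatformClientError:
        return None
    return result["output"]
```

Whether to swallow `PlatformClientError` or let it propagate is a design choice for the caller; returning `None` here only keeps the example short.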
execute_transform(transform_id, input_data, context, caller_deployment_id=None)
async
Execute transform via platform API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `transform_id` | `UUID` | UUID of transform to execute | required |
| `input_data` | `Any` | Input data for transform | required |
| `context` | `dict[str, Any]` | Execution context | required |
| `caller_deployment_id` | `str \| UUID \| None` | Deployment ID of calling component (for ring inheritance) | `None` |

Returns:
| Type | Description |
|---|---|
| `Any` | Transform execution result with transformed output |

Raises:
| Type | Description |
|---|---|
| `PlatformClientError` | If execution fails |
execute_workflow(workflow_id, input_data, context=None, config=None, auto_deploy=True, lifecycle_event='workflow', caller_deployment_id=None, ring_id=None, timeout_seconds=None)
async
Execute workflow via platform API.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workflow_id` | `UUID` | UUID of workflow to execute | required |
| `input_data` | `dict[str, Any]` | Input data for workflow | required |
| `context` | `dict[str, Any] \| None` | Optional execution context for traceability | `None` |
| `config` | `dict[str, Any] \| None` | Optional configuration overrides | `None` |
| `auto_deploy` | `bool` | Whether to auto-deploy workflow if not deployed | `True` |
| `lifecycle_event` | `str` | Lifecycle event (test, workflow, agent_run, mcp_session) | `'workflow'` |
| `caller_deployment_id` | `str \| UUID \| None` | Deployment ID of calling component (for ring inheritance) | `None` |
| `timeout_seconds` | `int \| None` | Optional workflow execution timeout override | `None` |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Workflow execution result with outputs, metrics, and errors |

Raises:
| Type | Description |
|---|---|
| `PlatformClientError` | If execution fails |
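Since `lifecycle_event` is a plain string with four documented values, a caller may want to validate it before sending the request. The helper below is a hypothetical sketch (`workflow_call_kwargs` is not part of the SDK) that assembles keyword arguments matching the documented signature and rejects unknown lifecycle events. Whether the platform itself rejects other values is not stated in the source.

```python
from typing import Any, Optional
from uuid import UUID, uuid4

# Values listed in the lifecycle_event row of the table above.
LIFECYCLE_EVENTS = ("test", "workflow", "agent_run", "mcp_session")


def workflow_call_kwargs(workflow_id: UUID, input_data: dict[str, Any], *,
                         lifecycle_event: str = "workflow",
                         auto_deploy: bool = True,
                         timeout_seconds: Optional[int] = None) -> dict[str, Any]:
    """Assemble keyword arguments for execute_workflow (hypothetical helper)."""
    if lifecycle_event not in LIFECYCLE_EVENTS:
        raise ValueError(f"unknown lifecycle_event: {lifecycle_event!r}")
    kwargs: dict[str, Any] = {
        "workflow_id": workflow_id,
        "input_data": input_data,
        "auto_deploy": auto_deploy,
        "lifecycle_event": lifecycle_event,
    }
    # Leave timeout_seconds out unless the caller overrides it.
    if timeout_seconds is not None:
        kwargs["timeout_seconds"] = timeout_seconds
    return kwargs
```

The result could then be passed along as `await client.execute_workflow(**workflow_call_kwargs(...))`.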
submit_workflow_execution(workflow_id, input_data, config=None, lifecycle_event='workflow', ring_id=None, timeout_seconds=300)
async
Submit a durable workflow execution and return its handle.
submit_agent_execution(agent_id, input_data, config=None, lifecycle_event='agent_run', ring_id=None, timeout_seconds=300)
async
Submit a durable isolated-agent execution and return its handle.
get_execution_status(execution_id)
async
Fetch durable execution status.
cancel_execution(execution_id)
async
Request best-effort cancellation of an execution.
await_execution_result(execution_id, *, timeout_seconds=300, poll_interval_seconds=0.5, not_found_retry_seconds=30.0)
async
Poll durable execution state until terminal.
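A polling loop with this signature could be structured as in the sketch below. It is an illustration under stated assumptions, not the SDK's implementation: the terminal status names, the `"status"` key, the `ExecutionNotFound` exception, and the reading of `not_found_retry_seconds` as a grace period for executions that are not yet visible are all guesses.

```python
import asyncio
import time

TERMINAL_STATES = {"completed", "failed", "cancelled"}  # assumed status names


class ExecutionNotFound(Exception):
    """Hypothetical error for a status lookup that finds no execution yet."""


async def poll_until_terminal(get_status, execution_id, *, timeout_seconds=300,
                              poll_interval_seconds=0.5, not_found_retry_seconds=30.0):
    """Poll get_status(execution_id) until it reports a terminal state."""
    start = time.monotonic()
    while True:
        elapsed = time.monotonic() - start
        try:
            status = await get_status(execution_id)
        except ExecutionNotFound:
            # Tolerate "not found" for a grace period; the record may lag the submit.
            if elapsed >= not_found_retry_seconds:
                raise
        else:
            if status["status"] in TERMINAL_STATES:
                return status
        if elapsed >= timeout_seconds:
            raise TimeoutError(
                f"execution {execution_id} not terminal after {timeout_seconds}s"
            )
        await asyncio.sleep(poll_interval_seconds)
```

Taking the status fetcher as a parameter keeps the loop testable without a live platform; in practice it would presumably be `client.get_execution_status`.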
fetch_execution_input(deployment_id, execution_id)
async
Fetch input data for pull-based execution via NATS.
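Subscribes to `exec.input.{execution_id}` on a WorkQueue stream with `DeliverPolicy.ALL`. The platform publishes the input before the container starts, so the consumer must receive messages that were already in the stream when it subscribed.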
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `deployment_id` | `str \| UUID \| None` | Deployment ID | required |
| `execution_id` | `str \| UUID` | Execution ID | required |

Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | Dict with input_data, session_id, messages, state |
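The subject name and the shape of the returned dict can be handled with two small helpers. Both are hypothetical (`input_subject` and `unpack_execution_input` are not SDK functions). The subject pattern and the four key names come from the text above, while the empty-list and empty-dict defaults for missing `messages` and `state` are assumptions.

```python
from uuid import UUID


def input_subject(execution_id) -> str:
    """Build the NATS subject named in the description above."""
    # str() of a UUID yields the lowercase hyphenated form.
    return f"exec.input.{execution_id}"


def unpack_execution_input(payload: dict):
    """Split the fetched dict into its four documented keys, tolerating gaps."""
    return (
        payload.get("input_data"),
        payload.get("session_id"),
        payload.get("messages") or [],  # assumed default
        payload.get("state") or {},     # assumed default
    )
```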
push_execution_output(deployment_id, execution_id, output, usage=None, messages=None, state=None, error=None)
async
Push execution output back to platform via NATS.
Publishes to exec.output.{execution_id}. The platform subscribes to this subject and deserializes with NATSExecutionOutput.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `deployment_id` | `str \| UUID \| None` | Deployment ID | required |
| `execution_id` | `str \| UUID` | Execution ID | required |
| `output` | `Any` | Execution output data | required |
| `usage` | `dict[str, Any] \| None` | Token usage stats (optional) | `None` |
| `messages` | `list[dict] \| None` | New conversation messages (optional) | `None` |
| `state` | `dict[str, Any] \| None` | Updated session state (optional) | `None` |
| `error` | `str \| None` | Error message if failed (optional) | `None` |
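To make the publish step concrete, the sketch below builds a subject and a JSON payload from the documented arguments. The field names of `NATSExecutionOutput` are not given in the source, so this assumes they mirror the parameter names. It also assumes JSON encoding and that unset optional fields are omitted. `build_output_message` is a hypothetical helper.

```python
import json


def build_output_message(execution_id, output, usage=None, messages=None,
                         state=None, error=None):
    """Return (subject, payload bytes) for an execution output message."""
    subject = f"exec.output.{execution_id}"
    body = {"output": output}  # output is required, so it is always present
    optional = {"usage": usage, "messages": messages, "state": state, "error": error}
    # Assumption: optional fields left as None are dropped from the payload.
    body.update({key: value for key, value in optional.items() if value is not None})
    return subject, json.dumps(body).encode("utf-8")
```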
push_execution_event(deployment_id, execution_id, event_type, event_data)
async
Push streaming event during execution via NATS.
Publishes to exec.events.{execution_id}. The platform subscribes to this subject and deserializes with NATSExecutionEvent.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `deployment_id` | `str \| UUID \| None` | Deployment ID | required |
| `execution_id` | `str \| UUID` | Execution ID | required |
| `event_type` | `str` | Event type (message, chunk, tool_call, done, error) | required |
| `event_data` | `dict[str, Any]` | Event payload | required |
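Because `event_type` is an unconstrained string, a typo would only surface on the platform side. A caller could guard against that with a check like the one below. `build_event` is a hypothetical helper, the five event types and the subject pattern come from the text above, and the `event_type`/`event_data` envelope is an assumed stand-in for the undocumented `NATSExecutionEvent` schema.

```python
EVENT_TYPES = ("message", "chunk", "tool_call", "done", "error")


def build_event(execution_id, event_type, event_data):
    """Validate the event type and return (subject, envelope)."""
    if event_type not in EVENT_TYPES:
        raise ValueError(f"unknown event_type: {event_type!r}")
    subject = f"exec.events.{execution_id}"
    # Assumed envelope; the real NATSExecutionEvent fields may differ.
    envelope = {"event_type": event_type, "event_data": dict(event_data)}
    return subject, envelope
```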
publish_execution_status(execution_id, status)
async
Publish execution status via NATS.