Skip to content

PlatformClient

flow_sdk.platform_client.PlatformClient(base_url=None, auth_token=None, org_id=None, workspace_id=None, timeout=None, on_auth_token_refresh=None)

Client for calling platform APIs from workflow containers.

This client provides methods for executing components (codeblocks, agents, transforms) via platform APIs. All requests are authenticated with JWT tokens and all component executions run in isolated containers for security.

Attributes:

Name Type Description
base_url

Platform API base URL (e.g., "https://api.flow.com")

auth_token

JWT authentication token

client

Async HTTP client instance

Initialize platform client.

Parameters:

Name Type Description Default
base_url str | None

Platform API base URL (defaults to PLATFORM_API_URL env var)

None
auth_token str | None

JWT token (defaults to AUTH_TOKEN env var)

None
org_id str | None

Organization ID (defaults to ORG_ID env var)

None
workspace_id str | None

Workspace ID (defaults to WORKSPACE_ID env var)

None
timeout float | None

Request timeout in seconds (default: env override or 300)

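None
on_auth_token_refresh Callable[[str], Awaitable[None]] | None

Optional async callback that receives a string; it is stored on the client (presumably invoked with the new JWT when the auth token is refreshed)

None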

Raises:

Type Description
ValueError

If base_url or auth_token not provided and not in environment

Source code in flow_sdk/platform_client.py
def __init__(
    self,
    base_url: str | None = None,
    auth_token: str | None = None,
    org_id: str | None = None,
    workspace_id: str | None = None,
    timeout: float | None = None,
    on_auth_token_refresh: Callable[[str], Awaitable[None]] | None = None,
):
    """Initialize platform client.

    Args:
        base_url: Platform API base URL (defaults to PLATFORM_API_URL env var)
        auth_token: JWT token (defaults to AUTH_TOKEN env var)
        org_id: Organization ID (defaults to ORG_ID env var). Empty or
            whitespace-only values are treated as not provided.
        workspace_id: Workspace ID (defaults to WORKSPACE_ID env var). Empty or
            whitespace-only values are treated as not provided.
        timeout: Request timeout in seconds (default: env override or 300)
        on_auth_token_refresh: Optional async callback accepting a string. It
            is only stored here; presumably it is invoked with the new token
            when the auth token is refreshed (confirm against the refresh logic).

    Raises:
        ValueError: If base_url or auth_token not provided and not in environment
    """
    self.base_url = base_url or os.environ.get("PLATFORM_API_URL")
    if not self.base_url:
        raise ValueError(
            "Platform API URL not provided. "
            "Pass base_url parameter or set PLATFORM_API_URL environment variable."
        )

    self.auth_token = auth_token or os.environ.get("AUTH_TOKEN")
    if not self.auth_token:
        raise ValueError(
            "Authentication token not provided. "
            "Pass auth_token parameter or set AUTH_TOKEN environment variable."
        )

    # Get org/workspace context for pull-based execution endpoints.
    # Explicit parameters take precedence over environment variables.
    resolved_org = org_id if org_id is not None else os.environ.get("ORG_ID")
    resolved_workspace = (
        workspace_id if workspace_id is not None else os.environ.get("WORKSPACE_ID")
    )
    # Strip first, then collapse an empty result to None so that a
    # whitespace-only value cannot end up as "" inside request paths.
    org_text = str(resolved_org).strip() if resolved_org else ""
    workspace_text = str(resolved_workspace).strip() if resolved_workspace else ""
    self.org_id = org_text or None
    self.workspace_id = workspace_text or None
    self._on_auth_token_refresh = on_auth_token_refresh

    # Build headers with auth and optional billing context
    headers = {"Authorization": f"Bearer {self.auth_token}"}

    # Forward parent billing transaction ID for hierarchical cost roll-up
    parent_billing_txn_id = os.environ.get("PARENT_BILLING_TXN_ID")
    if parent_billing_txn_id:
        headers["X-Parent-Billing-Txn-Id"] = parent_billing_txn_id

    # Forward lifecycle event for billing discount propagation
    lifecycle_event = os.environ.get("LIFECYCLE_EVENT")
    if lifecycle_event:
        headers["X-Lifecycle-Event"] = lifecycle_event

    self.client = httpx.AsyncClient(
        base_url=self.base_url,
        headers=headers,
        timeout=self._resolve_timeout(timeout),
    )

    # NATS transport for execution I/O
    self._nats_url = os.environ.get("NATS_URL")
    self._execution_id = os.environ.get("EXECUTION_ID")
    self._nc = None  # Lazy NATS connection
    self._js = None  # Lazy JetStream context
    self._event_sequence = 0  # Monotonic counter for streaming events

    logfire.info(
        "PlatformClient initialized",
        base_url=self.base_url,
        org_id=self.org_id,
        workspace_id=self.workspace_id,
        nats_url=self._nats_url,
        timeout=self.client.timeout,
    )

__aenter__() async

Async context manager entry.

Source code in flow_sdk/platform_client.py
async def __aenter__(self):
    """Enter the async context manager and return this client unchanged."""
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit - closes HTTP and NATS clients.

Source code in flow_sdk/platform_client.py
async def __aexit__(self, exc_type, exc_val, exc_tb):
    """Exit the async context manager by awaiting ``close()``.

    The exception arguments are ignored and nothing is returned, so an
    exception raised inside the ``async with`` body is not suppressed.
    """
    await self.close()

close() async

Close the HTTP client and drain NATS connection.

Source code in flow_sdk/platform_client.py
async def close(self):
    """Close the HTTP client and drain the NATS connection.

    The NATS connection is drained even when closing the HTTP client raises,
    so a failure on one transport does not leave the other one open. An
    exception raised while closing the HTTP client still propagates after the
    drain attempt.
    """
    try:
        await self.client.aclose()
    finally:
        # The NATS connection may be unset or already closed; only drain a live one.
        if self._nc and not self._nc.is_closed:
            await self._nc.drain()

get_service_skills(service_id, *, version=None) async

Fetch generated HSL skills artifact for a service/version.

Source code in flow_sdk/platform_client.py
async def get_service_skills(
    self,
    service_id: UUID | str,
    *,
    version: int | None = None,
) -> dict[str, Any]:
    """Fetch generated HSL skills artifact for a service/version."""
    if not self.org_id:
        raise PlatformClientError("ORG_ID is required to fetch service skills")

    if version is None:
        path = f"/api/v1/orgs/{self.org_id}/services/{service_id}/skills.json"
    else:
        path = f"/api/v1/orgs/{self.org_id}/services/{service_id}/v{version}/skills.json"

    payload = await self._request("GET", path)
    if not isinstance(payload, dict):
        raise PlatformClientError("Unexpected non-JSON skills response")
    return payload

execute_code_block(code_block_id, input_parameters, context, scope='workspace', lifecycle_event='workflow', timeout_seconds=300, caller_deployment_id=None, ring_id=None) async

Execute code block synchronously via durable execution APIs.

Source code in flow_sdk/platform_client.py
async def execute_code_block(
    self,
    code_block_id: UUID,
    input_parameters: dict[str, Any],
    context: dict[str, Any],
    scope: str = "workspace",
    lifecycle_event: str = "workflow",
    timeout_seconds: int = 300,
    caller_deployment_id: str | UUID | None = None,
    ring_id: str | UUID | None = None,
) -> dict[str, Any]:
    """Execute code block synchronously via durable execution APIs.

    Submits the execution, then polls its status until it is terminal. If the
    wait budget expires first, a best-effort cancellation is requested before
    the timeout is reported.

    Args:
        code_block_id: UUID of the code block to execute.
        input_parameters: Input parameters sent with the submission.
        context: Forwarded as the execution ``config`` (``{}`` when falsy).
        scope: Only recorded in the log entry by this method.
        lifecycle_event: Lifecycle event forwarded with the submission.
        timeout_seconds: Sent as the execution timeout and also used as the
            polling wait budget.
        caller_deployment_id: Accepted but not used by this method.
        ring_id: Forwarded to the submission, which rejects a missing value.

    Returns:
        The result returned by ``await_execution_result`` once the execution
        reaches a terminal status.

    Raises:
        PlatformExecutionError: If the execution did not complete before the
            wait budget expired.
    """
    logfire.info(
        "Executing code block via platform API",
        code_block_id=str(code_block_id),
        scope=scope,
        lifecycle_event=lifecycle_event,
    )
    handle = await self.submit_code_block_execution(
        code_block_id=code_block_id,
        input_parameters=input_parameters,
        config=context or {},
        lifecycle_event=lifecycle_event,
        ring_id=ring_id,
        timeout_seconds=timeout_seconds,
    )
    execution_id = UUID(str(handle["execution_id"]))
    try:
        return await self.await_execution_result(
            execution_id,
            timeout_seconds=timeout_seconds,
        )
    except TimeoutError as exc:
        try:
            await self.cancel_execution(execution_id)
        except Exception as cancel_exc:
            # Cancellation stays best-effort, but a failure is logged instead
            # of vanishing so orphaned executions can be traced.
            logfire.warn(
                "Failed to cancel timed-out execution",
                execution_id=str(execution_id),
                error=str(cancel_exc),
            )
        raise PlatformExecutionError(
            f"Execution {execution_id} did not complete before the caller wait budget expired"
        ) from exc

submit_code_block_execution(code_block_id, input_parameters, config=None, lifecycle_event='workflow', ring_id=None, timeout_seconds=300) async

Submit a durable codeblock execution and return its handle.

Source code in flow_sdk/platform_client.py
async def submit_code_block_execution(
    self,
    code_block_id: UUID,
    input_parameters: dict[str, Any],
    config: dict[str, Any] | None = None,
    lifecycle_event: str = "workflow",
    ring_id: str | UUID | None = None,
    timeout_seconds: int = 300,
) -> dict[str, Any]:
    """Submit a durable codeblock execution and return its handle."""
    if not ring_id:
        raise PlatformClientError("ring_id is required for code block execution")
    response = await self._send_request(
        "POST",
        f"/api/v1/orgs/{self.org_id}/workspaces/{self.workspace_id}/code-blocks/{code_block_id}/executions",
        json={
            "input_parameters": input_parameters,
            "config": config or {},
            "lifecycle_event": lifecycle_event,
            "ring_id": str(ring_id),
            "timeout_seconds": timeout_seconds,
        },
        max_attempts=3,
        initial_delay_seconds=0.25,
    )

    if response.status_code != 202:
        self._handle_error_response(response)
    return response.json()

execute_agent(agent_id, input_data, context, execution_mode='inline', caller_deployment_id=None, ring_id=None, timeout_seconds=300) async

Execute agent via platform API.

Parameters:

Name Type Description Default
agent_id UUID

UUID of agent to execute

required
input_data dict[str, Any]

Input data for agent

required
context dict[str, Any]

Execution context

required
execution_mode str

Execution mode (inline, isolated)

'inline'
caller_deployment_id str | UUID | None

Deployment ID of calling component (for ring inheritance)

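None
ring_id str | UUID | None

Ring ID; required when execution_mode is 'isolated' (the isolated submission fails without it)

None
timeout_seconds int

Execution timeout and wait budget in seconds (used in isolated mode)

300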

Returns:

Type Description
dict[str, Any]

Agent execution result with output, usage, and messages

Raises:

Type Description
PlatformClientError

If execution fails

Source code in flow_sdk/platform_client.py
async def execute_agent(
    self,
    agent_id: UUID,
    input_data: dict[str, Any],
    context: dict[str, Any],
    execution_mode: str = "inline",
    caller_deployment_id: str | UUID | None = None,
    ring_id: str | UUID | None = None,
    timeout_seconds: int = 300,
) -> dict[str, Any]:
    """Execute agent via platform API.

    In ``isolated`` mode the agent is submitted as a durable execution and
    polled until terminal; if the wait budget expires, a best-effort
    cancellation is requested before the timeout is reported. Any other mode
    issues a single request to the component-execution endpoint.

    Args:
        agent_id: UUID of agent to execute
        input_data: Input data for agent
        context: Execution context; forwarded as ``config`` in isolated mode
            only (it is not part of the non-isolated request body)
        execution_mode: Execution mode (inline, isolated)
        caller_deployment_id: Deployment ID of calling component (for ring
            inheritance); only sent in the non-isolated request
        ring_id: Ring ID; only used in isolated mode, where the submission
            rejects a missing value
        timeout_seconds: Execution timeout and polling wait budget; only used
            in isolated mode

    Returns:
        Agent execution result with output, usage, and messages

    Raises:
        PlatformClientError: If execution fails
        PlatformExecutionError: If an isolated execution did not complete
            before the wait budget expired
    """
    logfire.info(
        "Executing agent via platform API",
        agent_id=str(agent_id),
        execution_mode=execution_mode,
    )

    if execution_mode == "isolated":
        handle = await self.submit_agent_execution(
            agent_id=agent_id,
            input_data=input_data,
            config=context or {},
            ring_id=ring_id,
            timeout_seconds=timeout_seconds,
        )
        execution_id = UUID(str(handle["execution_id"]))
        try:
            return await self.await_execution_result(
                execution_id,
                timeout_seconds=timeout_seconds,
            )
        except TimeoutError as exc:
            try:
                await self.cancel_execution(execution_id)
            except Exception as cancel_exc:
                # Cancellation stays best-effort, but a failure is logged
                # instead of vanishing so orphaned executions can be traced.
                logfire.warn(
                    "Failed to cancel timed-out execution",
                    execution_id=str(execution_id),
                    error=str(cancel_exc),
                )
            raise PlatformExecutionError(
                f"Execution {execution_id} did not complete before the caller wait budget expired"
            ) from exc

    response = await self._send_request(
        "POST",
        f"/api/v1/component-execution/agents/{agent_id}/execute",
        json={
            "input_data": input_data,
            "execution_mode": execution_mode,
            "caller_deployment_id": str(caller_deployment_id) if caller_deployment_id else None,
        },
        max_attempts=3,
        initial_delay_seconds=0.25,
    )

    if response.status_code != 200:
        self._handle_error_response(response)

    return response.json()

execute_transform(transform_id, input_data, context, caller_deployment_id=None) async

Execute transform via platform API.

Parameters:

Name Type Description Default
transform_id UUID

UUID of transform to execute

required
input_data Any

Input data for transform

required
context dict[str, Any]

Execution context

required
caller_deployment_id str | UUID | None

Deployment ID of calling component (for ring inheritance)

None

Returns:

Type Description
Any

Transform execution result with transformed output

Raises:

Type Description
PlatformClientError

If execution fails

Source code in flow_sdk/platform_client.py
async def execute_transform(
    self,
    transform_id: UUID,
    input_data: Any,
    context: dict[str, Any],
    caller_deployment_id: str | UUID | None = None,
) -> Any:
    """Run a transform through the platform's component-execution endpoint.

    Args:
        transform_id: UUID of the transform to run.
        input_data: Data handed to the transform.
        context: Execution context (not included in the request by this method).
        caller_deployment_id: Deployment ID of the calling component (for ring
            inheritance); sent as a string, or ``None`` when falsy.

    Returns:
        The decoded JSON body of the response.

    Raises:
        PlatformClientError: If execution fails
    """
    logfire.info(
        "Executing transform via platform API",
        transform_id=str(transform_id),
    )

    caller_ref = str(caller_deployment_id) if caller_deployment_id else None
    request_body = {
        "input_data": input_data,
        "caller_deployment_id": caller_ref,
    }
    response = await self._send_request(
        "POST",
        f"/api/v1/component-execution/transforms/{transform_id}/execute",
        json=request_body,
        max_attempts=3,
        initial_delay_seconds=0.25,
    )

    succeeded = response.status_code == 200
    if not succeeded:
        self._handle_error_response(response)

    return response.json()

execute_workflow(workflow_id, input_data, context=None, config=None, auto_deploy=True, lifecycle_event='workflow', caller_deployment_id=None, ring_id=None, timeout_seconds=None) async

Execute workflow via platform API.

Parameters:

Name Type Description Default
workflow_id UUID

UUID of workflow to execute

required
input_data dict[str, Any]

Input data for workflow

required
context dict[str, Any] | None

Optional execution context for traceability

None
config dict[str, Any] | None

Optional configuration overrides

None
auto_deploy bool

Whether to auto-deploy workflow if not deployed

True
lifecycle_event str

Lifecycle event (test, workflow, agent_run, mcp_session)

'workflow'
caller_deployment_id str | UUID | None

Deployment ID of calling component (for ring inheritance)

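None
ring_id str | UUID | None

Ring ID for the execution; required (the workflow submission fails without it)

None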
timeout_seconds int | None

Optional workflow execution timeout override

None

Returns:

Type Description
dict[str, Any]

Workflow execution result with outputs, metrics, and errors

Raises:

Type Description
PlatformClientError

If execution fails

Source code in flow_sdk/platform_client.py
async def execute_workflow(
    self,
    workflow_id: UUID,
    input_data: dict[str, Any],
    context: dict[str, Any] | None = None,
    config: dict[str, Any] | None = None,
    auto_deploy: bool = True,
    lifecycle_event: str = "workflow",
    caller_deployment_id: str | UUID | None = None,
    ring_id: str | UUID | None = None,
    timeout_seconds: int | None = None,
) -> dict[str, Any]:
    """Execute workflow via platform API.

    Submits the workflow as a durable execution and polls it until terminal.
    If the wait budget expires first, a best-effort cancellation is requested
    before the timeout is reported.

    Args:
        workflow_id: UUID of workflow to execute
        input_data: Input data for workflow
        context: Optional execution context; sent as the execution config
            when ``config`` is empty or omitted
        config: Optional configuration overrides
        auto_deploy: Whether to auto-deploy workflow if not deployed
            (accepted, but not forwarded by this method)
        lifecycle_event: Lifecycle event (test, workflow, agent_run, mcp_session)
        caller_deployment_id: Deployment ID of calling component (for ring
            inheritance); accepted, but not forwarded by this method
        ring_id: Ring ID forwarded to the submission, which rejects a missing
            value
        timeout_seconds: Optional workflow execution timeout override; 300 is
            used when omitted or falsy

    Returns:
        Workflow execution result with outputs, metrics, and errors

    Raises:
        PlatformClientError: If execution fails
        PlatformExecutionError: If the execution did not complete before the
            wait budget expired
    """
    logfire.info(
        "Executing workflow via platform API",
        workflow_id=str(workflow_id),
        lifecycle_event=lifecycle_event,
    )

    # One budget drives both the server-side timeout and the client-side wait.
    wait_budget = timeout_seconds or 300
    handle = await self.submit_workflow_execution(
        workflow_id=workflow_id,
        input_data=input_data,
        config=config or context or {},
        lifecycle_event=lifecycle_event,
        ring_id=ring_id,
        timeout_seconds=wait_budget,
    )
    execution_id = UUID(str(handle["execution_id"]))
    try:
        return await self.await_execution_result(
            execution_id,
            timeout_seconds=wait_budget,
        )
    except TimeoutError as exc:
        try:
            await self.cancel_execution(execution_id)
        except Exception as cancel_exc:
            # Cancellation stays best-effort, but a failure is logged instead
            # of vanishing so orphaned executions can be traced.
            logfire.warn(
                "Failed to cancel timed-out execution",
                execution_id=str(execution_id),
                error=str(cancel_exc),
            )
        raise PlatformExecutionError(
            f"Execution {execution_id} did not complete before the caller wait budget expired"
        ) from exc

submit_workflow_execution(workflow_id, input_data, config=None, lifecycle_event='workflow', ring_id=None, timeout_seconds=300) async

Submit a durable workflow execution and return its handle.

Source code in flow_sdk/platform_client.py
async def submit_workflow_execution(
    self,
    workflow_id: UUID,
    input_data: dict[str, Any],
    config: dict[str, Any] | None = None,
    lifecycle_event: str = "workflow",
    ring_id: str | UUID | None = None,
    timeout_seconds: int = 300,
) -> dict[str, Any]:
    """Submit a durable workflow execution and return its handle."""
    if not ring_id:
        raise PlatformClientError("ring_id is required for workflow execution")
    response = await self._send_request(
        "POST",
        f"/api/v1/orgs/{self.org_id}/workspaces/{self.workspace_id}/workflows/{workflow_id}/executions",
        json={
            "input_data": input_data,
            "config": config or {},
            "lifecycle_event": lifecycle_event,
            "ring_id": str(ring_id),
            "timeout_seconds": timeout_seconds,
        },
        max_attempts=3,
        initial_delay_seconds=0.25,
    )

    if response.status_code != 202:
        self._handle_error_response(response)
    return response.json()

submit_agent_execution(agent_id, input_data, config=None, lifecycle_event='agent_run', ring_id=None, timeout_seconds=300) async

Submit a durable isolated-agent execution and return its handle.

Source code in flow_sdk/platform_client.py
async def submit_agent_execution(
    self,
    agent_id: UUID,
    input_data: dict[str, Any],
    config: dict[str, Any] | None = None,
    lifecycle_event: str = "agent_run",
    ring_id: str | UUID | None = None,
    timeout_seconds: int = 300,
) -> dict[str, Any]:
    """Submit a durable isolated-agent execution and return its handle."""
    if not ring_id:
        raise PlatformClientError("ring_id is required for agent execution")
    response = await self._send_request(
        "POST",
        f"/api/v1/orgs/{self.org_id}/workspaces/{self.workspace_id}/agents/{agent_id}/executions",
        json={
            "input_data": input_data,
            "config": config or {},
            "lifecycle_event": lifecycle_event,
            "ring_id": str(ring_id),
            "timeout_seconds": timeout_seconds,
        },
        max_attempts=3,
        initial_delay_seconds=0.25,
    )

    if response.status_code != 202:
        self._handle_error_response(response)
    return response.json()

get_execution_status(execution_id) async

Fetch durable execution status.

Source code in flow_sdk/platform_client.py
async def get_execution_status(
    self,
    execution_id: UUID | str,
) -> dict[str, Any]:
    """Fetch durable execution status."""
    response = await self._send_request(
        "GET",
        f"/api/v1/orgs/{self.org_id}/workspaces/{self.workspace_id}/executions/{execution_id}",
        max_attempts=3,
        initial_delay_seconds=0.2,
    )
    if response.status_code != 200:
        self._handle_error_response(response)
    return response.json()

cancel_execution(execution_id) async

Request best-effort cancellation of an execution.

Source code in flow_sdk/platform_client.py
async def cancel_execution(
    self,
    execution_id: UUID | str,
) -> dict[str, Any]:
    """Request best-effort cancellation of an execution."""
    response = await self._send_request(
        "POST",
        f"/api/v1/orgs/{self.org_id}/workspaces/{self.workspace_id}/executions/{execution_id}/cancel",
        max_attempts=3,
        initial_delay_seconds=0.2,
    )
    if response.status_code != 200:
        self._handle_error_response(response)
    return response.json()

await_execution_result(execution_id, *, timeout_seconds=300, poll_interval_seconds=0.5, not_found_retry_seconds=30.0) async

Poll durable execution state until terminal.

Source code in flow_sdk/platform_client.py
async def await_execution_result(
    self,
    execution_id: UUID | str,
    *,
    timeout_seconds: int = 300,
    poll_interval_seconds: float = 0.5,
    not_found_retry_seconds: float = 30.0,
) -> dict[str, Any]:
    """Poll durable execution state until terminal."""
    now_ts = datetime.now(UTC).timestamp()
    deadline = now_ts + timeout_seconds
    not_found_deadline = now_ts + min(timeout_seconds, max(0.0, not_found_retry_seconds))
    while True:
        try:
            result = await self.get_execution_status(execution_id)
        except PlatformNotFoundError:
            now_ts = datetime.now(UTC).timestamp()
            if now_ts >= not_found_deadline:
                raise
            await asyncio.sleep(poll_interval_seconds)
            continue

        status = str(result.get("status") or "").lower()
        if status in {
            "completed",
            "failed",
            "timeout",
            "dead_letter",
            "cancelled",
        }:
            return result
        if datetime.now(UTC).timestamp() >= deadline:
            raise TimeoutError(
                f"Execution {execution_id} timed out after {timeout_seconds}s"
            )
        await asyncio.sleep(poll_interval_seconds)

fetch_execution_input(deployment_id, execution_id) async

Fetch input data for pull-based execution via NATS.

Subscribes to exec.input.{execution_id} (WorkQueue stream, DeliverPolicy.ALL) since the platform publishes input before the container starts.

Parameters:

Name Type Description Default
deployment_id str | UUID | None

Deployment ID

required
execution_id str | UUID

Execution ID

required

Returns:

Type Description
dict[str, Any]

Dict with input_data, session_id, messages, state

Source code in flow_sdk/platform_client.py
async def fetch_execution_input(
    self,
    deployment_id: str | UUID | None,
    execution_id: str | UUID,
) -> dict[str, Any]:
    """Receive the input payload of a pull-based execution over NATS.

    Subscribes to exec.input.{execution_id} (WorkQueue stream, DeliverPolicy.ALL)
    since the platform publishes input before the container starts. The
    message is acknowledged, decoded as JSON, and a "running" status is
    published before the input is returned. The subscription is always
    removed afterwards.

    Args:
        deployment_id: Deployment ID (only used in the log entry)
        execution_id: Execution ID

    Returns:
        Dict with input_data, session_id, messages, state

    Raises:
        PlatformClientError: If NATS_URL is not configured.
    """
    if not self._nats_url:
        raise PlatformClientError(
            "NATS_URL not configured. "
            "Set NATS_URL environment variable for execution I/O."
        )
    await self._ensure_nats()

    logfire.info(
        "Fetching execution input via NATS",
        deployment_id=str(deployment_id),
        execution_id=str(execution_id),
    )

    # EXEC_INPUT is a WorkQueue stream — must use DeliverPolicy.ALL (default)
    subscription = await self._js.subscribe(f"exec.input.{execution_id}")
    try:
        message = await subscription.next_msg(timeout=60)
        await message.ack()
        envelope = json.loads(message.data.decode())
        await self.publish_execution_status(execution_id, "running")
        # Fields missing from the envelope fall back to these per-call defaults.
        fallbacks = (
            ("input_data", {}),
            ("session_id", None),
            ("messages", []),
            ("state", {}),
        )
        return {field: envelope.get(field, fallback) for field, fallback in fallbacks}
    finally:
        await subscription.unsubscribe()

push_execution_output(deployment_id, execution_id, output, usage=None, messages=None, state=None, error=None) async

Push execution output back to platform via NATS.

Publishes to exec.output.{execution_id}. The platform subscribes to this subject and deserializes with NATSExecutionOutput.

Parameters:

Name Type Description Default
deployment_id str | UUID | None

Deployment ID

required
execution_id str | UUID

Execution ID

required
output Any

Execution output data

required
usage dict[str, Any] | None

Token usage stats (optional)

None
messages list[dict] | None

New conversation messages (optional)

None
state dict[str, Any] | None

Updated session state (optional)

None
error str | None

Error message if failed (optional)

None
Source code in flow_sdk/platform_client.py
async def push_execution_output(
    self,
    deployment_id: str | UUID | None,
    execution_id: str | UUID,
    output: Any,
    usage: dict[str, Any] | None = None,
    messages: list[dict] | None = None,
    state: dict[str, Any] | None = None,
    error: str | None = None,
) -> None:
    """Send the final execution output to the platform over NATS.

    Publishes to exec.output.{execution_id}. The platform subscribes
    to this subject and deserializes with NATSExecutionOutput.

    Args:
        deployment_id: Deployment ID (sent as a string, or null when None)
        execution_id: Execution ID
        output: Execution output data
        usage: Token usage stats (optional)
        messages: New conversation messages (optional)
        state: Updated session state (optional)
        error: Error message if failed (optional)

    Raises:
        PlatformClientError: If NATS_URL is not configured.
    """
    if not self._nats_url:
        raise PlatformClientError(
            "NATS_URL not configured. "
            "Set NATS_URL environment variable for execution I/O."
        )
    await self._ensure_nats()

    logfire.info(
        "Pushing execution output via NATS",
        deployment_id=str(deployment_id),
        execution_id=str(execution_id),
        has_error=error is not None,
    )

    deployment_ref = None if deployment_id is None else str(deployment_id)
    envelope = {
        "execution_id": str(execution_id),
        "deployment_id": deployment_ref,
        "output": output,
        "usage": usage,
        "messages": messages,
        "state": state,
        "error": error,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    # default=str stringifies any value the JSON encoder cannot handle natively.
    encoded = json.dumps(envelope, default=str).encode()
    await self._js.publish(f"exec.output.{execution_id}", encoded)

push_execution_event(deployment_id, execution_id, event_type, event_data) async

Push streaming event during execution via NATS.

Publishes to exec.events.{execution_id}. The platform subscribes to this subject and deserializes with NATSExecutionEvent.

Parameters:

Name Type Description Default
deployment_id str | UUID | None

Deployment ID

required
execution_id str | UUID

Execution ID

required
event_type str

Event type (message, chunk, tool_call, done, error)

required
event_data dict[str, Any]

Event payload

required
Source code in flow_sdk/platform_client.py
async def push_execution_event(
    self,
    deployment_id: str | UUID | None,
    execution_id: str | UUID,
    event_type: str,
    event_data: dict[str, Any],
) -> None:
    """Push streaming event during execution via NATS.

    Publishes to exec.events.{execution_id}. The platform subscribes
    to this subject and deserializes with NATSExecutionEvent.

    Args:
        deployment_id: Deployment ID
        execution_id: Execution ID
        event_type: Event type (message, chunk, tool_call, done, error)
        event_data: Event payload
    """
    if not self._nats_url:
        raise PlatformClientError(
            "NATS_URL not configured. "
            "Set NATS_URL environment variable for execution I/O."
        )
    await self._ensure_nats()

    self._event_sequence += 1
    subject = f"exec.events.{execution_id}"
    payload = {
        "execution_id": str(execution_id),
        "sequence": self._event_sequence,
        "event_type": event_type,
        "event_data": event_data,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    await self._js.publish(subject, json.dumps(payload, default=str).encode())

publish_execution_status(execution_id, status) async

Publish execution status via NATS.

Source code in flow_sdk/platform_client.py
async def publish_execution_status(
    self,
    execution_id: str | UUID,
    status: str,
) -> None:
    """Publish execution status via NATS."""
    if not self._nats_url:
        raise PlatformClientError(
            "NATS_URL not configured. "
            "Set NATS_URL environment variable for execution I/O."
        )
    await self._ensure_nats()

    subject = f"exec.status.{execution_id}"
    payload = {
        "execution_id": str(execution_id),
        "status": status,
        "timestamp": datetime.now(UTC).isoformat(),
    }
    await self._js.publish(subject, json.dumps(payload, default=str).encode())