Skip to content

FlowSDK

flow_sdk.flow_sdk.FlowSDK(*, org_id=None, workspace_id=None, auth_token=None, platform_url=None, gateway_url=None, service_id=None, execution_id=None, job_id=None, request_id=None, ring_id=None)

Unified SDK facade for HSL customer code blocks.

Initializes from environment variables when called with no arguments (the normal case inside platform execution contexts).

Source code in flow_sdk/flow_sdk.py
def __init__(
    self,
    *,
    org_id: str | UUID | None = None,
    workspace_id: str | UUID | None = None,
    auth_token: str | None = None,
    platform_url: str | None = None,
    gateway_url: str | None = None,
    service_id: str | UUID | None = None,
    execution_id: str | UUID | None = None,
    job_id: str | UUID | None = None,
    request_id: str | None = None,
    ring_id: str | UUID | None = None,
) -> None:
    """Build the SDK facade from explicit arguments and/or the environment.

    Every keyword argument is optional. A truthy argument is used as given;
    otherwise the matching environment variable is read (``ORG_ID``,
    ``WORKSPACE_ID``, ``AUTH_TOKEN``, ``PLATFORM_API_URL``, ``GATEWAY_URL``,
    ``SERVICE_ID``, ``EXECUTION_ID``, ``JOB_ID``, ``REQUEST_ID``,
    ``RING_ID``), defaulting to an empty string when it is unset.

    The org, workspace, service, execution, job and ring identifiers are
    stored as ``str`` even when passed as ``UUID``. When ``GATEWAY_URL`` is
    not set in the environment, the gateway URL defaults to the resolved
    platform URL.
    """

    def _resolve(explicit: Any, env_name: str, fallback: Any = "") -> Any:
        # A truthy explicit argument wins; otherwise consult the environment.
        return explicit or os.environ.get(env_name, fallback)

    # Per-instance context variable holding the request-scoped context dict.
    self._request_context_var: ContextVar[dict[str, Any] | None] = ContextVar(
        "flow_sdk_request_context",
        default=None,
    )

    # Identity and connection settings (same resolution order as listed).
    self._org_id = str(_resolve(org_id, "ORG_ID"))
    self._workspace_id = str(_resolve(workspace_id, "WORKSPACE_ID"))
    self._auth_token = _resolve(auth_token, "AUTH_TOKEN")
    self._platform_url = _resolve(platform_url, "PLATFORM_API_URL")
    # The platform URL must be resolved first: it is the gateway fallback.
    self._gateway_url = _resolve(gateway_url, "GATEWAY_URL", self._platform_url)
    self._service_id = str(_resolve(service_id, "SERVICE_ID"))
    self._execution_id = str(_resolve(execution_id, "EXECUTION_ID"))
    self._job_id = str(_resolve(job_id, "JOB_ID"))
    self._request_id = _resolve(request_id, "REQUEST_ID")
    self._ring_id = str(_resolve(ring_id, "RING_ID"))

    # No platform client is cached at construction time.
    self._platform_client: Any = None

    # Namespaces: each one receives this SDK instance.
    self.gateway = GatewayNamespace(self)
    self.code_blocks = CodeBlocksNamespace(self)
    self.agents = AgentsNamespace(self)
    self.workflows = WorkflowsNamespace(self)
    self.datasets = DatasetsNamespace(self)
    self.connectors = ConnectorsNamespace(self)
    self.secrets = SecretsNamespace(self)
    self.state = StateNamespace(self)
    self.conversations = ConversationsNamespace(self)
    self.storage = StorageNamespace(self)
    self.audit = AuditNamespace(self)
    self.auth = AuthNamespace(self)
    self.vector = VectorNamespace(self)
    self.memory = MemoryNamespace(self)
    self.service = ServiceNamespace(self)
    self.services = ServicesNamespace(self)
    self.jobs = JobsNamespace(self)
    self.service_tokens = ServiceTokensNamespace(self)

log(action, resource_type, resource_id, details=None) async

Backward-compatible alias for flow.audit.log(...).

Source code in flow_sdk/flow_sdk.py
async def log(
    self,
    action: str,
    resource_type: str,
    resource_id: str,
    details: dict[str, Any] | None = None,
) -> None:
    """Backward-compatible alias for ``flow.audit.log(...)``."""
    await self.audit.log(
        action=action,
        resource_type=resource_type,
        resource_id=resource_id,
        details=details,
    )

bind_request_context(*, auth=None, execution=None, headers=None, service=None)

Bind request-scoped HSL context for the current execution.

Source code in flow_sdk/flow_sdk.py
def bind_request_context(
    self,
    *,
    auth: dict[str, Any] | None = None,
    execution: dict[str, Any] | None = None,
    headers: dict[str, str] | None = None,
    service: dict[str, Any] | None = None,
) -> Token:
    """Bind request-scoped HSL context for the current execution."""
    normalized_headers: dict[str, str] = {}
    for key, value in (headers or {}).items():
        normalized_key = str(key).strip().lower()
        if not normalized_key:
            continue
        normalized_headers[normalized_key] = str(value)

    return self._request_context_var.set(
        {
            "auth": auth or {},
            "execution": execution or {},
            "headers": normalized_headers,
            "service": service or {},
        }
    )

reset_request_context(token)

Reset request-scoped HSL context.

Source code in flow_sdk/flow_sdk.py
def reset_request_context(self, token: Token) -> None:
    """Undo a request-context binding.

    Args:
        token: The token returned by ``bind_request_context``; it is handed
            to the context variable's ``reset``.
    """
    context_var = self._request_context_var
    context_var.reset(token)

refresh_auth_token(new_token) async

Replace the platform auth token and update cached clients in place.

Called by the background token-refresh loop in persistent runtimes so that the SDK always uses a valid (non-expired) JWT.

Source code in flow_sdk/flow_sdk.py
async def refresh_auth_token(self, new_token: str) -> None:
    """Swap in a new platform auth token, updating cached clients in place.

    Intended for the background token-refresh loop of persistent runtimes,
    so the SDK keeps using a valid (non-expired) JWT. The work itself is
    delegated to ``self._apply_auth_token_refresh``.

    Args:
        new_token: The replacement token, forwarded unchanged.
    """
    pending_refresh = self._apply_auth_token_refresh(new_token)
    await pending_refresh

close() async

Close cached clients held by this FlowSDK instance.

Source code in flow_sdk/flow_sdk.py
async def close(self) -> None:
    """Close cached clients held by this FlowSDK instance.

    The platform and gateway clients are closed by awaiting their ``close``
    and then cleared. The state and secrets clients are handled duck-typed:
    ``close`` is called only if the attribute exists and is callable, and
    its result is awaited when it is awaitable, so a client with an
    ``async def close`` is actually closed instead of leaving an un-awaited
    coroutine behind. Synchronous ``close`` methods keep working as before.

    Clients that were never created (``None``) are skipped.
    """
    import inspect  # local import: only needed for the awaitable check

    if self._platform_client is not None:
        await self._platform_client.close()
        self._platform_client = None

    if self.gateway._client is not None:
        await self.gateway._client.close()
        self.gateway._client = None

    state_client = self.state._client
    if state_client is not None:
        close = getattr(state_client, "close", None)
        if callable(close):
            outcome = close()
            # An async close() returns a coroutine; it must be awaited to run.
            if inspect.isawaitable(outcome):
                await outcome
        # The state binding is cleared through its dedicated reset hook.
        self._reset_state_client_binding()

    secrets_client = self.secrets._client
    if secrets_client is not None:
        close = getattr(secrets_client, "close", None)
        if callable(close):
            outcome = close()
            if inspect.isawaitable(outcome):
                await outcome
        self.secrets._client = None