Skip to content

CLIClient

Coming Soon

The Python SDK for local development is not yet publicly available.

flow_sdk.cli_client.CLIClient(config)

Synchronous HTTP client backed by `FlowConfig` credentials.

Source code in flow_sdk/cli_client.py
def __init__(self, config: FlowConfig) -> None:
    """Bind the client to *config* and attach every API namespace.

    Opens a single ``httpx.Client`` rooted at ``config.platform_url`` that
    sends ``config.access_token`` as a bearer token and uses a 30-second
    timeout, wraps it in ``_SafeSyncClient``, and then creates one namespace
    attribute per API area. Every namespace receives this client instance.
    """
    self._config = config
    self._client = _SafeSyncClient(
        httpx.Client(
            base_url=config.platform_url,
            headers={"Authorization": f"Bearer {config.access_token}"},
            timeout=30.0,
        )
    )

    # Catalog component types. Most use the generic namespace keyed by a type
    # string; tools, code blocks and workflows have dedicated classes.
    self.agents = ComponentTypeNamespace(self, "agent")
    self.models = ComponentTypeNamespace(self, "model")
    self.tools = ToolsNamespace(self)
    self.prompts = ComponentTypeNamespace(self, "prompt")
    self.code_blocks = CodeBlocksNamespace(self)
    self.workflows = WorkflowsNamespace(self)
    self.transforms = ComponentTypeNamespace(self, "transform")
    self.validations = ComponentTypeNamespace(self, "validation")
    self.schemas = ComponentTypeNamespace(self, "schema")
    self.states = ComponentTypeNamespace(self, "state")
    self.observability = ComponentTypeNamespace(self, "observability")

    # Resources: solutions, connectors and datasets.
    self.solutions = SolutionsNamespace(self)
    self.connectors = CLIConnectorsNamespace(self)
    self.connector_instances = CLIConnectorInstancesNamespace(self)
    self.datasets = CLIDatasetsNamespace(self)

    # Deployments, rings, backups and hosted services.
    self.component_deployments = CLIComponentDeploymentsNamespace(self)
    self.workflow_deployments = CLIWorkflowDeploymentsNamespace(self)
    self.rings = CLIRingsNamespace(self)
    self.component_backups = CLIComponentBackupsNamespace(self)
    self.hosted_services = CLIHostedServicesNamespace(self)

    # Policies, webhooks, MCP servers and experiments.
    self.policies = CLIPoliciesNamespace(self)
    self.webhooks = CLIWebhooksNamespace(self)
    self.mcp_servers = CLIMcpServersNamespace(self)
    self.experiments = CLIExperimentsNamespace(self)

exchange_api_key(platform_url, api_key, ttl_minutes=60) staticmethod

Exchange an org API key (fl_...) for a JWT access token.

Source code in flow_sdk/cli_client.py
@staticmethod
def exchange_api_key(platform_url: str, api_key: str, ttl_minutes: int = 60) -> str:
    """Exchange an org API key (fl_...) for a JWT access token.

    Args:
        platform_url: Base URL of the platform; a trailing ``/`` is tolerated.
        api_key: Organization API key to exchange.
        ttl_minutes: Requested token lifetime, sent as ``ttl_minutes``.

    Returns:
        The ``access_token`` string from the server's JSON reply.

    Raises:
        CLIAuthenticationError: The request could not be sent, the server
            answered with a status other than 200, or a 200 reply did not
            contain a usable ``access_token``.
    """
    # Strip a trailing slash so the request path never contains "//".
    base_url = platform_url.rstrip("/")
    try:
        resp = httpx.post(
            f"{base_url}/api/v1/auth/token",
            json={"api_key": api_key, "ttl_minutes": ttl_minutes},
            timeout=15.0,
        )
    except httpx.HTTPError as exc:
        raise CLIAuthenticationError(f"API key exchange failed: {exc}") from exc

    if resp.status_code == 200:
        try:
            return resp.json()["access_token"]
        except (ValueError, KeyError, TypeError) as exc:
            # Non-JSON body, missing key, or a JSON value that is not an
            # object: report it through the same error type as other failures.
            raise CLIAuthenticationError(
                "API key exchange failed: response did not contain an access token"
            ) from exc

    # Prefer the server-supplied "detail" field; fall back to the raw body.
    try:
        payload = resp.json()
    except ValueError:
        payload = None
    detail = payload.get("detail", resp.text) if isinstance(payload, dict) else resp.text
    raise CLIAuthenticationError(f"API key exchange failed ({resp.status_code}): {detail}")

create_cli_login_request(platform_url) staticmethod

Create a request-bound CLI login request (unauthenticated).

Source code in flow_sdk/cli_client.py
@staticmethod
def create_cli_login_request(platform_url: str) -> dict:
    """Create a request-bound CLI login request (unauthenticated).

    Args:
        platform_url: Base URL of the platform; a trailing ``/`` is tolerated.

    Returns:
        The decoded JSON body of the successful response.

    Raises:
        CLIClientError: The request could not be sent or the server answered
            with a non-success status.
    """
    # Strip a trailing slash so the request path never contains "//".
    base_url = platform_url.rstrip("/")
    try:
        resp = httpx.post(
            f"{base_url}/api/v1/auth/cli-login-requests",
            timeout=15.0,
        )
    except httpx.HTTPError as exc:
        raise CLIClientError(f"CLI login request failed: {exc}") from exc

    if resp.is_success:
        return resp.json()

    # Prefer the server-supplied "detail" field; fall back to the raw body
    # when the reply is not JSON or is not a JSON object.
    try:
        payload = resp.json()
    except ValueError:
        payload = None
    detail = payload.get("detail", resp.text) if isinstance(payload, dict) else resp.text
    raise CLIClientError(
        f"CLI login request failed ({resp.status_code}): {detail}"
    )

get_current_user_with_token(platform_url, access_token) staticmethod

Validate a bearer token and return /auth/me payload.

Source code in flow_sdk/cli_client.py
@staticmethod
def get_current_user_with_token(platform_url: str, access_token: str) -> dict:
    """Validate a bearer token and return the ``/api/v1/auth/me`` payload.

    Args:
        platform_url: Base URL of the platform; a trailing ``/`` is tolerated.
        access_token: Token sent in the ``Authorization: Bearer`` header.

    Returns:
        The decoded JSON body of the successful response.

    Raises:
        CLIAuthenticationError: The server answered 401.
        CLIClientError: The request could not be sent, or the server answered
            with any other non-success status.
    """
    # Strip a trailing slash so the request path never contains "//".
    base_url = platform_url.rstrip("/")
    try:
        resp = httpx.get(
            f"{base_url}/api/v1/auth/me",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=15.0,
        )
    except httpx.HTTPError as exc:
        raise CLIClientError(f"Token validation failed: {exc}") from exc

    if resp.is_success:
        return resp.json()

    # Prefer the server-supplied "detail" field; fall back to the raw body
    # when the reply is not JSON or is not a JSON object.
    try:
        payload = resp.json()
    except ValueError:
        payload = None
    detail = payload.get("detail", resp.text) if isinstance(payload, dict) else resp.text

    # A 401 is surfaced as an authentication error; everything else is generic.
    if resp.status_code == 401:
        raise CLIAuthenticationError(f"Token validation failed: {detail}")
    raise CLIClientError(f"Token validation failed ({resp.status_code}): {detail}")

create_component(component_type, data)

POST a new component to the catalog.

Source code in flow_sdk/cli_client.py
def create_component(self, component_type: str, data: dict) -> dict:
    """Create a catalog component of *component_type* from *data*.

    Posts *data* as JSON to the ``components/{component_type}`` path resolved
    by ``self._tenant_url`` and returns whatever ``self._handle_response``
    produces for the reply.
    """
    endpoint = self._tenant_url(f"components/{component_type}")
    reply = self._client.post(endpoint, json=data)
    return self._handle_response(reply)

list_orgs()

GET accessible organizations for the current user.

Source code in flow_sdk/cli_client.py
def list_orgs(self) -> list[dict]:
    """Return the organizations accessible to the current user.

    Issues ``GET /api/v1/tenants/orgs`` and returns the result of
    ``self._handle_response`` for the reply.
    """
    return self._handle_response(self._client.get("/api/v1/tenants/orgs"))

list_workspaces(org_id)

GET workspaces within an organization.

Source code in flow_sdk/cli_client.py
def list_workspaces(self, org_id: str) -> list[dict]:
    """Return the workspaces that belong to organization *org_id*.

    Issues ``GET /api/v1/tenants/orgs/{org_id}/workspaces`` and returns the
    result of ``self._handle_response`` for the reply.
    """
    workspaces_path = f"/api/v1/tenants/orgs/{org_id}/workspaces"
    return self._handle_response(self._client.get(workspaces_path))

verify_credentials()

Lightweight call to verify that the configured credentials work.

Source code in flow_sdk/cli_client.py
def verify_credentials(self) -> bool:
    """Check the configured credentials with a single ``list_orgs`` call.

    Returns:
        ``True`` whenever ``list_orgs`` returns; its result is discarded.
        Any exception raised by ``list_orgs`` propagates unchanged.
    """
    _ = self.list_orgs()
    return True

get_current_user()

GET current user identity from /api/v1/auth/me.

Source code in flow_sdk/cli_client.py
def get_current_user(self) -> dict:
    """Return the identity of the current user.

    Issues ``GET /api/v1/auth/me`` through the configured client and returns
    the result of ``self._handle_response`` for the reply.
    """
    return self._handle_response(self._client.get("/api/v1/auth/me"))

execute_ddl(instance_id, sql, *, confirm=False, scope_type='workspace', scope_id=None, reason=None)

Preview or execute DDL against a connector instance.

Source code in flow_sdk/cli_client.py
def execute_ddl(
    self,
    instance_id: str,
    sql: str,
    *,
    confirm: bool = False,
    scope_type: str = "workspace",
    scope_id: str | None = None,
    reason: str | None = None,
) -> dict:
    """Preview or execute DDL against a connector instance."""
    body: dict[str, Any] = {
        "scope_type": scope_type,
        "scope_id": scope_id or self._config.workspace_id,
        "sql": sql,
        "confirm": confirm,
    }
    if reason:
        body["reason"] = reason
    resp = self._client.post(
        f"/api/v1/connectors/instances/{instance_id}/execute-ddl",
        json=body,
    )
    return self._handle_response(resp)

invoke_service(service_id, method, path, *, body=None, query=None, version=None)

Invoke an HSL service endpoint.

Source code in flow_sdk/cli_client.py
def invoke_service(
    self,
    service_id: str,
    method: str,
    path: str,
    *,
    body: dict | None = None,
    query: dict | None = None,
    version: int | None = None,
) -> dict | list:
    """Invoke an HSL service endpoint."""
    org = self._config.org_id
    ep = path.lstrip("/")
    if version is not None:
        url = f"/api/v1/orgs/{org}/services/{service_id}/v{version}/{ep}"
    else:
        url = f"/api/v1/orgs/{org}/services/{service_id}/{ep}"
    resp = self._client.request(method.upper(), url, json=body, params=query)
    return self._handle_response(resp)