Skip to content

GatewayClient

flow_sdk.gateway_client.GatewayClient(base_url=None, auth_token=None, timeout=60.0, on_auth_token_refresh=None)

Client for calling the platform AI gateway from workflow containers.

Provides methods for chat completions (streaming and non-streaming), embeddings, and model/provider discovery. All requests are authenticated with the same JWT token used by PlatformClient.

Attributes:

Name Type Description
base_url

Platform API base URL

auth_token

JWT authentication token

client

Async HTTP client instance

Initialize gateway client.

Parameters:

Name Type Description Default
base_url str | None

Platform API base URL (defaults to PLATFORM_API_URL env var)

None
auth_token str | None

JWT token (defaults to AUTH_TOKEN env var)

None
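timeout float

Request timeout in seconds (default: 60)

60.0
on_auth_token_refresh Callable[[str], Awaitable[None]] | None

Optional async callback that accepts a string; it is stored on the client (presumably invoked with the new token after an auth-token refresh)

None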

Raises:

Type Description
ValueError

If base_url or auth_token not provided and not in environment

Source code in flow_sdk/gateway_client.py
def __init__(
    self,
    base_url: str | None = None,
    auth_token: str | None = None,
    timeout: float = 60.0,
    on_auth_token_refresh: Callable[[str], Awaitable[None]] | None = None,
):
    """Resolve the endpoint and credentials, then build the HTTP session.

    Args:
        base_url: Platform API base URL. When falsy, the PLATFORM_API_URL
            environment variable is used instead.
        auth_token: JWT token. When falsy, the AUTH_TOKEN environment
            variable is used instead.
        timeout: Request timeout in seconds (default: 60), passed to the
            underlying ``httpx.AsyncClient``.
        on_auth_token_refresh: Optional async callable taking a string. It is
            only stored here; presumably it is awaited with the new token
            when the token is refreshed (TODO confirm against the refresh
            logic, which is not part of this method).

    Raises:
        ValueError: If the base URL or the auth token cannot be resolved from
            either the arguments or the environment.
    """
    resolved_url = base_url if base_url else os.environ.get("PLATFORM_API_URL")
    self.base_url = resolved_url
    if not resolved_url:
        raise ValueError(
            "Platform API URL not provided. "
            "Pass base_url parameter or set PLATFORM_API_URL environment variable."
        )

    resolved_token = auth_token if auth_token else os.environ.get("AUTH_TOKEN")
    self.auth_token = resolved_token
    if not resolved_token:
        raise ValueError(
            "Authentication token not provided. "
            "Pass auth_token parameter or set AUTH_TOKEN environment variable."
        )

    # Every request made through this session carries the bearer token.
    bearer_header = {"Authorization": f"Bearer {resolved_token}"}
    self.client = httpx.AsyncClient(
        base_url=resolved_url,
        headers=bearer_header,
        timeout=timeout,
    )
    self._on_auth_token_refresh = on_auth_token_refresh

__aenter__() async

Async context manager entry.

Source code in flow_sdk/gateway_client.py
async def __aenter__(self) -> "GatewayClient":
    """Enter the ``async with`` block and hand back this same client."""
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit - closes HTTP client.

Source code in flow_sdk/gateway_client.py
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Leave the ``async with`` block and close the underlying HTTP client.

    The exception arguments are not inspected and nothing is returned, so an
    exception raised inside the ``async with`` body is not suppressed.
    """
    http_client = self.client
    await http_client.aclose()

close() async

Close the HTTP client.

Source code in flow_sdk/gateway_client.py
async def close(self) -> None:
    """Shut down the underlying HTTP client by awaiting its ``aclose()``."""
    http_client = self.client
    await http_client.aclose()

chat_completion(model, messages, *, temperature=None, max_tokens=None, top_p=None) async

Send a non-streaming chat completion request.

Parameters:

Name Type Description Default
model str

Model identifier (e.g., "openai/gpt-4o")

required
messages list[dict[str, Any]]

List of message dicts with "role" and "content"

required
temperature float | None

Sampling temperature (optional)

None
max_tokens int | None

Maximum tokens to generate (optional)

None
top_p float | None

Nucleus sampling probability (optional)

None

Returns:

Type Description
dict[str, Any]

OpenAI-shaped response dict with id, object, model, choices, usage

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayModelNotFoundError

If model not found

GatewayRateLimitError

If rate limited

GatewayProviderError

If upstream provider fails

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def chat_completion(
    self,
    model: str,
    messages: list[dict[str, Any]],
    *,
    temperature: float | None = None,
    max_tokens: int | None = None,
    top_p: float | None = None,
) -> dict[str, Any]:
    """Send a non-streaming chat completion request.

    Args:
        model: Model identifier (e.g., "openai/gpt-4o")
        messages: List of message dicts with "role" and "content"
        temperature: Sampling temperature (optional)
        max_tokens: Maximum tokens to generate (optional)
        top_p: Nucleus sampling probability (optional)

    Returns:
        OpenAI-shaped response dict with id, object, model, choices, usage

    Raises:
        GatewayAuthError: If authentication fails
        GatewayModelNotFoundError: If model not found
        GatewayRateLimitError: If rate limited
        GatewayProviderError: If upstream provider fails
        GatewayClientError: For other errors
    """
    payload: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "stream": False,
    }
    if temperature is not None:
        payload["temperature"] = temperature
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens
    if top_p is not None:
        payload["top_p"] = top_p

    try:
        response = await self._send_request(
            "POST",
            "/api/v1/gateway/v1/chat/completions",
            json_payload=payload,
        )
    except httpx.HTTPError as e:
        raise GatewayClientError(f"HTTP error: {self._format_http_error(e)}") from e

    if response.status_code != 200:
        self._handle_error_response(response)

    return response.json()  # type: ignore[return-value]

stream_chat_completion(model, messages, *, temperature=None, max_tokens=None, top_p=None) async

Send a streaming chat completion request.

Parses server-sent events (SSE) and yields each chunk as a dict.

Parameters:

Name Type Description Default
model str

Model identifier (e.g., "openai/gpt-4o")

required
messages list[dict[str, Any]]

List of message dicts with "role" and "content"

required
temperature float | None

Sampling temperature (optional)

None
max_tokens int | None

Maximum tokens to generate (optional)

None
top_p float | None

Nucleus sampling probability (optional)

None

Yields:

Type Description
AsyncGenerator[dict[str, Any], None]

OpenAI-shaped streaming chunk dicts with delta content

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayModelNotFoundError

If model not found

GatewayRateLimitError

If rate limited

GatewayProviderError

If upstream provider fails

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def stream_chat_completion(
    self,
    model: str,
    messages: list[dict[str, Any]],
    *,
    temperature: float | None = None,
    max_tokens: int | None = None,
    top_p: float | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
    """Send a streaming chat completion request.

    Parses server-sent events (SSE) and yields each chunk as a dict.

    Args:
        model: Model identifier (e.g., "openai/gpt-4o")
        messages: List of message dicts with "role" and "content"
        temperature: Sampling temperature (optional)
        max_tokens: Maximum tokens to generate (optional)
        top_p: Nucleus sampling probability (optional)

    Yields:
        OpenAI-shaped streaming chunk dicts with delta content

    Raises:
        GatewayAuthError: If authentication fails
        GatewayModelNotFoundError: If model not found
        GatewayRateLimitError: If rate limited
        GatewayProviderError: If upstream provider fails
        GatewayClientError: For other errors
    """
    payload: dict[str, Any] = {
        "model": model,
        "messages": messages,
        "stream": True,
    }
    if temperature is not None:
        payload["temperature"] = temperature
    if max_tokens is not None:
        payload["max_tokens"] = max_tokens
    if top_p is not None:
        payload["top_p"] = top_p

    auth_refresh_attempted = False
    attempt = 0

    while True:
        attempt += 1
        try:
            async with self.client.stream(
                "POST",
                "/api/v1/gateway/v1/chat/completions",
                json=payload,
            ) as response:
                if (
                    response.status_code == 401
                    and not auth_refresh_attempted
                    and await self._refresh_runtime_auth_token()
                ):
                    auth_refresh_attempted = True
                    await response.aread()
                    continue

                if response.status_code != 200:
                    await response.aread()
                    self._handle_error_response(response)

                async for line in response.aiter_lines():
                    if not line:
                        continue
                    if line.startswith("data:"):
                        data = line[5:]
                        if data.startswith(" "):
                            data = data[1:]
                        if data == "[DONE]":
                            return
                        yield json.loads(data)
                return
        except httpx.HTTPError as e:
            if attempt < 2 and self._is_retryable_http_error(e):
                await asyncio.sleep(0.2 * attempt)
                continue
            raise GatewayClientError(f"HTTP error: {self._format_http_error(e)}") from e

embedding(model, input) async

Generate embeddings for the given input.

Parameters:

Name Type Description Default
model str

Embedding model identifier

required
input str | list[str]

Text string or list of strings to embed

required

Returns:

Type Description
dict[str, Any]

OpenAI-shaped response with object, data (list of embedding vectors), and usage

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayModelNotFoundError

If model not found

GatewayRateLimitError

If rate limited

GatewayProviderError

If upstream provider fails

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def embedding(
    self,
    model: str,
    input: str | list[str],
) -> dict[str, Any]:
    """Generate embeddings for the given input.

    Args:
        model: Embedding model identifier
        input: Text string or list of strings to embed

    Returns:
        OpenAI-shaped response with object, data (list of embedding vectors),
        and usage

    Raises:
        GatewayAuthError: If authentication fails
        GatewayModelNotFoundError: If model not found
        GatewayRateLimitError: If rate limited
        GatewayProviderError: If upstream provider fails
        GatewayClientError: For other errors
    """
    try:
        response = await self._send_request(
            "POST",
            "/api/v1/gateway/v1/embeddings",
            json_payload={"model": model, "input": input},
        )
    except httpx.HTTPError as e:
        raise GatewayClientError(f"HTTP error: {self._format_http_error(e)}") from e

    if response.status_code != 200:
        self._handle_error_response(response)

    return response.json()  # type: ignore[return-value]

list_models() async

List all available models through the gateway.

Returns:

Type Description
list[dict[str, Any]]

List of model dicts from the OpenAI-shaped models response

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def list_models(self) -> list[dict[str, Any]]:
    """Fetch the models exposed through the gateway.

    Returns:
        The ``"data"`` entry of the JSON response body (the model dicts of
        the OpenAI-shaped models response), or an empty list when that key
        is absent.

    Raises:
        GatewayClientError: If the HTTP layer raises ``httpx.HTTPError``.
        GatewayAuthError, GatewayClientError: Expected to be raised by
            ``_handle_error_response`` for non-200 responses.
    """
    try:
        reply = await self._send_request("GET", "/api/v1/gateway/v1/models")
    except httpx.HTTPError as http_exc:
        detail = self._format_http_error(http_exc)
        raise GatewayClientError(f"HTTP error: {detail}") from http_exc

    succeeded = reply.status_code == 200
    if not succeeded:
        self._handle_error_response(reply)

    body: dict[str, Any] = reply.json()
    models = body.get("data", [])
    return models  # type: ignore[return-value]

list_provider_models(provider_name, scope='organization') async

List models available from a specific provider.

Parameters:

Name Type Description Default
provider_name str

Provider identifier (e.g., "openai", "anthropic")

required
scope str

Scope for model listing (default: "organization")

'organization'

Returns:

Type Description
dict[str, Any]

Dict with provider, models list, and total count

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayModelNotFoundError

If provider not found

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def list_provider_models(
    self,
    provider_name: str,
    scope: str = "organization",
) -> dict[str, Any]:
    """Fetch the models offered by one provider.

    ``provider_name`` is interpolated into the request path as-is and
    ``scope`` is sent as the ``scope`` query parameter.

    Args:
        provider_name: Provider identifier (e.g., "openai", "anthropic")
        scope: Scope for model listing (default: "organization")

    Returns:
        The decoded JSON body of the response (a dict with provider, models
        list, and total count).

    Raises:
        GatewayClientError: If the HTTP layer raises ``httpx.HTTPError``.
        GatewayAuthError, GatewayModelNotFoundError, GatewayClientError:
            Expected to be raised by ``_handle_error_response`` for non-200
            responses.
    """
    endpoint = f"/api/v1/gateway/models/{provider_name}"
    query = {"scope": scope}

    try:
        reply = await self._send_request("GET", endpoint, params=query)
    except httpx.HTTPError as http_exc:
        detail = self._format_http_error(http_exc)
        raise GatewayClientError(f"HTTP error: {detail}") from http_exc

    succeeded = reply.status_code == 200
    if not succeeded:
        self._handle_error_response(reply)

    return reply.json()  # type: ignore[return-value]

list_known_providers() async

List all known LLM providers.

Returns:

Type Description
list[str]

List of provider name strings

Raises:

Type Description
GatewayAuthError

If authentication fails

GatewayClientError

For other errors

Source code in flow_sdk/gateway_client.py
async def list_known_providers(self) -> list[str]:
    """Fetch the names of all LLM providers known to the gateway.

    Returns:
        The ``"providers"`` entry of the JSON response body (provider name
        strings), or an empty list when that key is absent.

    Raises:
        GatewayClientError: If the HTTP layer raises ``httpx.HTTPError``.
        GatewayAuthError, GatewayClientError: Expected to be raised by
            ``_handle_error_response`` for non-200 responses.
    """
    try:
        reply = await self._send_request("GET", "/api/v1/gateway/models/providers/list")
    except httpx.HTTPError as http_exc:
        detail = self._format_http_error(http_exc)
        raise GatewayClientError(f"HTTP error: {detail}") from http_exc

    succeeded = reply.status_code == 200
    if not succeeded:
        self._handle_error_response(reply)

    body: dict[str, Any] = reply.json()
    providers = body.get("providers", [])
    return providers  # type: ignore[return-value]