Datasets (Beta)

Datasets are managed collections of structured data in Manifest Platform. They serve as evaluation test suites for agents, live-query interfaces to external systems, and reference data for enrichment and lookup.


Creating a Dataset

Coming Soon

The Python SDK for local development is not yet publicly available.

from flow_sdk.cli_client import CLIClient

# `config` is your SDK configuration object; its construction is not shown here.
client = CLIClient(config)

# Optionally validate metadata before persisting
check = client.datasets.validate({
    "name": "Support Ticket Evaluation",
    "slug": "support-ticket-eval",
})
assert check.valid, check.errors

dataset = client.datasets.create({
    "name": "Support Ticket Evaluation",
    "slug": "support-ticket-eval",
    "description": "Test cases for the customer support agent",
    "status": "draft",
    "tags": ["evaluation", "support"],
    "categories": ["agent-testing"],
})

print(f"Created dataset: {dataset.id}")

To create a dataset in the UI instead, navigate to Datasets > New, enter the name, description, status, and tags, then click Create.

Dataset Fields

| Field | Required | Description |
| --- | --- | --- |
| name | Yes | Human-readable name (3-200 chars) |
| slug | Yes | URL-safe identifier (3-100 chars, lowercase, hyphens allowed) |
| description | No | What this dataset contains |
| connector_id | No | Link to a connector for source data |
| external_id | No | External identifier from the source system (up to 500 chars) |
| owner_id | No | UUID of the dataset owner (defaults to the creating user) |
| team | No | Owning team name (up to 200 chars) |
| status | No | draft, active, published, deprecated, or archived |
| certification_level | No | none, bronze, silver, or gold |
| tags | No | Categorization tags |
| categories | No | Category labels for grouping |
| policy_tags | No | Governance tags (pii, hipaa, etc.) |
| data_residency | No | Region constraint for data storage (up to 50 chars) |
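
The constraints on the two required fields can be checked locally before sending a create request. The following is a minimal sketch, not the platform's validator: `check_dataset_metadata` and `SLUG_PATTERN` are hypothetical names, and the exact slug character set (lowercase letters, digits, hyphens between segments) is an assumption, since the table only says "lowercase, hyphens allowed".

```python
import re

# Assumption: slugs are lowercase letters and digits, with single hyphens
# between segments. The field table only says "lowercase, hyphens allowed".
SLUG_PATTERN = re.compile(r"[a-z0-9]+(?:-[a-z0-9]+)*")


def check_dataset_metadata(metadata: dict) -> list[str]:
    """Return a list of problems with the required fields (empty if none)."""
    errors = []

    name = metadata.get("name")
    if not isinstance(name, str) or not 3 <= len(name) <= 200:
        errors.append("name must be a string of 3-200 characters")

    slug = metadata.get("slug")
    if not isinstance(slug, str) or not 3 <= len(slug) <= 100:
        errors.append("slug must be a string of 3-100 characters")
    elif not SLUG_PATTERN.fullmatch(slug):
        errors.append("slug must be lowercase and URL-safe (hyphens allowed)")

    return errors


print(check_dataset_metadata({"name": "Support Ticket Evaluation", "slug": "support-ticket-eval"}))
print(check_dataset_metadata({"name": "QA", "slug": "Support Tickets"}))
```

The first call reports no problems; the second reports one problem for the too-short name and one for the slug, which contains uppercase letters and a space.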

Updating a Dataset

Use PATCH /datasets/{dataset_id} to update metadata. In addition to most create fields, the update request also accepts:

| Field | Description |
| --- | --- |
| quality_score | Numeric quality score (decimal) |
| record_count | Number of records in the dataset (integer) |
| size_bytes | Dataset size in bytes (integer) |

Schema Management

Every dataset has a versioned schema that defines its field structure. Schemas can be defined manually or discovered automatically from connector sources.

Defining a Schema

Coming Soon

The Python SDK for local development is not yet publicly available.

client.datasets.register_schema(dataset_id, {
    "schema_definition": {
        "fields": [
            {"name": "input", "type": "string", "description": "User prompt"},
            {"name": "expected_output", "type": "string", "description": "Expected agent response"},
            {"name": "category", "type": "string", "description": "Query category"},
            {"name": "difficulty", "type": "string", "description": "easy, medium, hard"},
        ]
    },
    "change_type": "non_breaking",
    "change_summary": "Initial schema definition",
})

Schema Versioning

Schemas are versioned automatically. Each update creates a new version with a change type:

| Change Type | Description | Impact |
| --- | --- | --- |
| non_breaking | Added optional fields, updated descriptions | Safe for existing consumers |
| breaking | Removed fields, changed types, renamed fields | May break existing queries |
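
To decide which change type to declare, you can compare the old and new field lists yourself. This is an illustrative sketch only: `classify_schema_change` is a hypothetical helper, not part of the SDK, and it assumes every newly added field is optional. A renamed field shows up as a removal plus an addition, so it is classified as breaking.

```python
def classify_schema_change(old_fields: list[dict], new_fields: list[dict]) -> str:
    """Compare two schema field lists and return "breaking" or "non_breaking"."""
    old_types = {field["name"]: field["type"] for field in old_fields}
    new_types = {field["name"]: field["type"] for field in new_fields}

    for name, old_type in old_types.items():
        if name not in new_types:
            return "breaking"  # field was removed or renamed
        if new_types[name] != old_type:
            return "breaking"  # field type changed

    # Only additions or description updates remain (assumed optional fields).
    return "non_breaking"


v1 = [
    {"name": "input", "type": "string"},
    {"name": "expected_output", "type": "string"},
]
v2 = v1 + [{"name": "difficulty", "type": "string"}]
v3 = [
    {"name": "input", "type": "string"},
    {"name": "expected_output", "type": "integer"},
]

print(classify_schema_change(v1, v2))
print(classify_schema_change(v1, v3))
```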

Schema Discovery

When a dataset has connector sources, the platform can infer the schema from the source data:

Coming Soon

The Python SDK for local development is not yet publicly available.

inferred = client.datasets.infer_schema(dataset_id)

for field in inferred.fields:
    print(f"  {field.name}: {field.type}")

if inferred.merge_conflicts:
    print(f"Conflicts: {inferred.merge_conflicts}")

If multiple sources have conflicting field types, the response includes merge_conflicts to help you resolve them.
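
The idea behind a merge conflict can be sketched in a few lines. This is not the platform's inference logic, and the real shape of `merge_conflicts` is not documented here: `merge_inferred_fields` and its return format are hypothetical, chosen only to show how two sources that disagree on a field's type produce a conflict.

```python
def merge_inferred_fields(per_source: dict[str, dict[str, str]]) -> tuple[dict, dict]:
    """Merge {source: {field: type}} into (merged_fields, conflicts)."""
    # field name -> {type: [sources that reported that type]}
    seen: dict[str, dict[str, list[str]]] = {}
    for source, fields in per_source.items():
        for field, field_type in fields.items():
            seen.setdefault(field, {}).setdefault(field_type, []).append(source)

    merged, conflicts = {}, {}
    for field, types in seen.items():
        if len(types) == 1:
            merged[field] = next(iter(types))  # all sources agree
        else:
            conflicts[field] = types  # sources disagree on the type
    return merged, conflicts


merged, conflicts = merge_inferred_fields({
    "jira": {"ticket_id": "string", "priority": "integer"},
    "zendesk": {"ticket_id": "string", "priority": "string"},
})
print(merged)
print(conflicts)
```

Here `ticket_id` merges cleanly, while `priority` is reported as a conflict listing which source inferred which type.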


Connector Sources

Link connector instances as data sources to pull live data into a dataset.

Adding a Source

Coming Soon

The Python SDK for local development is not yet publicly available.

source = client.datasets.add_source(dataset_id, {
    "connector_instance_id": "uuid-of-jira-instance",
    "operation_id": "list_issues",
    "display_name": "Jira Open Tickets",
    "operation_config": {
        "jql": "status = Open",
        "maxResults": 100,
    },
    "credential_scope_type": "organization",
    "credential_scope_id": "uuid-of-org",
    "field_mappings": {
        "key": "ticket_id",
        "fields.summary": "title",
        "fields.status.name": "status",
        "fields.assignee.displayName": "assignee",
    },
})

Field Mappings

Field mappings transform source data to match the dataset schema:

{
  "key": "ticket_id",
  "fields.summary": "title",
  "fields.status.name": "status",
  "fields.created": "created_at"
}

The left side is the path in the source data; the right side is the field name in the dataset schema.
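
A minimal sketch of how such a mapping could be applied to one source record, assuming dotted paths walk nested objects and that a missing path yields `None` (both assumptions; the platform's actual behavior may differ). `get_path` and `apply_field_mappings` are hypothetical helpers, and the sample issue is made up for illustration.

```python
from typing import Any


def get_path(record: dict, path: str) -> Any:
    """Follow a dotted path such as "fields.status.name" through nested dicts."""
    current: Any = record
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None  # assumption: missing paths map to None
        current = current[part]
    return current


def apply_field_mappings(record: dict, mappings: dict[str, str]) -> dict:
    """Build a dataset row: source path on the left, dataset field on the right."""
    return {target: get_path(record, source_path) for source_path, target in mappings.items()}


issue = {
    "key": "PROJ-42",
    "fields": {
        "summary": "Login fails",
        "status": {"name": "Open"},
        "created": "2024-01-15T09:30:00Z",
    },
}
mappings = {
    "key": "ticket_id",
    "fields.summary": "title",
    "fields.status.name": "status",
    "fields.created": "created_at",
}

row = apply_field_mappings(issue, mappings)
print(row)
```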

Testing a Source

Before relying on a source, test it:

Coming Soon

The Python SDK for local development is not yet publicly available.

result = client.datasets.test_source(dataset_id, source_id)

if result.success:
    print(f"Got {result.record_count} rows in {result.execution_time_ms}ms")
    for row in result.sample_rows:
        print(row)
else:
    print(f"Error: {result.error}")

Previewing Data

The preview endpoint shows raw source data alongside transformed data, so you can verify field mappings and privacy filters:

Coming Soon

The Python SDK for local development is not yet publicly available.

preview = client.datasets.preview(dataset_id)

for source in preview.sources:
    print(f"Source: {source.source_name}")
    print(f"  Raw: {source.source_data}")
    print(f"  Transformed: {source.transformed_data}")
    print(f"  Transformations: {source.transformations_applied}")

Querying Datasets

The Query API lets you filter, sort, and paginate dataset rows, return only selected fields, and restrict a query to specific sources.

Basic Query

# `sdk` is assumed to be an already-initialized SDK client (setup not shown).
result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    filters=[
        {"field": "status", "operator": "eq", "value": "open"},
        {"field": "priority", "operator": "in", "value": ["high", "critical"]},
    ],
    sort_by="created_at",
    sort_order="desc",
    limit=50,
    offset=0,
)

print(f"Total matching: {result.total}")
print(f"Returned: {result.returned}")
print(f"Query time: {result.execution_time_ms}ms")

for row in result.rows:
    print(row)
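
The `limit` and `offset` parameters support a standard pagination loop. The sketch below shows the pattern against a stand-in function rather than the real API: `iter_all_rows`, `fake_query_page`, and `DATA` are hypothetical. In practice the callable would wrap the query call above and return its rows.

```python
from typing import Callable, Iterator


def iter_all_rows(query_page: Callable[[int, int], list], page_size: int = 50) -> Iterator[dict]:
    """Yield every row by requesting successive pages until one comes back short."""
    offset = 0
    while True:
        rows = query_page(page_size, offset)
        yield from rows
        if len(rows) < page_size:  # a short or empty page means no more data
            break
        offset += page_size


# Stand-in for the real query call: slices an in-memory list.
DATA = [{"ticket_id": f"PROJ-{i}"} for i in range(120)]


def fake_query_page(limit: int, offset: int) -> list:
    return DATA[offset:offset + limit]


all_rows = list(iter_all_rows(fake_query_page, page_size=50))
print(len(all_rows))
```

Stopping on a short page avoids one extra request when the total is not a multiple of the page size; when it is, the loop makes one final request that returns no rows.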

Filter Operators

| Operator | Description | Example |
| --- | --- | --- |
| eq | Equals | {"field": "status", "operator": "eq", "value": "open"} |
| ne | Not equals | {"field": "status", "operator": "ne", "value": "closed"} |
| gt | Greater than | {"field": "score", "operator": "gt", "value": 0.8} |
| gte | Greater than or equal | {"field": "count", "operator": "gte", "value": 10} |
| lt | Less than | {"field": "age_days", "operator": "lt", "value": 30} |
| lte | Less than or equal | {"field": "priority", "operator": "lte", "value": 3} |
| in | Value in list | {"field": "type", "operator": "in", "value": ["bug", "task"]} |
| nin | Value not in list | {"field": "status", "operator": "nin", "value": ["archived"]} |
| contains | String contains | {"field": "title", "operator": "contains", "value": "error"} |
| startswith | String starts with | {"field": "key", "operator": "startswith", "value": "PROJ-"} |
| endswith | String ends with | {"field": "email", "operator": "endswith", "value": "@company.com"} |
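
To make the operator semantics concrete, here is a small local evaluator that applies the same filter objects to in-memory rows. It is a sketch for reasoning about filters, not the server's implementation. Two points are assumptions: multiple filters are combined with AND, and a row that lacks the filtered field never matches. `OPERATORS` and `matches` are hypothetical names.

```python
import operator

# One predicate per operator: (actual row value, filter value) -> bool
OPERATORS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda actual, expected: actual in expected,
    "nin": lambda actual, expected: actual not in expected,
    "contains": lambda actual, expected: expected in actual,
    "startswith": lambda actual, expected: actual.startswith(expected),
    "endswith": lambda actual, expected: actual.endswith(expected),
}


def matches(row: dict, filters: list[dict]) -> bool:
    """Return True if the row satisfies every filter (AND semantics assumed)."""
    for f in filters:
        actual = row.get(f["field"])
        if actual is None:  # assumption: a missing field never matches
            return False
        if not OPERATORS[f["operator"]](actual, f["value"]):
            return False
    return True


rows = [
    {"key": "PROJ-1", "status": "open", "priority": "high", "score": 0.9},
    {"key": "PROJ-2", "status": "closed", "priority": "low", "score": 0.4},
    {"key": "OPS-7", "status": "open", "priority": "critical", "score": 0.85},
]
filters = [
    {"field": "status", "operator": "eq", "value": "open"},
    {"field": "priority", "operator": "in", "value": ["high", "critical"]},
    {"field": "key", "operator": "startswith", "value": "PROJ-"},
]

selected = [row["key"] for row in rows if matches(row, filters)]
print(selected)
```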

Field Projection

Return only specific fields to reduce response size:

result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    fields=["ticket_id", "title", "status"],
    limit=100,
)

Source Filtering

Query specific sources within a multi-source dataset:

result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    source_ids=["uuid-of-jira-source"],
    limit=50,
)

Dataset Lifecycle

stateDiagram-v2
    [*] --> Draft: Create
    Draft --> Active: Add sources & schema
    Active --> Published: Certify
    Published --> Active: Update
    Published --> Deprecated: Sunset
    Deprecated --> Archived: Archive
    Active --> Archived: Archive

| Status | Description |
| --- | --- |
| draft | Initial creation, schema and sources being configured |
| active | Ready for use, accepting queries |
| published | Certified and stable, recommended for production use |
| deprecated | Marked for retirement, still queryable |
| archived | No longer queryable, retained for lineage |
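
The transitions in the lifecycle diagram can be written as a lookup table, which is handy for checking a status change in your own tooling before requesting it. This is a sketch derived from the diagram only; whether the platform rejects other transitions is an assumption, and `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names.

```python
# Allowed status changes, transcribed from the lifecycle diagram.
ALLOWED_TRANSITIONS = {
    "draft": {"active"},
    "active": {"published", "archived"},
    "published": {"active", "deprecated"},
    "deprecated": {"archived"},
    "archived": set(),  # terminal state in the diagram
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the diagram has an arrow from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("draft", "active"))
print(can_transition("draft", "published"))
```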

Data Lineage

Track the provenance of datasets through transformation chains:

Coming Soon

The Python SDK for local development is not yet publicly available.

lineage = client.datasets.get_lineage(dataset_id)

print("Upstream datasets:")
for node in lineage.upstream:
    print(f"  {node.name} ({node.slug}) - {node.relationship_type}")

print("Downstream datasets:")
for node in lineage.downstream:
    print(f"  {node.name} ({node.slug}) - {node.relationship_type}")

Lineage relationships are created automatically when datasets are derived from other datasets through transforms or preparation steps.
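
Because derivations can chain, it is sometimes useful to collect every ancestor of a dataset rather than only its direct parents. The sketch below does this over a plain dictionary of "derived from" relationships. It does not call the lineage endpoint, and whether that endpoint returns direct or transitive relationships is not stated here. `upstream_closure` and the sample slugs other than `support-ticket-eval` are hypothetical.

```python
from collections import deque


def upstream_closure(dataset: str, derived_from: dict[str, list[str]]) -> list[str]:
    """Return all ancestors of a dataset, nearest first (breadth-first walk)."""
    ancestors: list[str] = []
    queue = deque(derived_from.get(dataset, []))
    while queue:
        node = queue.popleft()
        if node in ancestors:
            continue  # already visited, e.g. shared ancestor
        ancestors.append(node)
        queue.extend(derived_from.get(node, []))
    return ancestors


# dataset slug -> slugs it was derived from
derived_from = {
    "eval-jsonl-export": ["support-ticket-eval"],
    "support-ticket-eval": ["raw-jira-tickets"],
}

print(upstream_closure("eval-jsonl-export", derived_from))
```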


Data Preparation

Transform dataset data into different formats for export or downstream processing:

Coming Soon

The Python SDK for local development is not yet publicly available.

prep = client.datasets.prepare(dataset_id, {
    "target_format": "jsonl",          # json, jsonl, or csv
    "cleaning_config": {
        "remove_nulls": True,
        "trim_whitespace": True,
    },
})

print(f"Preparation ID: {prep.preparation_id}")
print(f"Output dataset: {prep.output_dataset_id}")
print(f"Execution time: {prep.execution_time_ms}ms")
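
To illustrate what the cleaning options and target formats might do to individual rows, here is a local sketch using only the standard library. The semantics are assumptions, not documented behavior: `remove_nulls` is taken to drop null-valued keys from each row, and `trim_whitespace` to strip leading and trailing whitespace from string values. `clean_row` and `export_rows` are hypothetical helpers.

```python
import csv
import io
import json


def clean_row(row: dict, remove_nulls: bool = False, trim_whitespace: bool = False) -> dict:
    """Apply the assumed cleaning rules to a single row."""
    cleaned = {}
    for key, value in row.items():
        if trim_whitespace and isinstance(value, str):
            value = value.strip()
        if remove_nulls and value is None:
            continue  # assumption: drop the key, keep the row
        cleaned[key] = value
    return cleaned


def export_rows(rows: list[dict], target_format: str) -> str:
    """Serialize rows as json, jsonl, or csv text."""
    if target_format == "json":
        return json.dumps(rows)
    if target_format == "jsonl":
        return "\n".join(json.dumps(row) for row in rows)
    if target_format == "csv":
        # Collect column names in first-seen order across all rows.
        fieldnames = list(dict.fromkeys(key for row in rows for key in row))
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, fieldnames=fieldnames, lineterminator="\n")
        writer.writeheader()
        writer.writerows(rows)
        return buffer.getvalue()
    raise ValueError(f"unsupported target_format: {target_format}")


raw = [{"input": "  Reset my password ", "expected_output": "Send reset link", "category": None}]
cleaned = [clean_row(row, remove_nulls=True, trim_whitespace=True) for row in raw]

print(export_rows(cleaned, "jsonl"))
print(export_rows(cleaned, "csv"))
```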

Using Datasets with Agents

Datasets are a key part of the agent development lifecycle:

  1. Create evaluation datasets with input and expected_output fields
  2. Run agents against datasets in the Playground evaluation mode
  3. Score results using semantic similarity, LLM judges, or custom scorers
  4. Build test suites from production traces using trace-to-test conversion
  5. Gate deployments on dataset evaluation pass rates
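
Steps 3 and 5 above can be sketched as a custom scorer plus a pass-rate gate. This is a toy illustration under stated assumptions, not the Playground's evaluation logic: `exact_match`, `pass_rate`, `stub_agent`, and the 0.9 gate are all hypothetical, and a real scorer would more likely use semantic similarity or an LLM judge than exact matching.

```python
from typing import Callable


def exact_match(expected: str, actual: str) -> float:
    """Toy scorer: 1.0 when the texts match ignoring case and outer whitespace."""
    return 1.0 if expected.strip().lower() == actual.strip().lower() else 0.0


def pass_rate(
    cases: list[dict],
    run_agent: Callable[[str], str],
    scorer: Callable[[str, str], float] = exact_match,
    min_score: float = 1.0,
) -> float:
    """Fraction of cases whose score reaches min_score."""
    if not cases:
        raise ValueError("cannot compute a pass rate over zero cases")
    passed = sum(
        1
        for case in cases
        if scorer(case["expected_output"], run_agent(case["input"])) >= min_score
    )
    return passed / len(cases)


cases = [
    {"input": "reset password", "expected_output": "Send reset link"},
    {"input": "refund status", "expected_output": "Check order page"},
    {"input": "cancel plan", "expected_output": "Open billing settings"},
    {"input": "delete account", "expected_output": "Escalate to support"},
]

# Stand-in agent with canned answers; it has no answer for the last case.
CANNED = {
    "reset password": "send reset link",
    "refund status": "Check order page",
    "cancel plan": "Open billing settings",
}


def stub_agent(prompt: str) -> str:
    return CANNED.get(prompt, "I don't know")


rate = pass_rate(cases, stub_agent)
deploy_allowed = rate >= 0.9  # hypothetical deployment gate
print(f"pass rate {rate:.2f}, deploy allowed: {deploy_allowed}")
```

With three of four cases passing, the rate is 0.75 and the hypothetical 0.9 gate blocks the deployment.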

Grow your evaluation dataset over time

Start with 50 hand-written test cases. As your agent enters production, use trace-to-test conversion to add real-world examples. A mature agent evaluation dataset typically has 200-500 samples covering normal cases, edge cases, and adversarial inputs.