Datasets (Beta)¶
Datasets are managed collections of structured data in Manifest Platform. They serve as evaluation test suites for agents, live-query interfaces to external systems, and reference data for enrichment and lookup.
Creating a Dataset¶
Coming Soon
The Python SDK for local development is not yet publicly available.
from flow_sdk.cli_client import CLIClient

# `config` is your SDK configuration object; constructing it is not shown here.
client = CLIClient(config)

# Optionally validate metadata before persisting
check = client.datasets.validate({
    "name": "Support Ticket Evaluation",
    "slug": "support-ticket-eval",
})
assert check.valid, check.errors

dataset = client.datasets.create({
    "name": "Support Ticket Evaluation",
    "slug": "support-ticket-eval",
    "description": "Test cases for the customer support agent",
    "status": "draft",
    "tags": ["evaluation", "support"],
    "categories": ["agent-testing"],
})
print(f"Created dataset: {dataset.id}")
To create a dataset in the UI instead, navigate to Datasets > New, enter a name, description, status, and tags, then click Create.
Dataset Fields¶
| Field | Required | Description |
|---|---|---|
| `name` | Yes | Human-readable name (3-200 chars) |
| `slug` | Yes | URL-safe identifier (3-100 chars, lowercase, hyphens allowed) |
| `description` | No | What this dataset contains |
| `connector_id` | No | Link to a connector for source data |
| `external_id` | No | External identifier from the source system (up to 500 chars) |
| `owner_id` | No | UUID of the dataset owner (defaults to the creating user) |
| `team` | No | Owning team name (up to 200 chars) |
| `status` | No | `draft`, `active`, `published`, `deprecated`, or `archived` |
| `certification_level` | No | `none`, `bronze`, `silver`, or `gold` |
| `tags` | No | Categorization tags |
| `categories` | No | Category labels for grouping |
| `policy_tags` | No | Governance tags (`pii`, `hipaa`, etc.) |
| `data_residency` | No | Region constraint for data storage (up to 50 chars) |
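The length and character constraints in the table can be checked locally before sending a request. The following is a minimal sketch, not the platform's own validator: the function name `check_dataset_metadata` is hypothetical, and the slug pattern assumes digits are allowed alongside lowercase letters and hyphens, which the table does not state.

```python
import re

# Assumed slug pattern: lowercase letters, digits, and hyphens (digits are an assumption).
SLUG_RE = re.compile(r"[a-z0-9-]+")


def check_dataset_metadata(meta: dict) -> list[str]:
    """Return a list of problems; an empty list means the local checks passed."""
    errors = []
    name = meta.get("name", "")
    slug = meta.get("slug", "")
    if not 3 <= len(name) <= 200:
        errors.append("name must be 3-200 characters")
    if not 3 <= len(slug) <= 100:
        errors.append("slug must be 3-100 characters")
    elif not SLUG_RE.fullmatch(slug):
        errors.append("slug must be lowercase, with hyphens as the only separator")
    return errors


ok = check_dataset_metadata({"name": "Support Ticket Evaluation", "slug": "support-ticket-eval"})
bad = check_dataset_metadata({"name": "ab", "slug": "Bad Slug"})
print(ok)
print(bad)
```

A local check like this only catches formatting mistakes early; the server remains the authority on what is accepted.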
Updating a Dataset¶
Use PATCH /datasets/{dataset_id} to update metadata. In addition to most create fields, the update request also accepts:
| Field | Description |
|---|---|
| `quality_score` | Numeric quality score (decimal) |
| `record_count` | Number of records in the dataset (integer) |
| `size_bytes` | Dataset size in bytes (integer) |
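As a rough illustration of the update call, the sketch below builds a `PATCH /datasets/{dataset_id}` request with the Python standard library. The base URL, the token, and the bearer-token header are all assumptions made for the example; substitute whatever your deployment actually uses.

```python
import json
import urllib.request

# Hypothetical values: the real host, path prefix, and auth scheme are not given here.
BASE_URL = "https://manifest.example.com/api"
API_TOKEN = "replace-me"


def build_update_request(dataset_id: str, changes: dict) -> urllib.request.Request:
    """Build (but do not send) a PATCH request carrying only the changed fields."""
    return urllib.request.Request(
        url=f"{BASE_URL}/datasets/{dataset_id}",
        data=json.dumps(changes).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {API_TOKEN}",  # assumed auth scheme
        },
        method="PATCH",
    )


req = build_update_request("uuid-of-dataset", {"status": "active", "record_count": 1250})
print(req.get_method(), req.full_url)
# Send with urllib.request.urlopen(req) once the real base URL and token are in place.
```

Sending only the fields that change keeps the request small and avoids overwriting metadata that another user may have edited.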
Schema Management¶
Every dataset has a versioned schema that defines its field structure. Schemas can be defined manually or discovered automatically from connector sources.
Defining a Schema¶
Coming Soon
The Python SDK for local development is not yet publicly available.
client.datasets.register_schema(dataset_id, {
    "schema_definition": {
        "fields": [
            {"name": "input", "type": "string", "description": "User prompt"},
            {"name": "expected_output", "type": "string", "description": "Expected agent response"},
            {"name": "category", "type": "string", "description": "Query category"},
            {"name": "difficulty", "type": "string", "description": "easy, medium, hard"},
        ]
    },
    "change_type": "non_breaking",
    "change_summary": "Initial schema definition",
})
Schema Versioning¶
Schemas are versioned automatically. Each update creates a new version with a change type:
| Change Type | Description | Impact |
|---|---|---|
| `non_breaking` | Added optional fields, updated descriptions | Safe for existing consumers |
| `breaking` | Removed fields, changed types, renamed fields | May break existing queries |
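The rules in the table can be applied mechanically when deciding which `change_type` to submit. The sketch below compares two field lists and is only an illustration: `classify_change` is a hypothetical helper, and the `required` key it inspects is an assumption, since the schema example earlier in this section does not define one.

```python
def classify_change(old_fields: list[dict], new_fields: list[dict]) -> str:
    """Compare two field lists and return "breaking" or "non_breaking"."""
    old = {f["name"]: f for f in old_fields}
    new = {f["name"]: f for f in new_fields}

    for name, field in old.items():
        if name not in new:
            return "breaking"  # removed, or renamed (a rename looks like a removal)
        if new[name]["type"] != field["type"]:
            return "breaking"  # type changed

    for name, field in new.items():
        # "required" is a hypothetical key used only for this illustration.
        if name not in old and field.get("required", False):
            return "breaking"

    return "non_breaking"  # only optional additions or description edits remain


v1 = [{"name": "input", "type": "string"}, {"name": "category", "type": "string"}]
v2 = v1 + [{"name": "difficulty", "type": "string"}]
v3 = [{"name": "input", "type": "string"}]

print(classify_change(v1, v2))
print(classify_change(v1, v3))
```

Treating a rename as a removal plus an addition is deliberately conservative: consumers that read the old name will fail either way.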
Schema Discovery¶
When a dataset has connector sources, the platform can infer the schema from the source data:
Coming Soon
The Python SDK for local development is not yet publicly available.
inferred = client.datasets.infer_schema(dataset_id)
for field in inferred.fields:
    print(f" {field.name}: {field.type}")
if inferred.merge_conflicts:
    print(f"Conflicts: {inferred.merge_conflicts}")
If multiple sources have conflicting field types, the response includes merge_conflicts to help you resolve them.
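To make the idea of a merge conflict concrete, the sketch below merges per-source field types and reports the fields on which sources disagree. It is a local illustration only: the function, the source names, and the shape of the conflict report are hypothetical and may not match the structure of `merge_conflicts` returned by the platform.

```python
def merge_inferred_schemas(schemas: dict[str, dict[str, str]]):
    """Merge per-source {field: type} maps; report fields whose types disagree."""
    seen: dict[str, dict[str, str]] = {}  # field -> {source_name: type}
    for source_name, fields in schemas.items():
        for field_name, field_type in fields.items():
            seen.setdefault(field_name, {})[source_name] = field_type

    merged, conflicts = {}, {}
    for field_name, by_source in seen.items():
        types = set(by_source.values())
        if len(types) == 1:
            merged[field_name] = types.pop()
        else:
            conflicts[field_name] = by_source  # keep every source's view for review
    return merged, conflicts


merged, conflicts = merge_inferred_schemas({
    "source_a": {"ticket_id": "string", "priority": "integer"},
    "source_b": {"ticket_id": "string", "priority": "string"},
})
print(merged)
print(conflicts)
```

A conflict such as `priority` above is typically resolved by casting in one source's field mapping or by registering the schema manually with the type you want.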
Connector Sources¶
Link connector instances as data sources to pull live data into a dataset.
Adding a Source¶
Coming Soon
The Python SDK for local development is not yet publicly available.
source = client.datasets.add_source(dataset_id, {
    "connector_instance_id": "uuid-of-jira-instance",
    "operation_id": "list_issues",
    "display_name": "Jira Open Tickets",
    "operation_config": {
        "jql": "status = Open",
        "maxResults": 100,
    },
    "credential_scope_type": "organization",
    "credential_scope_id": "uuid-of-org",
    "field_mappings": {
        "key": "ticket_id",
        "fields.summary": "title",
        "fields.status.name": "status",
        "fields.assignee.displayName": "assignee",
    },
})
Field Mappings¶
Field mappings transform source data to match the dataset schema:
{
    "key": "ticket_id",
    "fields.summary": "title",
    "fields.status.name": "status",
    "fields.created": "created_at"
}
The left side is the path in the source data; the right side is the field name in the dataset schema.
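The mapping semantics can be sketched in a few lines: each dotted path on the left is resolved against the source record, and the result is stored under the name on the right. This is an illustration rather than the platform's implementation; the helper names are hypothetical, and returning `None` for a missing path is an assumption.

```python
def get_path(record: dict, path: str):
    """Follow a dotted path such as "fields.status.name"; return None if a key is missing."""
    current = record
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None  # assumed behavior for missing paths
        current = current[key]
    return current


def apply_field_mappings(record: dict, mappings: dict[str, str]) -> dict:
    """Build a dataset row by copying each source path to its target field name."""
    return {target: get_path(record, source) for source, target in mappings.items()}


# Sample record shaped like the mapping above; the values are made up.
issue = {
    "key": "PROJ-1",
    "fields": {
        "summary": "Login fails",
        "status": {"name": "Open"},
        "created": "2024-01-15T09:30:00Z",
    },
}
mappings = {
    "key": "ticket_id",
    "fields.summary": "title",
    "fields.status.name": "status",
    "fields.created": "created_at",
}
row = apply_field_mappings(issue, mappings)
print(row)
```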
Testing a Source¶
Before relying on a source, test it:
Coming Soon
The Python SDK for local development is not yet publicly available.
result = client.datasets.test_source(dataset_id, source_id)
if result.success:
    print(f"Got {result.record_count} rows in {result.execution_time_ms}ms")
    for row in result.sample_rows:
        print(row)
else:
    print(f"Error: {result.error}")
Previewing Data¶
The preview endpoint shows raw source data alongside transformed data, so you can verify field mappings and privacy filters:
Coming Soon
The Python SDK for local development is not yet publicly available.
preview = client.datasets.preview(dataset_id)
for source in preview.sources:
    print(f"Source: {source.source_name}")
    print(f" Raw: {source.source_data}")
    print(f" Transformed: {source.transformed_data}")
    print(f" Transformations: {source.transformations_applied}")
Querying Datasets¶
The Query API provides a flexible interface for filtering, sorting, and paginating dataset data.
Basic Query¶
result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    filters=[
        {"field": "status", "operator": "eq", "value": "open"},
        {"field": "priority", "operator": "in", "value": ["high", "critical"]},
    ],
    sort_by="created_at",
    sort_order="desc",
    limit=50,
    offset=0,
)
print(f"Total matching: {result.total}")
print(f"Returned: {result.returned}")
print(f"Query time: {result.execution_time_ms}ms")
for row in result.rows:
    print(row)
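When a query matches more rows than `limit`, the remaining rows can be fetched by advancing `offset`. The helper below is a sketch of that loop under two assumptions: that `total` counts all matching rows regardless of `limit`, and that `offset` is a zero-based row index. `iter_rows` and `fake_query` are hypothetical names; `fake_query` stands in for the SDK so the example runs on its own.

```python
from types import SimpleNamespace


def iter_rows(query_fn, page_size: int = 50):
    """Yield every matching row by advancing offset until total is reached."""
    offset = 0
    while True:
        page = query_fn(limit=page_size, offset=offset)
        yield from page.rows
        offset += len(page.rows)
        # Stop on an empty page too, so a changing total cannot loop forever.
        if not page.rows or offset >= page.total:
            break


# Stand-in for sdk.datasets.query so the sketch runs without the SDK.
DATA = [{"ticket_id": f"T-{i}"} for i in range(120)]


def fake_query(limit, offset):
    rows = DATA[offset:offset + limit]
    return SimpleNamespace(rows=rows, total=len(DATA), returned=len(rows))


all_rows = list(iter_rows(fake_query, page_size=50))
print(len(all_rows))
```

With the real SDK, `query_fn` could be a small wrapper such as `lambda limit, offset: sdk.datasets.query(dataset_id="uuid-of-dataset", limit=limit, offset=offset)`.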
Filter Operators¶
| Operator | Description | Example |
|---|---|---|
| `eq` | Equals | `{"field": "status", "operator": "eq", "value": "open"}` |
| `ne` | Not equals | `{"field": "status", "operator": "ne", "value": "closed"}` |
| `gt` | Greater than | `{"field": "score", "operator": "gt", "value": 0.8}` |
| `gte` | Greater than or equal | `{"field": "count", "operator": "gte", "value": 10}` |
| `lt` | Less than | `{"field": "age_days", "operator": "lt", "value": 30}` |
| `lte` | Less than or equal | `{"field": "priority", "operator": "lte", "value": 3}` |
| `in` | Value in list | `{"field": "type", "operator": "in", "value": ["bug", "task"]}` |
| `nin` | Value not in list | `{"field": "status", "operator": "nin", "value": ["archived"]}` |
| `contains` | String contains | `{"field": "title", "operator": "contains", "value": "error"}` |
| `startswith` | String starts with | `{"field": "key", "operator": "startswith", "value": "PROJ-"}` |
| `endswith` | String ends with | `{"field": "email", "operator": "endswith", "value": "@company.com"}` |
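To reason about what a filter list will return, it can help to evaluate the same operators against a few sample rows locally. The sketch below does that in plain Python. It is not the query engine: it assumes multiple filters are combined with AND, that a row missing the field fails the filter, and that string operators are case-sensitive, none of which is stated above.

```python
import operator

OPERATORS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gt": operator.gt,
    "gte": operator.ge,
    "lt": operator.lt,
    "lte": operator.le,
    "in": lambda value, options: value in options,
    "nin": lambda value, options: value not in options,
    "contains": lambda value, text: text in value,
    "startswith": lambda value, prefix: value.startswith(prefix),
    "endswith": lambda value, suffix: value.endswith(suffix),
}


def matches(row: dict, filters: list[dict]) -> bool:
    """Return True when the row satisfies every filter (AND semantics, by assumption)."""
    return all(
        f["field"] in row and OPERATORS[f["operator"]](row[f["field"]], f["value"])
        for f in filters
    )


rows = [
    {"key": "PROJ-1", "status": "open", "priority": 2},
    {"key": "PROJ-2", "status": "closed", "priority": 1},
    {"key": "OPS-7", "status": "open", "priority": 5},
]
filters = [
    {"field": "status", "operator": "eq", "value": "open"},
    {"field": "key", "operator": "startswith", "value": "PROJ-"},
]
selected = [r["key"] for r in rows if matches(r, filters)]
print(selected)
```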
Field Projection¶
Return only specific fields to reduce response size:
result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    fields=["ticket_id", "title", "status"],
    limit=100,
)
Source Filtering¶
Query specific sources within a multi-source dataset:
result = sdk.datasets.query(
    dataset_id="uuid-of-dataset",
    source_ids=["uuid-of-jira-source"],
    limit=50,
)
Dataset Lifecycle¶
stateDiagram-v2
[*] --> Draft: Create
Draft --> Active: Add sources & schema
Active --> Published: Certify
Published --> Active: Update
Published --> Deprecated: Sunset
Deprecated --> Archived: Archive
Active --> Archived: Archive
| Status | Description |
|---|---|
| `draft` | Initial creation, schema and sources being configured |
| `active` | Ready for use, accepting queries |
| `published` | Certified and stable, recommended for production use |
| `deprecated` | Marked for retirement, still queryable |
| `archived` | No longer queryable, retained for lineage |
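The transitions in the state diagram can be captured as a lookup table, which is handy for checking a planned status change before submitting it. This is a sketch derived from the diagram only; `ALLOWED_TRANSITIONS` and `can_transition` are hypothetical names, and whether the platform rejects other transitions is not stated here.

```python
# Edges copied from the lifecycle diagram; archived has no outgoing edges.
ALLOWED_TRANSITIONS = {
    "draft": {"active"},
    "active": {"published", "archived"},
    "published": {"active", "deprecated"},
    "deprecated": {"archived"},
    "archived": set(),
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the diagram contains an edge from current to target."""
    return target in ALLOWED_TRANSITIONS.get(current, set())


print(can_transition("draft", "active"))
print(can_transition("draft", "published"))
```

Note that, per the diagram, a published dataset reaches `archived` only by way of `deprecated`.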
Data Lineage¶
Track the provenance of datasets through transformation chains:
Coming Soon
The Python SDK for local development is not yet publicly available.
lineage = client.datasets.get_lineage(dataset_id)
print("Upstream datasets:")
for node in lineage.upstream:
    print(f" {node.name} ({node.slug}) - {node.relationship_type}")
print("Downstream datasets:")
for node in lineage.downstream:
    print(f" {node.name} ({node.slug}) - {node.relationship_type}")
Lineage relationships are created automatically when datasets are derived from other datasets through transforms or preparation steps.
Data Preparation¶
Transform dataset data into different formats for export or downstream processing:
Coming Soon
The Python SDK for local development is not yet publicly available.
prep = client.datasets.prepare(dataset_id, {
    "target_format": "jsonl",  # json, jsonl, or csv
    "cleaning_config": {
        "remove_nulls": True,
        "trim_whitespace": True,
    },
})
print(f"Preparation ID: {prep.preparation_id}")
print(f"Output dataset: {prep.output_dataset_id}")
print(f"Execution time: {prep.execution_time_ms}ms")
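For intuition about what the cleaning options might do, the sketch below applies both to a row and serializes the result as JSON Lines. The exact platform semantics are not documented above, so this rests on assumptions: `remove_nulls` is taken to drop null-valued fields (not whole rows), and `trim_whitespace` to strip leading and trailing whitespace from string values. The helper names are hypothetical.

```python
import json


def clean_row(row: dict, remove_nulls: bool = True, trim_whitespace: bool = True) -> dict:
    """Apply the two cleaning options to a single row."""
    cleaned = {}
    for key, value in row.items():
        if trim_whitespace and isinstance(value, str):
            value = value.strip()
        if remove_nulls and value is None:
            continue  # assumed meaning: drop null-valued fields, not whole rows
        cleaned[key] = value
    return cleaned


def to_jsonl(rows: list[dict]) -> str:
    """Serialize cleaned rows as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(clean_row(r)) for r in rows)


output = to_jsonl([
    {"input": "  How do I reset my password? ", "category": None},
    {"input": "Cancel my plan", "category": "billing"},
])
print(output)
```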
Using Datasets with Agents¶
Datasets are a key part of the agent development lifecycle:
- Create evaluation datasets with `input` and `expected_output` fields
- Run agents against datasets in the Playground evaluation mode
- Score results using semantic similarity, LLM judges, or custom scorers
- Build test suites from production traces using trace-to-test conversion
- Gate deployments on dataset evaluation pass rates
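The last step, gating a deployment on the pass rate, reduces to a threshold check over per-sample results. The sketch below shows the arithmetic only: the function names and the 0.9 default threshold are hypothetical, and the list of booleans stands in for whatever your scorer produces.

```python
def pass_rate(results: list[bool]) -> float:
    """Fraction of evaluation samples that passed; 0.0 for an empty run."""
    return sum(results) / len(results) if results else 0.0


def should_deploy(results: list[bool], threshold: float = 0.9) -> bool:
    """Allow deployment only when the pass rate meets the (assumed) threshold."""
    return pass_rate(results) >= threshold


good_run = [True] * 19 + [False]      # 19 of 20 passed
weak_run = [True] * 8 + [False] * 2   # 8 of 10 passed
print(should_deploy(good_run))
print(should_deploy(weak_run))
```

Treating an empty run as a failure is a deliberate choice: a gate that passes when no samples were evaluated would hide a broken evaluation step.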
Grow your evaluation dataset over time
Start with 50 hand-written test cases. As your agent enters production, use trace-to-test conversion to add real-world examples. A mature agent evaluation dataset typically has 200-500 samples covering normal cases, edge cases, and adversarial inputs.