# ChamberClient

The main class for interacting with the Chamber API.

## Initialization

```python
from chamber_sdk import ChamberClient

# From environment variable (CHAMBER_TOKEN)
client = ChamberClient()

# From CLI config (~/.chamber/token.json)
client = ChamberClient.from_config()

# Direct token
client = ChamberClient(
    token="ch.your-api-token",
    organization_id="org-123",  # Optional: for multi-org users
    api_url="https://custom.api.example.com/v1",  # Optional: override endpoint
    timeout=60,  # Optional: request timeout in seconds
)
```

Default API URL: `https://api.usechamber.io/v1`
## Auto-Containerize & Run

### run()

Auto-containerize and submit a GPU workload in one call. See the full guide for detailed documentation.

```python
# Configure registries once
ChamberClient.add_registry("prod", "us-east1-docker.pkg.dev/my-project/prod", set_default=True)
ChamberClient.add_registry("dev", "us-east1-docker.pkg.dev/my-project/dev")

# Submit using the default registry
job = client.run("./my-project", gpus=4, team="ml-research")

# Submit to a specific registry by name
job = client.run("./my-project", gpus=4, team="ml-research", registry="dev")

# Full URLs still work
job = client.run("./my-project", registry="123456.dkr.ecr.us-east-1.amazonaws.com", team="ml-research")

# Dry run (preview without executing)
result = client.run("./my-project", dry_run=True)
print(result.dockerfile)
print(result.manifest)
```

Requires installation with `pip install chamber-sdk[run]`.
Key Parameters:

| Parameter | Type | Description |
|---|---|---|
| directory | str | Path to project directory (required) |
| gpus | int | Number of GPUs (default: 1) |
| gpu_type | str | GPU type (default: "H100") |
| team | str | Team ID (required for submission) |
| registry | str | Registry name (e.g., "prod") or URL; uses the default registry if not specified |
| distributed | str | "auto", "ray", "deepspeed", or "none" |
| dry_run | bool | Preview without executing |
| wait | bool | Block until the workload completes |
| on_progress | callable | Progress callback `(stage, message)` |

Returns: `Workload` object (or `DryRunResult` if `dry_run=True`)

See Auto-Containerize & Run for the complete parameter reference.
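To follow a build interactively, `wait=True` can be combined with an `on_progress` callback. A minimal sketch: the `(stage, message)` signature comes from the parameter table above, while `format_progress` and the stage name shown are illustrative assumptions, not part of the SDK.

```python
def format_progress(stage: str, message: str) -> str:
    """Render one progress update as a single log line."""
    return f"[{stage}] {message}"

def on_progress(stage: str, message: str) -> None:
    print(format_progress(stage, message))

# Assuming a configured `client` (see Initialization):
# job = client.run("./my-project", gpus=4, team="ml-research",
#                  wait=True, on_progress=on_progress)
```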
## Registry Management

Static methods for managing container registries. Configuration is persisted to `~/.chamber/config.json`.

### list_registries()

List all configured registries.

```python
registries = ChamberClient.list_registries()
# {'prod': 'us-east1-docker.pkg.dev/my-project/prod', 'dev': '...'}
```

Returns: `dict[str, str]` mapping names to URLs
### add_registry()

Add or update a named registry.

```python
ChamberClient.add_registry(
    "prod",
    "us-east1-docker.pkg.dev/my-project/prod",
    set_default=True,  # Optional: also set as default
)
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| name | str | Registry name (e.g., "prod", "dev") |
| url | str | Registry URL |
| set_default | bool | If True, also set as default (default: False) |
### set_default_registry()

Set the default registry by name.

```python
ChamberClient.set_default_registry("dev")
```

Raises: `ValueError` if the registry name doesn't exist

### get_default_registry()

Get the current default registry.

```python
name, url = ChamberClient.get_default_registry()
# ('prod', 'us-east1-docker.pkg.dev/my-project/prod')
```

Returns: `tuple[str, str]` of (name, url), or `None` if no default is configured
## Workload Submission

### submit_job()

Submit a new GPU workload.

```python
from chamber_sdk import JobClass

job = client.submit_job(
    name="training-job",
    initiative_id="team-id",
    gpu_type="H100",
    requested_gpus=8,
    job_class=JobClass.RESERVED,
    priority=50,
    tags={"experiment": "v1", "owner": "ml-team"},
    external_id="my-tracking-id",
)
```

Parameters:

| Parameter | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Human-readable workload name (1-255 chars) |
| initiative_id | str | Yes | Team ID |
| gpu_type | str | Yes | GPU model (e.g., "H100", "A100") |
| requested_gpus | int/float | No | Number of GPUs (optional if using a template) |
| job_class | JobClass | No | RESERVED or ELASTIC (default: RESERVED) |
| priority | int | No | 0-100 (higher = more important) |
| tags | dict | No | Key-value pairs for tagging |
| metadata | dict | No | Custom metadata dictionary |
| external_id | str | No | Your own tracking ID (max 255 chars) |
| k8s_manifest | str | No | Custom Kubernetes manifest YAML (max 65536 chars) |
| template_id | str | No | Template ID to use for configuration |
| allocation_id | str | No | Force a specific capacity allocation |

Returns: `Workload` object
## Distributed Training

For multi-node distributed training:

```python
from chamber_sdk import ScalingMode

job = client.submit_job(
    name="distributed-training",
    initiative_id="team-id",
    gpu_type="H100",
    requested_gpus=32,
    gpus_per_pod=8,
    requested_pods=4,
    scaling_mode=ScalingMode.GANG,  # All-or-nothing scheduling
    distribution_mode="auto",
)
```

Additional Parameters:

| Parameter | Type | Description |
|---|---|---|
| gpus_per_pod | int | GPUs per pod |
| requested_pods | int | Number of pods |
| scaling_mode | ScalingMode | GANG or ELASTIC |
| min_pods | int | Minimum pods for elastic scaling |
| distribution_mode | str | "auto" or "manual" |
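For elastic jobs, `min_pods` bounds how far the workload can scale down. The sketch below adds a local consistency check before submitting; the invariant `requested_gpus == gpus_per_pod * requested_pods` is an assumption inferred from the GANG example above (32 = 8 × 4), and `check_pod_shape` is a hypothetical helper, not an SDK function.

```python
def check_pod_shape(requested_gpus, gpus_per_pod, requested_pods, min_pods=None):
    """Validate pod geometry before submitting a distributed workload."""
    if gpus_per_pod * requested_pods != requested_gpus:
        raise ValueError("requested_gpus must equal gpus_per_pod * requested_pods")
    if min_pods is not None and not 1 <= min_pods <= requested_pods:
        raise ValueError("min_pods must be between 1 and requested_pods")

check_pod_shape(32, 8, 4, min_pods=2)

# job = client.submit_job(
#     name="elastic-training", initiative_id="team-id", gpu_type="H100",
#     requested_gpus=32, gpus_per_pod=8, requested_pods=4,
#     scaling_mode=ScalingMode.ELASTIC, min_pods=2,
# )
```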
## Workload Management

### get_workload()

Retrieve details of a specific workload.

```python
job = client.get_workload("workload-id")
print(f"Status: {job.status}")
print(f"GPUs: {job.requested_gpus}")
```

Returns: `Workload` object
### list_workloads()

List workloads with optional filters.

```python
from chamber_sdk import JobStatus

response = client.list_workloads(
    status=JobStatus.RUNNING,
    initiative_id="team-id",
    submitted_by="user-123",
    limit=50,
)
for job in response.items:
    print(f"{job.name}: {job.status}")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| status | JobStatus | Filter by status |
| initiative_id | str | Filter by team |
| submitted_by | str | Filter by user |
| is_managed | bool | True for Chamber workloads, False for discovered workloads |
| submitted_after | str | ISO 8601 datetime filter |
| submitted_before | str | ISO 8601 datetime filter |
| sort_order | str | "asc" or "desc" |
| limit | int | Max results (1-100, default: 20) |
| next_token | str | Pagination cursor |

Returns: `PaginatedResponse` with `items`, `next_token`, `total_count`
### iter_workloads()

Iterate through all workloads with automatic pagination.

```python
for job in client.iter_workloads(status=JobStatus.COMPLETED):
    print(f"{job.name}: {job.requested_gpus} GPUs")
```
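When you need explicit page boundaries (for example, to checkpoint between pages), the same pagination can be driven by hand with `next_token`. `iter_pages` below is a hypothetical helper mirroring what `iter_workloads()` presumably does internally:

```python
def iter_pages(fetch, **filters):
    """Yield items across all pages of a paginated endpoint.

    `fetch` is a callable such as client.list_workloads that accepts a
    next_token keyword and returns a response with .items and .next_token.
    """
    token = None
    while True:
        page = fetch(next_token=token, **filters)
        yield from page.items
        token = page.next_token
        if not token:
            break

# for job in iter_pages(client.list_workloads, status=JobStatus.RUNNING, limit=100):
#     print(job.name)
```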
### cancel_workload()

Cancel a running or pending workload.

```python
cancelled_job = client.cancel_workload("workload-id")
print(f"Status: {cancelled_job.status}")  # CANCELLED
```

Returns: `Workload` object with updated status
### search_workloads()

Search workloads with advanced filtering.

```python
results = client.search_workloads(
    status=["RUNNING", "PENDING"],
    gpu_type=["H100"],
    priority_min=50,
    submitted_from="2024-01-01T00:00:00Z",
    query="training",
    sort_by="submitted_at",
    sort_order="desc",
    page_size=25,
)

print(f"Found {results.total_count} workloads")
for job in results.items:
    print(f"{job.name}: {job.status}")

# Paginate through results
if results.has_more:
    next_page = client.search_workloads(cursor=results.next_cursor)
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| status | list[str] | Filter by status(es) |
| job_class | list[str] | Filter by workload class(es) |
| gpu_type | list[str] | Filter by GPU type(s) |
| initiative_id | list[str] | Filter by team ID(s) |
| submitted_by | list[str] | Filter by user ID(s) |
| priority_min | int | Minimum priority |
| priority_max | int | Maximum priority |
| requested_gpus_min | int | Minimum GPU count |
| requested_gpus_max | int | Maximum GPU count |
| submitted_from | str | Start date (ISO 8601) |
| submitted_to | str | End date (ISO 8601) |
| query | str | Full-text search query |
| sort_by | str | Sort field (default: submitted_at) |
| sort_order | str | "asc" or "desc" (default: desc) |
| page_size | int | Results per page (1-100, default: 25) |
| cursor | str | Pagination cursor |

Returns: `WorkloadSearchResult` with `items`, `total_count`, `has_more`, `next_cursor`
### get_workload_aggregations()

Get workload counts grouped by a dimension.

```python
agg = client.get_workload_aggregations(
    dimension="status",
    initiative_id=["team-ml"],
)
print(f"Total: {agg.total}")
for bucket in agg.buckets:
    print(f"  {bucket.key}: {bucket.count}")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| dimension | str | Group by: status, job_class, gpu_type, initiative_id, submitted_by, capacity_pool_id |
| status | list[str] | Filter by status(es) |
| job_class | list[str] | Filter by workload class(es) |
| initiative_id | list[str] | Filter by team ID(s) |
| submitted_from | str | Start date (ISO 8601) |
| submitted_to | str | End date (ISO 8601) |

Returns: `AggregationResult` with `dimension`, `buckets`, `total`
### wait_for_completion()

Block until a workload reaches a terminal status.

```python
result = client.wait_for_completion(
    workload_id="job-id",
    poll_interval=30,  # Seconds between checks
    timeout=3600,      # Max wait time in seconds
)
```

Returns: `Workload` object with final status
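A common pattern is submit-then-wait with a check on the terminal status. `run_to_completion` below is a hypothetical wrapper, not an SDK method; it assumes `failure_reason` is populated on failed workloads (see the Workload data model).

```python
def run_to_completion(client, timeout=3600, **job_kwargs):
    """Submit a workload, block until it finishes, and raise on failure."""
    job = client.submit_job(**job_kwargs)
    final = client.wait_for_completion(
        workload_id=job.id, poll_interval=30, timeout=timeout
    )
    # Accepts either a JobStatus member or a plain string so the sketch
    # stands alone; in real code, compare against JobStatus.FAILED.
    if getattr(final.status, "name", final.status) == "FAILED":
        raise RuntimeError(f"{final.name} failed: {final.failure_reason}")
    return final
```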
## Metrics and Statistics

### get_workload_metrics()

Retrieve GPU metrics for a specific workload.

```python
metrics = client.get_workload_metrics(
    "workload-id",
    time_range="job_lifetime",
)
print(f"GPU Utilization: {metrics.gpu_utilization.avg:.1f}%")
print(f"Memory: {metrics.memory_utilization.avg:.1f}%")
print(f"Temperature: {metrics.temperature.avg:.1f}°C")
print(f"Power: {metrics.power_usage.avg:.1f} W")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| workload_id | str | The workload ID |
| time_range | str | last_1h, last_6h, last_24h, or job_lifetime |
| metrics | list[str] | Specific metrics to retrieve |

Returns: `WorkloadMetrics` with `gpu_utilization`, `memory_utilization`, `temperature`, `power_usage`
### get_global_metrics()

Get aggregated metrics for your organization.

```python
metrics = client.get_global_metrics(
    time_range="last_24h",
    initiative_id="team-ml",
)
print(f"Active workloads: {metrics.active_workloads}")
print(f"Total GPU hours: {metrics.total_gpu_hours}")
print(f"Avg GPU utilization: {metrics.gpu_utilization.avg:.1f}%")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| time_range | str | last_1h, last_6h, last_24h, last_7d, last_30d |
| initiative_id | str | Filter by team |
| cluster_id | str | Filter by cluster |
| group_by | str | Group by field (e.g., "initiative_id") |

Returns: `GlobalMetrics` with aggregated data
### get_batch_workload_metrics()

Get metrics for multiple workloads at once.

```python
data = client.get_batch_workload_metrics(
    status=["RUNNING"],
    time_range="last_24h",
    sort_by="gpu_utilization",
    limit=10,
)
```

Returns: dict of workload metrics ranked by the specified metric
### get_workload_stats()

Get aggregated workload statistics.

```python
stats = client.get_workload_stats(
    time_range="last_7_days",
    initiative_id="team-id",
)
print(f"Total workloads: {stats.total}")
for status, count in stats.by_status.items():
    print(f"  {status}: {count}")
```

Returns: `WorkloadStats` with `total`, `by_status`, `by_job_class`
## Teams

### list_teams()

List all teams accessible to the current user.

```python
teams = client.list_teams()
for team in teams:
    print(f"{team.name} ({team.id})")
```

Returns: list of `Team` objects

### create_team()

Create a new team.

```python
team = client.create_team(
    name="ML Research",
    description="Machine learning research team",
    tags={"department": "engineering"},
)
print(f"Created team: {team.id}")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| name | str | Team name (1-255 chars) |
| description | str | Team description (max 1000 chars) |
| parent_id | str | Parent team ID for sub-teams |
| tags | dict | Key-value pairs for tagging |

Returns: `Team` object

### get_team()

Get details of a specific team.

```python
team = client.get_team("team-id")
print(f"Team: {team.name}")
print(f"Description: {team.description}")
```

Returns: `Team` object
## Templates

### list_templates()

List available workload templates.

```python
templates = client.list_templates(
    scope="ORGANIZATION",
    include_system=True,
)
for template in templates:
    print(f"{template.name}: {template.gpu_type}")
```

Parameters:

| Parameter | Type | Description |
|---|---|---|
| name | str | Filter by name (partial match) |
| scope | str | PERSONAL, PROJECT, or ORGANIZATION |
| include_system | bool | Include system templates (default: True) |

Returns: list of `Template` objects

### get_template()

Get details of a specific template.

```python
template = client.get_template("template-id")
print(f"Template: {template.name}")
print(f"GPU Type: {template.gpu_type}")
print(f"GPUs: {template.requested_gpus}")
```

Returns: `Template` object
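Templates pair naturally with `submit_job()`: per the submit_job parameter table, `requested_gpus` may be omitted when `template_id` supplies the configuration. A sketch of a hypothetical convenience wrapper (not an SDK method):

```python
def submit_from_template(client, template_id, name, initiative_id):
    """Submit a workload whose GPU configuration comes from a template."""
    template = client.get_template(template_id)
    return client.submit_job(
        name=name,
        initiative_id=initiative_id,
        gpu_type=template.gpu_type,  # gpu_type is still required by submit_job
        template_id=template.id,
    )
```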
## Allocations

### list_allocations()

List capacity allocations for a team.

```python
allocations = client.list_allocations(initiative_id="team-id")
for alloc in allocations:
    print(f"{alloc.id}: {alloc.allocated_instances} instances ({alloc.status})")
```

Returns: list of `Allocation` objects

### create_allocation()

Create a new capacity allocation.

```python
allocation = client.create_allocation(
    initiative_id="team-id",
    reservation_id="reservation-123",
    allocated_instances=4,
)
print(f"Allocated: {allocation.id}")
```

Returns: `Allocation` object

### get_allocation()

Get details of a specific allocation.

```python
allocation = client.get_allocation("allocation-id")
print(f"Status: {allocation.status}")
print(f"Instances: {allocation.allocated_instances}")
```

Returns: `Allocation` object
## Capacity

### get_capacity()

Check available GPU capacity and budget.

```python
capacity = client.get_capacity()

# Budget info
print(f"Allocated: {capacity.budget.allocated} GPU-hours")
print(f"Used: {capacity.budget.used} GPU-hours")
print(f"Available: {capacity.budget.available} GPU-hours")

# Capacity pools
for pool in capacity.pools:
    print(f"{pool.name} ({pool.gpu_type}):")
    print(f"  Available: {pool.available_gpus}/{pool.total_gpus} GPUs")
```

### health()

Check API health status.

```python
health = client.health()
print(f"Status: {health.status}")
print(f"Version: {health.version}")
```
## Enums

### JobStatus

```python
from chamber_sdk import JobStatus

JobStatus.PENDING    # Submitted, awaiting scheduling
JobStatus.QUEUED     # Scheduled, waiting for resources
JobStatus.STARTING   # Resources allocated, starting
JobStatus.RUNNING    # Currently executing
JobStatus.COMPLETED  # Finished successfully
JobStatus.FAILED     # Workload failed
JobStatus.PREEMPTED  # Preempted (elastic jobs)
JobStatus.CANCELLED  # Cancelled by user
```

### JobClass

```python
from chamber_sdk import JobClass

JobClass.RESERVED    # Guaranteed capacity, non-preemptible
JobClass.ELASTIC     # Uses idle capacity, can be preempted
JobClass.DISCOVERED  # External workload discovered by Chamber
```

### ScalingMode

```python
from chamber_sdk import ScalingMode

ScalingMode.GANG     # All-or-nothing scheduling
ScalingMode.ELASTIC  # Can scale between min/max pods
```

### AllocationStatus

```python
from chamber_sdk import AllocationStatus

AllocationStatus.ACTIVE    # Currently active
AllocationStatus.RELEASED  # Released by user
AllocationStatus.EXPIRED   # Expired
```
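The statuses that `wait_for_completion()` treats as terminal can be summarized as a small predicate. This sketch assumes COMPLETED, FAILED, and CANCELLED are terminal while PREEMPTED is not (since preempted elastic jobs may resume); verify against the SDK before relying on it.

```python
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED"}

def is_terminal(status) -> bool:
    """True if a workload status is final; accepts a JobStatus member or str."""
    return str(getattr(status, "name", status)) in TERMINAL_STATUSES
```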
## Data Models

### Workload

| Field | Type | Description |
|---|---|---|
| id | str | Unique workload ID |
| name | str | Workload name |
| status | JobStatus | Current status |
| requested_gpus | float | Number of GPUs |
| gpu_type | str | GPU model |
| job_class | JobClass | RESERVED, ELASTIC, or DISCOVERED |
| priority | int | Priority level |
| tags | dict | Key-value tags |
| metadata | dict | Custom metadata |
| template_id | str | Template ID, if one was used |
| initiative_id | str | Team ID |
| submitted_by | str | User ID |
| submitted_at | datetime | Submission timestamp |
| started_at | datetime | Start timestamp |
| completed_at | datetime | Completion timestamp |
| failure_reason | str | Failure message (if failed) |

### Team

| Field | Type | Description |
|---|---|---|
| id | str | Team ID |
| name | str | Team name |
| description | str | Team description |
| parent_id | str | Parent team ID |
| tags | dict | Key-value tags |
| created_at | datetime | Creation timestamp |

### Template

| Field | Type | Description |
|---|---|---|
| id | str | Template ID |
| name | str | Template name |
| scope | str | PERSONAL, PROJECT, or ORGANIZATION |
| description | str | Template description |
| gpu_type | str | GPU model |
| requested_gpus | int | Number of GPUs |
| k8s_manifest | str | Kubernetes manifest |
| configuration | dict | Additional configuration |

### Allocation

| Field | Type | Description |
|---|---|---|
| id | str | Allocation ID |
| initiative_id | str | Team ID |
| reservation_id | str | Reservation ID |
| allocated_instances | int | Number of instances |
| status | AllocationStatus | Allocation status |
| created_at | datetime | Creation timestamp |
## Exceptions

All exceptions inherit from `ChamberError`:

```python
from chamber_sdk import (
    ChamberError,
    AuthenticationError,
    AuthorizationError,
    NotFoundError,
    ValidationError,
    RateLimitError,
    ServerError,
    DockerError,  # For run() errors
)
from chamber_sdk.run import RegistryAuthError  # Specific registry auth failures

try:
    job = client.get_workload("invalid-id")
except NotFoundError:
    print("Workload not found")
except AuthenticationError:
    print("Invalid or expired token")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ChamberError as e:
    print(f"API error: {e}")
```

| Exception | HTTP Code | Description |
|---|---|---|
| AuthenticationError | 401 | Invalid or expired token |
| AuthorizationError | 403 | Insufficient permissions |
| NotFoundError | 404 | Resource not found |
| ValidationError | 400 | Invalid request parameters |
| RateLimitError | 429 | Rate limit exceeded |
| ServerError | 5xx | Server-side error |
| DockerError | N/A | Docker build/push error (for run()) |
| RegistryAuthError | N/A | Registry authentication/authorization failure (subclass of DockerError) |
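`RateLimitError` (429) and `ServerError` (5xx) are often transient and worth retrying. The SDK does not document a retry-after hint, so the sketch below uses plain exponential backoff; `with_retries` is a hypothetical helper, not part of the SDK.

```python
import time

def with_retries(call, retryable, retries=3, base_delay=1.0):
    """Invoke `call`, retrying retryable exceptions with exponential backoff."""
    for attempt in range(retries):
        try:
            return call()
        except retryable:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            time.sleep(base_delay * 2 ** attempt)

# workload = with_retries(
#     lambda: client.get_workload("workload-id"),
#     retryable=(RateLimitError, ServerError),
# )
```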
### Handling run() Errors

```python
from chamber_sdk import ChamberClient, DockerError
from chamber_sdk.run import RegistryAuthError

try:
    job = client.run("./my-project", registry="...", team="...")
except RegistryAuthError as e:
    # Registry authentication failed (401/403 from the registry)
    # e.registry contains the registry URL
    # e.detail contains the error details
    print(f"Registry auth failed for {e.registry}: {e.detail}")
except DockerError as e:
    # Docker not installed, daemon not running, build failed, or push failed
    print(f"Docker error: {e}")
except FileNotFoundError as e:
    # No Python entrypoint found in the project
    print(f"Project error: {e}")
except ValueError as e:
    # Missing required parameters (registry, team)
    print(f"Configuration error: {e}")
```