Skip to main content

ChamberClient

The main class for interacting with the Chamber API.

Initialization

from chamber_sdk import ChamberClient

# From environment variable (CHAMBER_TOKEN)
client = ChamberClient()

# From CLI config (~/.chamber/token.json)
client = ChamberClient.from_config()

# Direct token
client = ChamberClient(
    token="ch.your-api-token",
    organization_id="org-123",  # Optional: for multi-org users
    api_url="https://custom.api.example.com/v1",  # Optional: override endpoint
    timeout=60  # Optional: request timeout in seconds
)
Default API URL: https://api.usechamber.io/v1

Auto-Containerize & Run

run()

Auto-containerize and submit a GPU workload in one call. See the full guide for detailed documentation.
# Configure registries once
ChamberClient.add_registry("prod", "us-east1-docker.pkg.dev/my-project/prod", set_default=True)
ChamberClient.add_registry("dev", "us-east1-docker.pkg.dev/my-project/dev")

# Submit using default registry
job = client.run("./my-project", gpus=4, team="ml-research")

# Submit to specific registry by name
job = client.run("./my-project", gpus=4, team="ml-research", registry="dev")

# Full URLs still work
job = client.run("./my-project", registry="123456.dkr.ecr.us-east-1.amazonaws.com", team="ml-research")

# Dry run (preview without executing)
result = client.run("./my-project", dry_run=True)
print(result.dockerfile)
print(result.manifest)
Requires installation with pip install chamber-sdk[run]
Key Parameters:
ParameterTypeDescription
directorystrPath to project directory (required)
gpusintNumber of GPUs (default: 1)
gpu_typestrGPU type (default: "H100")
teamstrTeam ID (required for submission)
registrystrRegistry name (e.g., "prod") or URL. Uses default if not specified.
distributedstr"auto", "ray", "deepspeed", or "none"
dry_runboolPreview without executing
waitboolBlock until workload completes
on_progresscallableProgress callback(stage, message)
Returns: Workload object (or DryRunResult if dry_run=True). See Auto-Containerize & Run for the complete parameter reference.

Registry Management

Static methods for managing container registries. Configuration is persisted to ~/.chamber/config.json.

list_registries()

List all configured registries.
registries = ChamberClient.list_registries()
# {'prod': 'us-east1-docker.pkg.dev/my-project/prod', 'dev': '...'}
Returns: dict[str, str] mapping names to URLs

add_registry()

Add or update a named registry.
ChamberClient.add_registry(
    "prod",
    "us-east1-docker.pkg.dev/my-project/prod",
    set_default=True  # Optional: also set as default
)
Parameters:
ParameterTypeDescription
namestrRegistry name (e.g., "prod", "dev")
urlstrRegistry URL
set_defaultboolIf True, also set as default (default: False)

set_default_registry()

Set the default registry by name.
ChamberClient.set_default_registry("dev")
Raises: ValueError if the registry name doesn't exist

get_default_registry()

Get the current default registry.
name, url = ChamberClient.get_default_registry()
# ('prod', 'us-east1-docker.pkg.dev/my-project/prod')
Returns: tuple[str, str] of (name, url) or None if not configured

Workload Submission

submit_job()

Submit a new GPU workload.
job = client.submit_job(
    name="training-job",
    initiative_id="team-id",
    gpu_type="H100",
    requested_gpus=8,
    job_class=JobClass.RESERVED,
    priority=50,
    tags={"experiment": "v1", "owner": "ml-team"},
    external_id="my-tracking-id"
)
Parameters:
ParameterTypeRequiredDescription
namestrYesHuman-readable workload name (1-255 chars)
initiative_idstrYesTeam ID
gpu_typestrYesGPU model (e.g., "H100", "A100")
requested_gpusint/floatNoNumber of GPUs (optional if using template)
job_classJobClassNoRESERVED or ELASTIC (default: RESERVED)
priorityintNo0-100 (higher = more important)
tagsdictNoKey-value pairs for tagging
metadatadictNoCustom metadata dictionary
external_idstrNoYour own tracking ID (max 255 chars)
k8s_manifeststrNoCustom Kubernetes manifest YAML (max 65536 chars)
template_idstrNoTemplate ID to use for configuration
allocation_idstrNoForce specific capacity allocation
Returns: Workload object

Distributed Training

For multi-node distributed training:
job = client.submit_job(
    name="distributed-training",
    initiative_id="team-id",
    gpu_type="H100",
    requested_gpus=32,
    gpus_per_pod=8,
    requested_pods=4,
    scaling_mode=ScalingMode.GANG,  # All-or-nothing scheduling
    distribution_mode="auto"
)
Additional Parameters:
ParameterTypeDescription
gpus_per_podintGPUs per pod
requested_podsintNumber of pods
scaling_modeScalingModeGANG or ELASTIC
min_podsintMinimum pods for elastic scaling
distribution_modestr"auto" or "manual"

Workload Management

get_workload()

Retrieve details of a specific workload.
job = client.get_workload("workload-id")
print(f"Status: {job.status}")
print(f"GPUs: {job.requested_gpus}")
Returns: Workload object

list_workloads()

List workloads with optional filters.
response = client.list_workloads(
    status=JobStatus.RUNNING,
    initiative_id="team-id",
    submitted_by="user-123",
    limit=50
)

for job in response.items:
    print(f"{job.name}: {job.status}")
Parameters:
ParameterTypeDescription
statusJobStatusFilter by status
initiative_idstrFilter by team
submitted_bystrFilter by user
is_managedboolTrue for Chamber workloads, False for discovered
submitted_afterstrISO8601 datetime filter
submitted_beforestrISO8601 datetime filter
sort_orderstr"asc" or "desc"
limitintMax results (1-100, default: 20)
next_tokenstrPagination cursor
Returns: PaginatedResponse with items, next_token, total_count

iter_workloads()

Iterate through all workloads with automatic pagination.
for job in client.iter_workloads(status=JobStatus.COMPLETED):
    print(f"{job.name}: {job.requested_gpus} GPUs")

cancel_workload()

Cancel a running or pending workload.
cancelled_job = client.cancel_workload("workload-id")
print(f"Status: {cancelled_job.status}")  # CANCELLED
Returns: Workload object with updated status

search_workloads()

Search workloads with advanced filtering.
results = client.search_workloads(
    status=["RUNNING", "PENDING"],
    gpu_type=["H100"],
    priority_min=50,
    submitted_from="2024-01-01T00:00:00Z",
    query="training",
    sort_by="submitted_at",
    sort_order="desc",
    page_size=25
)

print(f"Found {results.total_count} workloads")
for job in results.items:
    print(f"{job.name}: {job.status}")

# Paginate through results
if results.has_more:
    next_page = client.search_workloads(cursor=results.next_cursor)
Parameters:
ParameterTypeDescription
statuslist[str]Filter by status(es)
job_classlist[str]Filter by workload class(es)
gpu_typelist[str]Filter by GPU type(s)
initiative_idlist[str]Filter by team ID(s)
submitted_bylist[str]Filter by user ID(s)
priority_minintMinimum priority
priority_maxintMaximum priority
requested_gpus_minintMinimum GPU count
requested_gpus_maxintMaximum GPU count
submitted_fromstrStart date (ISO8601)
submitted_tostrEnd date (ISO8601)
querystrFull-text search query
sort_bystrSort field (default: submitted_at)
sort_orderstr"asc" or "desc" (default: desc)
page_sizeintResults per page (1-100, default: 25)
cursorstrPagination cursor
Returns: WorkloadSearchResult with items, total_count, has_more, next_cursor

get_workload_aggregations()

Get workload counts grouped by a dimension.
agg = client.get_workload_aggregations(
    dimension="status",
    initiative_id=["team-ml"]
)

print(f"Total: {agg.total}")
for bucket in agg.buckets:
    print(f"  {bucket.key}: {bucket.count}")
Parameters:
ParameterTypeDescription
dimensionstrGroup by: status, job_class, gpu_type, initiative_id, submitted_by, capacity_pool_id
statuslist[str]Filter by status(es)
job_classlist[str]Filter by workload class(es)
initiative_idlist[str]Filter by team ID(s)
submitted_fromstrStart date (ISO8601)
submitted_tostrEnd date (ISO8601)
Returns: AggregationResult with dimension, buckets, total

wait_for_completion()

Block until a workload reaches a terminal status.
result = client.wait_for_completion(
    workload_id="job-id",
    poll_interval=30,  # Seconds between checks
    timeout=3600       # Max wait time in seconds
)
Returns: Workload object with final status

Metrics and Statistics

get_workload_metrics()

Retrieve GPU metrics for a specific workload.
metrics = client.get_workload_metrics(
    "workload-id",
    time_range="job_lifetime"
)

print(f"GPU Utilization: {metrics.gpu_utilization.avg:.1f}%")
print(f"Memory: {metrics.memory_utilization.avg:.1f}%")
print(f"Temperature: {metrics.temperature.avg:.1f}C")
print(f"Power: {metrics.power_usage.avg:.1f} W")
Parameters:
ParameterTypeDescription
workload_idstrThe workload ID
time_rangestrlast_1h, last_6h, last_24h, or job_lifetime
metricslist[str]Specific metrics to retrieve
Returns: WorkloadMetrics with gpu_utilization, memory_utilization, temperature, power_usage

get_global_metrics()

Get aggregated metrics for your organization.
metrics = client.get_global_metrics(
    time_range="last_24h",
    initiative_id="team-ml"
)

print(f"Active workloads: {metrics.active_workloads}")
print(f"Total GPU hours: {metrics.total_gpu_hours}")
print(f"Avg GPU utilization: {metrics.gpu_utilization.avg:.1f}%")
Parameters:
ParameterTypeDescription
time_rangestrlast_1h, last_6h, last_24h, last_7d, last_30d
initiative_idstrFilter by team
cluster_idstrFilter by cluster
group_bystrGroup by field (e.g., "initiative_id")
Returns: GlobalMetrics with aggregated data

get_batch_workload_metrics()

Get metrics for multiple workloads at once.
data = client.get_batch_workload_metrics(
    status=["RUNNING"],
    time_range="last_24h",
    sort_by="gpu_utilization",
    limit=10
)
Returns: Dict with workload metrics ranked by the specified metric

get_workload_stats()

Get aggregated workload statistics.
stats = client.get_workload_stats(
    time_range="last_7_days",
    initiative_id="team-id"
)

print(f"Total workloads: {stats.total}")
for status, count in stats.by_status.items():
    print(f"  {status}: {count}")
Returns: WorkloadStats with total, by_status, by_job_class

Teams

list_teams()

List all teams accessible to the current user.
teams = client.list_teams()

for team in teams:
    print(f"{team.name} ({team.id})")
Returns: List of Team objects

create_team()

Create a new team.
team = client.create_team(
    name="ML Research",
    description="Machine learning research team",
    tags={"department": "engineering"}
)

print(f"Created team: {team.id}")
Parameters:
ParameterTypeDescription
namestrTeam name (1-255 chars)
descriptionstrTeam description (max 1000 chars)
parent_idstrParent team ID for sub-teams
tagsdictKey-value pairs for tagging
Returns: Team object

get_team()

Get details of a specific team.
team = client.get_team("team-id")
print(f"Team: {team.name}")
print(f"Description: {team.description}")
Returns: Team object

Templates

list_templates()

List available workload templates.
templates = client.list_templates(
    scope="ORGANIZATION",
    include_system=True
)

for template in templates:
    print(f"{template.name}: {template.gpu_type}")
Parameters:
ParameterTypeDescription
namestrFilter by name (partial match)
scopestrPERSONAL, PROJECT, or ORGANIZATION
include_systemboolInclude system templates (default: True)
Returns: List of Template objects

get_template()

Get details of a specific template.
template = client.get_template("template-id")
print(f"Template: {template.name}")
print(f"GPU Type: {template.gpu_type}")
print(f"GPUs: {template.requested_gpus}")
Returns: Template object

Allocations

list_allocations()

List capacity allocations for a team.
allocations = client.list_allocations(initiative_id="team-id")

for alloc in allocations:
    print(f"{alloc.id}: {alloc.allocated_instances} instances ({alloc.status})")
Returns: List of Allocation objects

create_allocation()

Create a new capacity allocation.
allocation = client.create_allocation(
    initiative_id="team-id",
    reservation_id="reservation-123",
    allocated_instances=4
)

print(f"Allocated: {allocation.id}")
Returns: Allocation object

get_allocation()

Get details of a specific allocation.
allocation = client.get_allocation("allocation-id")
print(f"Status: {allocation.status}")
print(f"Instances: {allocation.allocated_instances}")
Returns: Allocation object

Capacity

get_capacity()

Check available GPU capacity and budget.
capacity = client.get_capacity()

# Budget info
print(f"Allocated: {capacity.budget.allocated} GPU-hours")
print(f"Used: {capacity.budget.used} GPU-hours")
print(f"Available: {capacity.budget.available} GPU-hours")

# Capacity pools
for pool in capacity.pools:
    print(f"{pool.name} ({pool.gpu_type}):")
    print(f"  Available: {pool.available_gpus}/{pool.total_gpus} GPUs")

health()

Check API health status.
health = client.health()
print(f"Status: {health.status}")
print(f"Version: {health.version}")

Enums

JobStatus

from chamber_sdk import JobStatus

JobStatus.PENDING      # Submitted, awaiting scheduling
JobStatus.QUEUED       # Scheduled, waiting for resources
JobStatus.STARTING     # Resources allocated, starting
JobStatus.RUNNING      # Currently executing
JobStatus.COMPLETED    # Finished successfully
JobStatus.FAILED       # Workload failed
JobStatus.PREEMPTED    # Preempted (elastic jobs)
JobStatus.CANCELLED    # Cancelled by user

JobClass

from chamber_sdk import JobClass

JobClass.RESERVED   # Guaranteed capacity, non-preemptible
JobClass.ELASTIC    # Uses idle capacity, can be preempted
JobClass.DISCOVERED # External workload discovered by Chamber

ScalingMode

from chamber_sdk import ScalingMode

ScalingMode.GANG     # All-or-nothing scheduling
ScalingMode.ELASTIC  # Can scale between min/max pods

AllocationStatus

from chamber_sdk import AllocationStatus

AllocationStatus.ACTIVE    # Currently active
AllocationStatus.RELEASED  # Released by user
AllocationStatus.EXPIRED   # Expired

Data Models

Workload

FieldTypeDescription
idstrUnique workload ID
namestrWorkload name
statusJobStatusCurrent status
requested_gpusfloatNumber of GPUs
gpu_typestrGPU model
job_classJobClassRESERVED, ELASTIC, or DISCOVERED
priorityintPriority level
tagsdictKey-value tags
metadatadictCustom metadata
template_idstrTemplate ID if used
initiative_idstrTeam ID
submitted_bystrUser ID
submitted_atdatetimeSubmission timestamp
started_atdatetimeStart timestamp
completed_atdatetimeCompletion timestamp
failure_reasonstrFailure message (if failed)

Team

FieldTypeDescription
idstrTeam ID
namestrTeam name
descriptionstrTeam description
parent_idstrParent team ID
tagsdictKey-value tags
created_atdatetimeCreation timestamp

Template

FieldTypeDescription
idstrTemplate ID
namestrTemplate name
scopestrPERSONAL, PROJECT, or ORGANIZATION
descriptionstrTemplate description
gpu_typestrGPU model
requested_gpusintNumber of GPUs
k8s_manifeststrKubernetes manifest
configurationdictAdditional configuration

Allocation

FieldTypeDescription
idstrAllocation ID
initiative_idstrTeam ID
reservation_idstrReservation ID
allocated_instancesintNumber of instances
statusAllocationStatusAllocation status
created_atdatetimeCreation timestamp

Exceptions

All exceptions inherit from ChamberError:
from chamber_sdk import (
    ChamberError,
    AuthenticationError,
    AuthorizationError,
    NotFoundError,
    ValidationError,
    RateLimitError,
    ServerError,
    DockerError,  # For run() errors
)
from chamber_sdk.run import RegistryAuthError  # Specific registry auth failures

try:
    job = client.get_workload("invalid-id")
except NotFoundError:
    print("Workload not found")
except AuthenticationError:
    print("Invalid or expired token")
except RateLimitError as e:
    print(f"Rate limited: {e.message}")
except ChamberError as e:
    print(f"API error: {e}")
ExceptionHTTP CodeDescription
AuthenticationError401Invalid or expired token
AuthorizationError403Insufficient permissions
NotFoundError404Resource not found
ValidationError400Invalid request parameters
RateLimitError429Rate limit exceeded
ServerError5xxServer-side error
DockerErrorN/ADocker build/push error (for run())
RegistryAuthErrorN/ARegistry authentication/authorization failure (subclass of DockerError)

Handling run() Errors

from chamber_sdk import ChamberClient, DockerError
from chamber_sdk.run import RegistryAuthError

try:
    job = client.run("./my-project", registry="...", team="...")
except RegistryAuthError as e:
    # Registry authentication failed (401/403 from registry)
    # e.registry contains the registry URL
    # e.detail contains the error details
    print(f"Registry auth failed for {e.registry}: {e.detail}")
except DockerError as e:
    # Docker not installed, daemon not running, build failed, or push failed
    print(f"Docker error: {e}")
except FileNotFoundError as e:
    # No Python entrypoint found in project
    print(f"Project error: {e}")
except ValueError as e:
    # Missing required parameters (registry, team)
    print(f"Configuration error: {e}")