Quick Start Guide
This guide helps you get started quickly with the Serverless GPU API (SGC API).
Basic Usage
Here’s a simple example of how to use Serverless GPU API:
from serverless_gpu.launcher import distributed
from serverless_gpu.compute import GPUType
from serverless_gpu import runtime as rt

@distributed(gpus=8, gpu_type=GPUType.A10, remote=True)
def hello_world(name: str):
    if rt.get_local_rank() == 0:
        print('hello', name)
    return rt.get_global_rank()

hello_world.distributed(name='world')
Here’s another example of training a simple MLP model using PyTorch’s Distributed Data Parallel (DDP) module:
import os

import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
import torch.nn as nn
import torch.optim as optim

from serverless_gpu import distributed
import mlflow


def setup():
    dist.init_process_group("nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))


def cleanup():
    dist.destroy_process_group()


class SimpleMLP(nn.Module):
    def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.net(x)


@distributed(gpus=2, gpu_type='a10', remote=True)
def train(num_epochs=3, batch_size=64) -> None:
    setup()
    rank = dist.get_rank()
    device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")

    with mlflow.start_run():
        # Model: MLP with hidden layers
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])

        # Dummy regression dataset
        x = torch.randn(5000, 10)
        y = 3 * x.sum(dim=1, keepdim=True) + torch.randn(5000, 1) * 0.1
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)

        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()

        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0

            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log per-step loss metrics to MLflow
                mlflow.log_metric("mse_loss", loss.item(), step=step)
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)

            if rank == 0:
                # Log average loss metrics
                avg_loss = total_loss / len(dataloader.dataset)
                mlflow.log_metric("avg_loss", avg_loss)
                print(f"[Epoch {epoch}] Avg Loss: {avg_loss:.4f}")

        if rank == 0:
            # Save checkpoint
            ckpt = {
                "MODEL_STATE": model.state_dict(),
                "EPOCHS_RUN": epoch,
            }
            torch.save(ckpt, "/tmp/model.pt")

    cleanup()

train.distributed(num_epochs=3, batch_size=64)
Pool Configuration
By default, SGC API uses reserved GPU pools. To use on-demand GPUs instead, set the environment variable:
import os
# Use on-demand GPUs instead of the reserved pool
os.environ['DATABRICKS_USE_RESERVED_GPU_POOL'] = 'false'
To specify the reserved pool to use, set the environment variable:
import os
# Specify the reserved pool ID
os.environ['DATABRICKS_REMOTE_GPU_POOL_ID'] = 'your-pool-id'
These environment variables must be set outside of the @distributed call. Once configured, subsequent @distributed calls will use the specified reserved pool.
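For example, a minimal sketch of selecting a reserved pool before launching a job (the pool ID and the pool_demo function below are placeholders):

import os
from serverless_gpu.launcher import distributed
from serverless_gpu.compute import GPUType

# Reserved pools are the default; point subsequent @distributed calls at a specific pool
os.environ['DATABRICKS_REMOTE_GPU_POOL_ID'] = 'your-pool-id'  # placeholder pool ID

@distributed(gpus=8, gpu_type=GPUType.A10, remote=True)
def pool_demo():
    return 'ran on the reserved pool'

pool_demo.distributed()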
Ray Integration
Serverless GPU API also supports Ray for distributed computing. Here’s an example using the ray_launch decorator:
from serverless_gpu.ray import ray_launch

@ray_launch(gpus=16, remote=True, gpu_type='h100')
def foo():
    import ray

    print(ray.state.available_resources_per_node())
    return 1

foo.distributed()
The ray_launch decorator automatically sets up a Ray cluster across all allocated GPUs and provides
a Ray environment for your function to use. This is particularly useful for Ray-based distributed
machine learning workloads.
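As a further illustration, plain Ray tasks can also be fanned out across the allocated GPUs. This is a sketch under the same assumptions as the examples in this section; the fan_out and whoami names are placeholders:

from serverless_gpu.ray import ray_launch

@ray_launch(gpus=16, remote=True, gpu_type='h100')
def fan_out():
    import ray
    import socket

    # Attach the driver to the cluster started by ray_launch
    ray.init(address="auto", ignore_reinit_error=True)

    # Each task reserves one GPU, so the 16 tasks spread across every node in the cluster
    @ray.remote(num_gpus=1)
    def whoami(task_id: int):
        return task_id, socket.gethostname()

    return ray.get([whoami.remote(i) for i in range(16)])

fan_out.distributed()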
Advanced Ray Example
The following example launches an 8xH100 Ray cluster for multi-GPU actor training. The Ray driver runs on rank 0, every node joins the cluster, and per-epoch losses are gathered from the actors and averaged on the driver.
from serverless_gpu.ray import ray_launch
from serverless_gpu.compute import GPUType

@ray_launch(gpus=8, gpu_type=GPUType.H100, remote=True)
def ray_train(num_epochs: int = 2, steps_per_epoch: int = 50):
    import ray
    import torch

    # Ensure the driver connects to the cluster started by ray_launch
    ray.init(address="auto", ignore_reinit_error=True)

    @ray.remote(num_gpus=1)
    class Trainer:
        def __init__(self, shard_id: int):
            # Ray sets CUDA_VISIBLE_DEVICES per actor, so each actor sees its assigned GPU as cuda:0
            self.shard_id = shard_id
            self.device = "cuda:0"
            self.model = torch.nn.Linear(32, 8).to(self.device)
            self.optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-3)

        def train_epoch(self, steps: int):
            losses = []
            for _ in range(steps):
                x = torch.randn(64, 32, device=self.device)
                y = torch.randn(64, 8, device=self.device)
                loss = torch.nn.functional.mse_loss(self.model(x), y)
                loss.backward()
                self.optimizer.step()
                self.optimizer.zero_grad(set_to_none=True)
                losses.append(loss.item())
            return sum(losses) / len(losses)

    # Pin one actor per GPU and aggregate results on the driver
    actors = [Trainer.remote(i) for i in range(8)]
    for epoch in range(num_epochs):
        losses = ray.get([actor.train_epoch.remote(steps_per_epoch) for actor in actors])
        print(f"Epoch {epoch}: avg loss={sum(losses)/len(losses):.4f}")

ray_train.distributed(num_epochs=3, steps_per_epoch=25)
Next Steps
Read the Overview to understand Serverless GPU API’s architecture
Check out the API Reference for detailed API documentation
Explore other examples in the repository