Internal APIs

This section documents the internal APIs of RoundPipe that are intended for developers. Users typically do not need to interact with these APIs directly.

attribute

Module defining parameter attributes for RoundPipe.

LayerAttribute

Class storing layer attributes for RoundPipe. Events are used to synchronize layer parameters and gradients between the device, host, and optimizer threads.

Event flow, where W&C means wait and clear:

  1. Forward: wait param_copied -> W&C param_upload_started -> launch forward thread [-> set param_upload_started]
  2. Backward: wait grad_copied -> W&C grad_download_started -> launch backward thread [-> set grad_download_started]
  3. Optimizer: W&C param_copied -> W&C grad_copied -> launch optimizer thread [wait param_upload_started -> do copy -> set param_copied -> wait grad_download_started -> do copy -> set grad_copied]
  4. Repeat from step 1.
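
The wait-and-clear handshake above can be sketched with standard threading.Event objects. This is an illustrative sketch of the synchronization pattern only; the actual implementation uses AnnotatedEvent plus CUDA events:

```python
import threading

def wait_and_clear(event: threading.Event) -> None:
    """W&C: block until the event is set, then reset it for the next round."""
    event.wait()
    event.clear()

param_copied = threading.Event()
param_upload_started = threading.Event()

# Optimizer side: signal that the parameter copy is done.
param_copied.set()

def forward_side() -> None:
    # Upload parameters (and record the CUDA event in the real runtime),
    # then notify the optimizer that the host buffer can be reused.
    param_upload_started.set()

wait_and_clear(param_copied)  # forward side waits for the copy, then clears it
t = threading.Thread(target=forward_side)
t.start()
t.join()
```

Because each side clears the event it waited on, the same pair of events can be reused across iterations without re-creating them.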

Attributes:

  • param_copied (AnnotatedEvent) –

    Event indicating parameter copy from optimizer to model is done.

  • param_upload_started (AnnotatedEvent) –

    Event indicating parameter upload from CPU to GPU has started and CUDA event has been recorded.

  • param_uploaded (Event) –

    CUDA event indicating parameter upload to device is done.

  • grad_copied (AnnotatedEvent) –

    Event indicating gradient copy from model to optimizer is done.

  • grad_download_started (AnnotatedEvent) –

    Event indicating gradient download from GPU to CPU has started and CUDA event has been recorded.

  • grad_downloaded (Event) –

    CUDA event indicating gradient download to CPU is done.

backward_fence(clear=True)

Fence for the backward pass to wait until parameters and gradients are ready.

Parameters:

  • clear (bool, default: True ) –

    Whether to clear events for doing backward.

forward_backward_fence()

Fence for the fused forward-backward pass to wait until parameters and gradients are ready.

forward_fence(clear=True)

Fence for the forward pass to wait until parameters are ready.

Parameters:

  • clear (bool, default: True ) –

    Whether to clear events for doing forward.

ParamAttribute

Class storing parameter attributes for RoundPipe. Note that parameters may be shared among multiple layers.

Attributes:

  • grad_cpu (Dict[int, Optional[Tensor]]) –

    Dictionary mapping layer IDs to gradient tensors stored on CPU.

  • grad_buffer (Dict[int, Optional[Tensor]]) –

Dictionary mapping layer IDs to temporary buffers that hold references to the respective gradients. This is used to avoid memory re-allocation between gradient downloads.

  • optim (Optional[Parameter]) –

    Tensor storing a copy of the parameter for optimizer use.

  • optim_grad_buffer (Optional[Tensor]) –

Holds a reference to the optimizer gradient to avoid reallocation.

__init__()

Initialize the ParamAttribute with the given data.

get(t) classmethod

Get the ParamAttribute attached to a tensor.

has(t) classmethod

Check if a tensor has a ParamAttribute attached.

set(t, layer_id) classmethod

Attach a ParamAttribute to a tensor.

Parameters:

  • t (Parameter) –

    Tensor to attach the attribute to.

  • layer_id (Optional[int]) –

    Object ID of the layer this parameter belongs to.
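
The set/get/has classmethods suggest a registry keyed by tensor identity, with shared parameters accumulating multiple layer IDs. A minimal sketch of that pattern, using plain Python objects in place of torch Parameters (the Sketch/Fake names are hypothetical):

```python
import weakref
from typing import Any, Optional

class ParamAttributeSketch:
    """Illustrative stand-in for ParamAttribute: metadata keyed by object identity."""

    _registry: "weakref.WeakKeyDictionary[Any, ParamAttributeSketch]" = (
        weakref.WeakKeyDictionary()
    )

    def __init__(self) -> None:
        self.grad_cpu: dict = {}
        self.layer_ids: list = []

    @classmethod
    def set(cls, t: Any, layer_id: Optional[int]) -> None:
        # Shared parameters keep a single attribute with several layer IDs.
        attr = cls._registry.setdefault(t, cls())
        if layer_id is not None:
            attr.layer_ids.append(layer_id)

    @classmethod
    def has(cls, t: Any) -> bool:
        return t in cls._registry

    @classmethod
    def get(cls, t: Any) -> "ParamAttributeSketch":
        return cls._registry[t]

class FakeParam:  # weak-referenceable placeholder for a torch Parameter
    pass

p = FakeParam()
ParamAttributeSketch.set(p, layer_id=0)
ParamAttributeSketch.set(p, layer_id=3)  # same parameter shared by two layers
assert ParamAttributeSketch.has(p)
assert ParamAttributeSketch.get(p).layer_ids == [0, 3]
```

A WeakKeyDictionary keeps the registry from pinning parameters in memory once the model is freed.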

batch

Utilities for handling microbatches and host/device synchronization.

The helpers in this module are consumed by RoundPipe to convert arbitrary argument trees into iterable structures, split them into microbatches, keep track of transfer events, and merge results back on the host once transfers finish.

Attributes:

  • avg_reducer (AvgReducer) –

    Predefined reducer that averages scalar losses across microbatches.

AvgReducer

Bases: _CustomReducer

Reducer that averages scalar losses across microbatches.

Attributes:

  • sum (Tensor) –

    Running sum of scalar losses.

  • count (int) –

    Number of scalar losses accumulated.

reduce(reduced_val, new_val)

Update the running average with a new scalar loss.

Parameters:

  • reduced_val (Optional[Tensor]) –

    Current average loss tensor. A value of None will reset the reducer state.

  • new_val (Tensor) –

    New scalar loss tensor to incorporate.

Returns:

  • Tensor

    Updated average loss tensor.

Raises:

  • AssertionError

    If the same AvgReducer instance is used across multiple reductions at the same time.
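
The documented reduce contract (None resets the state, otherwise fold in the new value) can be sketched in plain Python, with floats standing in for loss tensors:

```python
from typing import Optional

class AvgReducerSketch:
    """Minimal running-average reducer mirroring the documented behavior."""

    def __init__(self) -> None:
        self.sum = 0.0
        self.count = 0

    def reduce(self, reduced_val: Optional[float], new_val: float) -> float:
        if reduced_val is None:  # None resets the reducer state
            self.sum = 0.0
            self.count = 0
        self.sum += new_val
        self.count += 1
        return self.sum / self.count  # updated average loss

r = AvgReducerSketch()
avg = r.reduce(None, 2.0)   # first microbatch resets and seeds the average
avg = r.reduce(avg, 4.0)    # second microbatch: (2.0 + 4.0) / 2
assert avg == 3.0
```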

Batch

Holds flattened microbatch state, labels, and CUDA events.

Attributes:

  • flatten_states (List[List[Any]]) –

    Flattened argument tensors per microbatch.

  • flatten_specs (List[TreeSpec]) –

    TreeSpec objects per microbatch for reconstruction.

  • forward_events (List[Sequence[Event]]) –

    CUDA events signaling when forward transfers complete.

  • backward_events (List[Sequence[Event]]) –

    CUDA events signaling when gradients arrive on host.

  • num_microbatch (Final[int]) –

    Actual number of microbatches generated.

  • label_list (List[Any]) –

    Labels aligned with each microbatch.

  • loss_list (List[Union[Sequence[Tensor], Tensor]]) –

    Loss tensors accumulated per microbatch.

  • loss_ready (Event) –

    CUDA event signaling when all losses are ready on host.

__init__(args, kwargs, run_config, label=None)

Split inputs and reconcile chained RoundPipePackedData sources.

Parameters:

  • args (Tuple) –

    Positional arguments provided to the wrapped model.

  • kwargs (Dict[str, Any]) –

    Keyword arguments provided to the wrapped model.

  • run_config (FullRoundPipeRunConfig) –

    Effective runtime configuration.

  • label (Any, default: None ) –

    Optional label payload for training flows.

Raises:

  • AssertionError

    If custom splitters return malformed structures.

dump(run_config)

Merge microbatch outputs according to the provided config.

When merge_output is False, a RoundPipePackedData is returned so downstream RoundPipe models can pipeline directly without synchronizing to the CPU. Otherwise, this method blocks until the last CUDA event has completed and merges the flattened outputs.

Parameters:

  • run_config (FullRoundPipeRunConfig) –

    Effective runtime configuration controlling how outputs are merged.

Returns:

  • Any

    Pytree produced by merging the flattened buffers or the packed data wrapper in the same pytree structure when passthrough is requested.

RoundPipePackedData

Bases: list

Container that couples host tensors with CUDA transfer markers.

Attributes:

  • transfer_event (List[Tuple[Event, Event]]) –

    List of (forward_event, backward_event) tuples per microbatch that signals when the data and, optionally, gradients are ready on the host.

__init__(data, transfer_event)

Build the packed container.

Parameters:

  • data (List[Any]) –

    Microbatch outputs stored on the host.

  • transfer_event (List[Tuple[Event, Event]]) –

CUDA events marking forward/backward readiness for each microbatch result.

synchronize()

Block until all hosted tensors are fully transferred.

Returns:

  • None

    The call completes only after forward events finish.

guess_split_spec(data, expected_batchsize=None)

Infer how torch.distributed should chunk a nested argument tree.

Parameters:

  • data (Any) –

    Arbitrary pytree holding tensors and Python objects.

  • expected_batchsize (Optional[int], default: None ) –

    Optional batch-size hint used to identify tensors that should be chunked along dimension 0.

Returns:

  • Tuple[Any, Optional[int]]

    A tuple (spec, inferred_batch_size) where spec mirrors data and stores TensorChunkSpec or _Replicate entries, and inferred_batch_size is the common chunkable size if one could be identified, otherwise None.
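
The inference described here can be sketched in plain Python. FakeTensor and the marker strings are hypothetical stand-ins for torch.Tensor, TensorChunkSpec, and _Replicate:

```python
from typing import Any, Optional, Tuple

class FakeTensor:  # stand-in for torch.Tensor with a .shape attribute
    def __init__(self, *shape: int) -> None:
        self.shape = shape

CHUNK, REPLICATE = "chunk_dim0", "replicate"

def guess_split_spec_sketch(
    data: Any, expected_batchsize: Optional[int] = None
) -> Tuple[Any, Optional[int]]:
    """Mirror data with chunk/replicate markers and infer a common dim-0 size."""
    inferred: Optional[int] = expected_batchsize

    def visit(node: Any) -> Any:
        nonlocal inferred
        if isinstance(node, (list, tuple)):
            return type(node)(visit(x) for x in node)
        if isinstance(node, FakeTensor) and node.shape:
            if inferred is None:
                inferred = node.shape[0]  # first tensor seeds the batch size
            if node.shape[0] == inferred:
                return CHUNK
        return REPLICATE  # non-tensors and mismatched tensors are replicated

    return visit(data), inferred

spec, bs = guess_split_spec_sketch([FakeTensor(8, 4), "mask", FakeTensor(8)])
assert bs == 8
assert spec == ["chunk_dim0", "replicate", "chunk_dim0"]
```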

context

Context managers for RoundPipe.

Attributes:

  • flags (local) –

    Thread-local storage for context flags.

ForwardCtx

Context manager marking that the current scope is performing a forward pass.

Attributes:

  • save_for_recompute (Callable[..., None]) –

    A callable to save data for recomputation.

OptimizerCtx

Context manager marking that the current scope is performing optimizer-related operations. Under this scope, RoundPipe redirects .parameters() and .named_parameters() to .optim_parameters() and .named_optim_parameters() respectively.

RecomputeCtx

Context manager marking that the current scope is performing recomputation.

Attributes:

  • recompute_data (Any) –

    Data saved for recomputation.

doing_optimizer()

Check if the current scope is performing optimizer-related operations.

doing_recompute()

Check if the current scope is performing recomputation.

get_recompute_data()

Get the data saved for recomputation in the current recompute context. This always returns a tuple, even if a single item was saved.

save_for_recompute(*data)

Save data for recomputation in the current forward context. Tensors to be saved must not require gradients. This function can be called at most once per layer. If forward gradients are not enabled, this is a no-op.
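
The flag-based contexts in this module can be sketched with thread-local state and a context manager. This illustrates the pattern only; the actual ForwardCtx/RecomputeCtx implementations differ:

```python
import threading
from contextlib import contextmanager

flags = threading.local()  # per-thread context flags, as in the module attribute

@contextmanager
def forward_ctx():
    """Mark the enclosing scope as a forward pass (illustrative sketch)."""
    flags.saved = None
    flags.in_forward = True
    try:
        yield
    finally:
        flags.in_forward = False

def save_for_recompute_sketch(*data):
    assert getattr(flags, "in_forward", False), "must be inside a forward context"
    flags.saved = data  # always stored as a tuple, even for a single item

with forward_ctx():
    save_for_recompute_sketch(42)
    saved = flags.saved
assert saved == (42,)
assert not flags.in_forward
```

Using threading.local keeps flags independent per device/optimizer thread, which is why doing_recompute() and friends are cheap to query.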

device

Manage each CUDA device on a dedicated thread.

Attributes:

  • device_list (List[DeviceManager]) –

    List of instantiated DeviceManager objects.

  • cur_device (int) –

    Index tracking the next device to schedule work on.

DeviceManager

Manages memory, CUDA streams and kernel launch of a single device.

Info

The device controller thread is designed to run multiple forward/backward jobs concurrently on different devices. It is NOT designed to run jobs while the main thread performs other work at the same time. The main thread should wait for all device threads to return before proceeding to the next step.

Attributes:

  • id (int) –

    Integer identifier matching cuda:{id}.

  • device (device) –

Corresponding PyTorch CUDA device handle.

  • shutdown (bool) –

    Flag to signal controller thread shutdown.

  • param_upstream (Stream) –

    Stream for model parameter uploads.

  • upstream (Stream) –

    Stream handling activation uploads.

  • compute_stream (Stream) –

    Default compute stream bound to device.

  • downstream (Stream) –

    Stream used for downloads to host.

  • mem_manager (InterStreamMemManager) –

    Inter-stream memory manager for this device.

  • upload_mark (List[Event]) –

    Outstanding events that track chunked transfers.

  • is_idle (AnnotatedSemaphore) –

    Semaphore indicating when the controller can accept work.

  • job_arrived (Semaphore) –

    Semaphore signaled when a new job is queued.

  • cur_job (Optional[Tuple[Callable[..., None], List[RoundPipeRunContext], Tuple]]) –

    Tuple describing the pending job type plus payload.

  • controller_thread (RoundPipeThread) –

    Background thread executing queued jobs.

__init__(id, device)

Initialize CUDA streams and spawn the controller thread.

Parameters:

  • id (int) –

    Sequential identifier assigned when enumerating devices.

  • device (device) –

Corresponding PyTorch CUDA device handle.

controller()

Background loop that executes jobs as they arrive.

The loop blocks on job_arrived and dispatches to the appropriate runtime helper based on job_type.

flush_upload_marks()

Return outstanding upload markers and clear the queue.

Returns:

  • List[Event]

    List of still-pending upload events. The list captures gaps between activation uploads so parameter transfers can be staged safely.

launch_backward(layer_group_id, layer_attrs, run_context)

Schedule a backward-only job on this device.

Parameters:

  • layer_group_id (int) –

    Index into the execute plan's backward ordering.

  • layer_attrs (List[LayerAttribute]) –

    List of layer attributes for the layers in this group.

  • run_context (List[RoundPipeRunContext]) –

    Per-microbatch execution contexts.

launch_forward(layer_group_id, layer_attrs, batch, run_context)

Schedule a forward-only job on this device.

Parameters:

  • layer_group_id (int) –

    Index into the execute plan's forward ordering.

  • layer_attrs (List[LayerAttribute]) –

    List of layer attributes for the layers in this group.

  • batch (Batch) –

    Batch container shared across microbatches.

  • run_context (List[RoundPipeRunContext]) –

    Per-microbatch execution contexts.

launch_forward_backward(layer_attrs, batch, run_context, loss_fn, return_outputs)

Run forward + backward in the same pass for final layers.

Parameters:

  • batch (Batch) –

    Batch object generated for the training iteration.

  • run_context (List[RoundPipeRunContext]) –

    Per-microbatch execution contexts.

  • loss_fn (Callable[[Any, Any], Union[Sequence[Tensor], Tensor]]) –

    Callable that consumes outputs + labels and returns loss.

  • return_outputs (bool) –

    Whether to return model outputs from forward pass.

mark_upload()

Record an event on the upload stream after enqueuing H2D copies.

Returns:

  • None

    The recorded event is appended to upload_mark.

shutdown_controller()

Shut down the device controller thread.

wait_stream(waiter, wait_for)

Make waiter stream wait on wait_for stream.

Parameters:

  • waiter (Stream) –

    Stream that will wait.

  • wait_for (Stream) –

    Stream to wait on.

InterStreamMemManager

Handles tensor deallocation across multiple CUDA streams.

Attributes:

  • free_queue (Dict[Tuple[Stream, Stream], List[Tuple[UntypedStorage, Tuple[Stream, ...]]]]) –

    Maps stream pairs to lists of tensor storage waiting to be freed after synchronization.

  • in_use (Dict[UntypedStorage, List[Stream]]) –

    Maps tensor storages to the list of streams that have used them.

__init__(*streams)

Initialize tracking structures for inter-stream memory management.

Parameters:

  • streams (Stream, default: () ) –

    Streams that will be used by tensors.

flush()

Flush all tracked tensors into free queues.

free(storage, *use_streams)

Hold a reference to the tensor storage until all streams that used it have synchronized back to the owning stream.

Parameters:

  • storage (UntypedStorage) –

    Tensor storage to be freed.

  • use_streams (Stream, default: () ) –

    Ordered streams that have used storage.

free_all()

Clear all tracked tensors from the free queues.

record_stream(tensor, from_stream, use_stream)

Record that tensor is used on use_stream after being used on from_stream.

Parameters:

  • tensor (Tensor) –

    Tensor whose usage is being tracked.

  • from_stream (Stream) –

    Stream where the tensor was last used.

  • use_stream (Stream) –

    Stream where the tensor will be used next.

Raises:

  • AssertionError

    If the last recorded stream for tensor does not match from_stream.

stream_synced(waiter, wait_for)

When waiter stream has synchronized on wait_for, release all tensors waiting on this synchronization.

Parameters:

  • waiter (Stream) –

    Stream that has synchronized.

  • wait_for (Stream) –

    Stream that waiter has synchronized on.
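
The free/record_stream/stream_synced bookkeeping can be sketched in plain Python, with strings standing in for CUDA streams and id() keys standing in for UntypedStorage handles (the Sketch name and return value are hypothetical):

```python
from typing import Any, Dict, List, Tuple

class InterStreamMemSketch:
    """Deferred-free bookkeeping: hold storage until using streams sync back."""

    def __init__(self, *streams: str) -> None:
        self.free_queue: Dict[Tuple[str, str], List[Any]] = {
            (a, b): [] for a in streams for b in streams if a != b
        }
        self.in_use: Dict[int, List[str]] = {}

    def record_stream(self, storage: Any, from_stream: str, use_stream: str) -> None:
        streams = self.in_use.setdefault(id(storage), [from_stream])
        assert streams[-1] == from_stream  # usage must be recorded in order
        streams.append(use_stream)

    def free(self, storage: Any, owner: str, *use_streams: str) -> None:
        # Keep a reference per pending (owner, user) synchronization.
        for s in use_streams:
            self.free_queue[(owner, s)].append(storage)

    def stream_synced(self, waiter: str, wait_for: str) -> int:
        released = self.free_queue[(waiter, wait_for)]
        self.free_queue[(waiter, wait_for)] = []
        return len(released)  # dropped references become collectable

mm = InterStreamMemSketch("compute", "downstream")
buf = object()
mm.record_stream(buf, "compute", "downstream")
mm.free(buf, "compute", "downstream")
assert mm.stream_synced("compute", "downstream") == 1
```

The real manager drops the storage references (letting CUDA reuse the memory) rather than returning a count; the count here just makes the release observable.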

gc_collect()

Release all tracked tensors from inter-stream memory managers on all devices.

get_min_gpu_memory()

Return the minimum free GPU memory (in GB) across all managed devices.

Returns:

  • float

    Minimum GPU memory size in GB.

get_next_device()

Round-robin iterator over instantiated DeviceManager objects.

Returns:

  • DeviceManager

    The next DeviceManager to schedule work on, in round-robin order.

get_num_devices()

Return the number of managed GPU devices.

Returns:

  • int

    Number of instantiated DeviceManager objects.

grad_scaler

A gradient scaler for mixed precision training in RoundPipe.

GradScaler

Helps perform the steps of gradient scaling conveniently. The GradScaler is designed to be API compatible with torch.amp.GradScaler.

Info

Objects of this class are accessed both from the main thread and from the optimizer stream. Care must be taken to avoid data corruption due to race conditions. All methods of this class must check which stream they are called from and behave accordingly. The following member access rules apply:

  • Both: next_scale, scaler_updated
  • Optimizer stream only: main_scaler
  • Main thread only: scale_scaler
  • Read only: enabled

Attributes:

  • enabled (Final[bool]) –

    Whether gradient scaling is enabled.

  • main_scaler (GradScaler) –

    The main GradScaler tracks the scale used for unscaling and updating.

  • scale_scaler (GradScaler) –

    This GradScaler is only used for applying scaling to outputs.

  • next_scale (Tensor) –

    The next scale factor to be used.

  • scaler_updated (AnnotatedEvent) –

    An event to signal when the scaler has been updated.

__init__(init_scale=2.0 ** 16, growth_factor=2.0, backoff_factor=0.5, growth_interval=2000, enabled=True)

Parameters:

  • init_scale (float, default: 2.0 ** 16 ) –

    Initial scale factor.

  • growth_factor (float, default: 2.0 ) –

    Factor by which the scale is multiplied during update if no inf/NaN gradients occur for growth_interval consecutive iterations.

  • backoff_factor (float, default: 0.5 ) –

    Factor by which the scale is multiplied during update if inf/NaN gradients occur in an iteration.

  • growth_interval (int, default: 2000 ) –

    Number of consecutive iterations without inf/NaN gradients that must occur for the scale to be multiplied by growth_factor.

  • enabled (bool, default: True ) –

    If False, disables gradient scaling. step simply invokes the underlying optimizer.step(), and other methods become no-ops.

get_backoff_factor(up_to_date=False)

Parameters:

  • up_to_date (bool, default: False ) –

If True, return the latest backoff factor, at the cost of blocking to synchronize with the optimizer stream. Otherwise, a stale value may be returned, though none older than the previous GradScaler.update().

Returns:

  • float

    a Python float containing the scale backoff factor.

get_growth_factor(up_to_date=False)

Parameters:

  • up_to_date (bool, default: False ) –

If True, return the latest growth factor, at the cost of blocking to synchronize with the optimizer stream. Otherwise, a stale value may be returned, though none older than the previous GradScaler.update().

Returns:

  • float

    a Python float containing the scale growth factor.

get_growth_interval(up_to_date=False)

Parameters:

  • up_to_date (bool, default: False ) –

If True, return the latest growth interval, at the cost of blocking to synchronize with the optimizer stream. Otherwise, a stale value may be returned, though none older than the previous GradScaler.update().

Returns:

  • int

    a Python int containing the growth interval.

get_scale()

Return the current scale factor. The result depends on which stream this is called from.

Returns:

  • float

    a Python float containing the current scale, or 1.0 if scaling is disabled.

scale(outputs)

Multiplies ('scales') a tensor or list of tensors by the scale factor.

Returns scaled outputs. If this instance of GradScaler is not enabled, outputs are returned unmodified.

Parameters:

  • outputs (Union[Tensor, Iterable[Tensor]]) –

    Outputs to scale.

set_backoff_factor(new_factor)

Set a new scale backoff factor.

Parameters:

  • new_factor (float) –

    Value to use as the new scale backoff factor.

set_growth_factor(new_factor)

Set a new scale growth factor.

Parameters:

  • new_factor (float) –

    Value to use as the new scale growth factor.

set_growth_interval(new_interval)

Set a new growth interval.

Parameters:

  • new_interval (int) –

    Value to use as the new growth interval.

step(optimizer, *args, **kwargs)

Invoke unscale_(optimizer) followed by a parameter update, if gradients do not contain infs/NaNs.

step carries out the following two operations:

  1. Internally invokes unscale_(optimizer) (unless unscale_ was explicitly called for optimizer earlier in the iteration). As part of the unscale_, gradients are checked for infs/NaNs.
  2. If no inf/NaN gradients are found, invokes optimizer.step() using the unscaled gradients. Otherwise, optimizer.step() is skipped to avoid corrupting the params.

*args and **kwargs are forwarded to optimizer.step().

Parameters:

  • optimizer (Optimizer) –

    Optimizer that applies the gradients.

  • args (Any, default: () ) –

    Any arguments.

  • kwargs (Any, default: {} ) –

    Any keyword arguments.

Returns:

  • Optional[float]

If scaling is disabled, returns the return value of optimizer.step(*args, **kwargs). If enabled, the value is returned only when called from the optimizer stream.

unscale_(optimizer)

Divides ("unscales") the optimizer's gradient tensors by the scale factor. unscale_ is optional, serving cases where you need to modify or inspect gradients between the backward pass(es) and step. If unscale_ is not called explicitly, gradients will be unscaled automatically during step.

If this is called from the main thread, the unscale operation will be launched and synchronized on the optimizer stream.

Parameters:

  • optimizer (Optimizer) –

    Optimizer that owns the gradients to be unscaled.

update(new_scale=None)

Update the scale factor. This function must be called from the main thread.

If any optimizer steps were skipped, the scale is multiplied by backoff_factor to reduce it. If growth_interval unskipped iterations occurred consecutively, the scale is multiplied by growth_factor to increase it.

Passing new_scale sets the new scale value manually. (new_scale is not used directly, it's used to fill GradScaler's internal scale tensor. So if new_scale was a tensor, later in-place changes to that tensor will not further affect the scale GradScaler uses internally.)

Parameters:

  • new_scale (Optional[Union[float, Tensor]], default: None ) –

    New scale factor.
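
The growth/backoff rule described above matches torch.amp.GradScaler's update logic and can be sketched as a pure function (a simplified scalar version; the real scaler keeps the scale and tracker in tensors on the optimizer stream):

```python
def update_scale(scale: float, growth_tracker: int, found_inf: bool,
                 growth_factor: float = 2.0, backoff_factor: float = 0.5,
                 growth_interval: int = 2000) -> tuple:
    """Core scale-update rule: back off on inf/NaN, grow after a clean streak."""
    if found_inf:
        return scale * backoff_factor, 0      # back off and reset the streak
    growth_tracker += 1
    if growth_tracker == growth_interval:
        return scale * growth_factor, 0       # grow after growth_interval clean steps
    return scale, growth_tracker

scale, tracker = 2.0 ** 16, 0
scale, tracker = update_scale(scale, tracker, found_inf=True)
assert scale == 2.0 ** 15 and tracker == 0
scale, tracker = update_scale(scale, tracker, found_inf=False, growth_interval=2)
scale, tracker = update_scale(scale, tracker, found_inf=False, growth_interval=2)
assert scale == 2.0 ** 16 and tracker == 0
```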

update_kernel(new_scale)

Kernel function to update the scale factor on the optimizer stream.

Parameters:

  • new_scale (Optional[Union[float, Tensor]]) –

    New scale factor.

memory

CPU memory managing utilities for RoundPipe.

pin_module_alloc(mod)

Pin module parameters and buffers to CPU memory.

Parameters:

  • mod (Module) –

    Module to pin.

pin_module_register(mod)

Pin module parameters and buffers to CPU memory using cudaHostRegister.

Parameters:

  • mod (Module) –

    Module to pin.

models

Model wrappers for RoundPipe-supported models.

This module provides utilities to wrap supported model architectures with RoundPipe's sequential execution presets. Each supported model type has a corresponding wrapper module that implements the necessary integration.

Attributes:

  • DISABLE_TORCH_COMPILE (bool) –

    Flag indicating whether to disable torch.compile in models.

  • SUPPORTED_MODELS (Dict[str, str]) –

    Dictionary mapping model type names to their corresponding wrapper module paths.

wrap_model(model, **roundpipe_kwargs)

Wrap a supported model with RoundPipe's sequential preset.

Parameters:

  • model (Any) –

    The model instance to be wrapped.

  • **roundpipe_kwargs (Any, default: {} ) –

    Additional keyword arguments specific to the model wrapper.

Returns:

  • Any

    The wrapped model instance.

Raises:

  • NotImplementedError

    If the model type is not supported or if the model class does not match the expected class for its type.
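
The dispatch described here (look up the model type, raise NotImplementedError otherwise) can be sketched with a registry. WRAPPERS and the register decorator are hypothetical; the real module maps type names to wrapper module paths via SUPPORTED_MODELS:

```python
from typing import Any, Callable, Dict

# Hypothetical registry: model type name -> wrapper factory.
WRAPPERS: Dict[str, Callable[[Any], Any]] = {}

def register(type_name: str):
    def deco(fn: Callable[[Any], Any]) -> Callable[[Any], Any]:
        WRAPPERS[type_name] = fn
        return fn
    return deco

class ToyModel:
    model_type = "toy"

@register("toy")
def wrap_toy(model: Any) -> Any:
    return ("wrapped", model)  # stand-in for the RoundPipe wrapper module

def wrap_model_sketch(model: Any, **roundpipe_kwargs: Any) -> Any:
    type_name = getattr(model, "model_type", None)
    if type_name not in WRAPPERS:
        raise NotImplementedError(f"unsupported model type: {type_name}")
    return WRAPPERS[type_name](model)

wrapped = wrap_model_sketch(ToyModel())
assert wrapped[0] == "wrapped"
```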

optim

RoundPipe's implementation of CPU optimizers. Behavior is consistent with that of PyTorch optimizers, optimized for fp32 stepping on CPU.

Adam

Bases: Optimizer

Implements Adam algorithm with fp32 stepping on CPU.

__init__(params, lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.0, amsgrad=False, *, foreach=None, maximize=False, capturable=False, differentiable=False, fused=None, decoupled_weight_decay=False)

Parameters:

  • params (ParamsT) –

    Iterable of parameters or named_parameters to optimize, or iterable of dicts defining parameter groups. When using named_parameters, all parameters in all groups should be named.

  • lr (Union[float, Tensor], default: 0.001 ) –

    Learning rate.

  • betas (Tuple[Union[float, Tensor], Union[float, Tensor]], default: (0.9, 0.999) ) –

    Coefficients used for computing running averages of the gradient and its square.

  • eps (float, default: 1e-08 ) –

    Term added to the denominator to improve numerical stability.

  • weight_decay (float, default: 0.0 ) –

    Weight decay coefficient.

  • amsgrad (bool, default: False ) –

    Whether to use the AMSGrad variant of this algorithm from the paper On the Convergence of Adam and Beyond.

  • maximize (bool, default: False ) –

    Maximize the objective with respect to the params, instead of minimizing.

  • decoupled_weight_decay (bool, default: False ) –

    If True, this optimizer is equivalent to AdamW and the algorithm will not accumulate weight decay in the momentum or variance.

  • foreach (Optional[bool], default: None ) –

    Compatible placeholder for PyTorch's Adam optimizer.

  • capturable (bool, default: False ) –

    Compatible placeholder for PyTorch's Adam optimizer.

  • differentiable (bool, default: False ) –

    Compatible placeholder for PyTorch's Adam optimizer.

  • fused (Optional[bool], default: None ) –

    Compatible placeholder for PyTorch's Adam optimizer.

__setstate__(state)

Sets the state of the optimizer. This method is used when loading a saved optimizer state. It ensures that all necessary keys are present in the state dictionary.

Parameters:

  • state (Dict[str, Any]) –

    The state dictionary to set.

step(closure=None)

Performs a single optimization step.

Parameters:

  • closure (Optional[Callable[[], Any]], default: None ) –

    A closure that reevaluates the model and returns the loss.

Returns:

  • Any

    The loss value if a closure is provided, otherwise None.
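
For reference, a single fp32 Adam update for one scalar parameter can be sketched as follows (coupled weight decay shown; decoupled_weight_decay=True would instead shrink the parameter directly, as in AdamW):

```python
import math

def adam_step_sketch(p, g, m, v, step, lr=1e-3, beta1=0.9, beta2=0.999,
                     eps=1e-8, weight_decay=0.0):
    """One fp32 Adam update for a single scalar parameter."""
    if weight_decay:
        g = g + weight_decay * p           # classic (coupled) weight decay
    m = beta1 * m + (1 - beta1) * g        # first-moment running average
    v = beta2 * v + (1 - beta2) * g * g    # second-moment running average
    m_hat = m / (1 - beta1 ** step)        # bias correction
    v_hat = v / (1 - beta2 ** step)
    p = p - lr * m_hat / (math.sqrt(v_hat) + eps)
    return p, m, v

p, m, v = 1.0, 0.0, 0.0
p, m, v = adam_step_sketch(p, g=0.5, m=m, v=v, step=1)
# After bias correction, the first step moves the parameter by roughly lr.
assert abs(p - (1.0 - 1e-3)) < 1e-6
```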

optim_builder

Build, load and cache CPP implementation of optimizer functions.

Attributes:

  • loaded_optim_functions (Dict[str, Callable]) –

    A dictionary caching loaded optimizer functions.

get_cpu_flags_hash()

Get a hash string representing the CPU flags.

Returns:

  • str

    A string hash of the CPU flags.
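
A stable hash over CPU feature flags, suitable for keying a compiled-kernel cache, can be sketched with hashlib (the flag list and the 16-character truncation are illustrative assumptions, not the module's actual scheme):

```python
import hashlib

def cpu_flags_hash_sketch(flags):
    """Order-independent hash of a CPU-flag list, e.g. for a build cache key."""
    return hashlib.sha256(" ".join(sorted(flags)).encode()).hexdigest()[:16]

h1 = cpu_flags_hash_sketch(["avx2", "fma", "sse4_2"])
h2 = cpu_flags_hash_sketch(["fma", "sse4_2", "avx2"])  # order does not matter
assert h1 == h2
assert len(h1) == 16
```

Hashing the flags means a binary compiled for one CPU feature set is never reused on a machine with different capabilities.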

get_optim_function(name)

Get the optimizer function by name.

Parameters:

  • name (str) –

    The name of the optimizer function.

Returns:

  • Callable

    The optimizer function.

load_optim_function(name)

Compile and load the optimizer function from source by name.

Parameters:

  • name (str) –

    The name of the optimizer function.

optim_stream

Optimizer execute stream and related functions.

Attributes:

  • KernelQueueType

    queue.Queue[Tuple[Callable, Tuple, Dict[str, Any]]]

  • kernel_queue (KernelQueueType) –

    Queue of optimizer kernel tasks.

  • OPTIM_STOP

    Sentinel object to signal optimizer stream shutdown.

  • optim_thread (RoundPipeThread) –

    Daemon thread that executes optimizer tasks.

controller()

Optimizer Stream thread function.

launch_optim_kernel(fn, *args, **kwargs)

Launch an optimizer kernel on the optimizer stream.

Parameters:

  • fn (Callable) –

    Callable that launches the optimizer kernel.

  • *args (Any, default: () ) –

    Positional arguments forwarded to fn.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to fn.

on_optim_stream()

Check if the current thread is the optimizer stream.

Returns:

  • bool

    True if the current thread is the optimizer stream, False otherwise.

shutdown_optim()

Shut down the optimizer stream.

synchronize_optim()

Synchronize the optimizer stream with the main thread.
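
The queue-plus-sentinel structure of this module can be sketched with the standard library. The names mirror the documented attributes, but the bodies are illustrative only:

```python
import queue
import threading

OPTIM_STOP = object()                       # shutdown sentinel
kernel_queue: "queue.Queue" = queue.Queue() # (fn, args, kwargs) tasks
results = []

def controller() -> None:
    """Optimizer stream thread: drain queued kernels until the sentinel."""
    while True:
        task = kernel_queue.get()
        if task is OPTIM_STOP:
            return
        fn, args, kwargs = task
        results.append(fn(*args, **kwargs))

optim_thread = threading.Thread(target=controller, daemon=True)
optim_thread.start()

def launch_optim_kernel(fn, *args, **kwargs) -> None:
    kernel_queue.put((fn, args, kwargs))

launch_optim_kernel(lambda x, y: x + y, 2, 3)
kernel_queue.put(OPTIM_STOP)                # shutdown_optim equivalent
optim_thread.join()
assert results == [5]
```

FIFO ordering of the queue guarantees that kernels run on the optimizer thread in the order they were launched from the main thread.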

profile

Profiling utilities for RoundPipe.

Attributes:

  • PROFILER_TYPE (Optional[str]) –

    Type of profiler detected from environment variables.

annotate(name, color=None)

Return a context manager that instruments profiler annotations.

Parameters:

  • name (str) –

    Label that appears in profiling timelines.

  • color (Optional[str], default: None ) –

Color that appears in profiling timelines.

Returns:

  • ContextManager

    The annotation context when profiling is enabled, otherwise contextlib.nullcontext so callers can use with uniformly.
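
The fallback behavior can be sketched as follows; the profiler backends (e.g. nvtx or torch.profiler) are omitted, and PROFILER_TYPE is forced to None for illustration:

```python
import contextlib

PROFILER_TYPE = None  # as if no profiler environment variable was detected

def annotate_sketch(name: str, color=None):
    """Return a real annotation context when profiling, else a no-op context."""
    if PROFILER_TYPE is None:
        return contextlib.nullcontext()
    raise NotImplementedError("real backends would go here")

# Callers can always use `with`, whether or not profiling is enabled.
with annotate_sketch("forward"):
    result = 1 + 1
assert result == 2
```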

roundpipe

The RoundPipe model wrapper and execution runtime.

AutoRoundPipe

Bases: RoundPipeBase

Provides a subset of RoundPipe's features for an arbitrary model. This includes optimizer parameter management and asynchronous step execution.

Attributes:

  • module_param_uploaded_events

Events signaling parameters copied to the GPU. This collects all RoundPipe submodules' event lists.

  • module_gradient_ready_events

Events signaling gradients copied to the CPU. This collects all RoundPipe submodules' event lists.

__init__(model, name=None, optim_dtype=None, **kwargs)

Initialize AutoRoundPipe over an arbitrary model.

Parameters:

  • model (Module) –

    Module to wrap.

  • name (Optional[str], default: None ) –

    Optional friendly identifier. Defaults to file:line.

  • optim_dtype (Optional[dtype], default: None ) –

    Data type for optimizer parameters. Defaults to the same as the parameter data type.

  • **kwargs (Any, default: {} ) –

    Placeholder for unused keyword arguments.

forward(*args, **kwargs)

Execute a forward pass.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments forwarded to the underlying model.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to model.

Returns:

  • Any

    Output produced by the underlying model.

sync_optim_param()

Ensure optimizer-updated results are copied back to parameters. This function can run in either the main thread or the optimizer thread.

RoundPipe

Bases: RoundPipeBase

Wraps an nn.Module with RoundPipe's pipelined execution runtime.

Attributes:

  • model_run_config (RoundPipeRunConfig) –

    Default run configuration used when callers do not override parameters per invocation.

  • layers (List[Module]) –

    Sequence of model layers.

  • num_layers (int) –

    Total number of layers in the pipeline.

  • layer_attrs (List[LayerAttribute]) –

    List of LayerAttribute storing per-layer events.

  • layer_size (List[float]) –

    List of layer sizes in GB.

  • model_timer (ModelTimer) –

    ModelTimer measuring per-layer latency.

__init__(model, optim_dtype=None, name=None, model_run_config=RoundPipeRunConfig(), pin_model='alloc')

Convert model storage to pinned tensors and determine pipeline cuts.

An nn.Sequential model is split into layers directly. Arbitrary models are wrapped as a single layer.

Parameters:

  • model (Module) –

    Module to wrap. Can be nn.Sequential or arbitrary model.

  • optim_dtype (Optional[dtype], default: None ) –

    Data type for optimizer parameters. Defaults to the same as param type.

  • name (Optional[str], default: None ) –

    Optional friendly identifier. Defaults to file:line.

  • model_run_config (RoundPipeRunConfig, default: RoundPipeRunConfig() ) –

    Baseline configuration inherited by invocations.

  • pin_model (Literal[alloc, register, off], default: 'alloc' ) –

alloc: Use torch's pin_memory to pin memory. This is the default and usually gives better host-device transfer performance, but it may lead to up to 2x CPU memory usage because PyTorch pads all allocations to powers of 2. register: Use cudaHostRegister to pin memory instead. This reduces CPU memory usage on very large models, at the cost of ~10% host-device transfer performance; only available with CUDA. off: Do not pin memory. This is useful when LoRA fine-tuning models larger than CPU memory; combined with mmap during model loading, it lets Linux load data from disk as needed and automatically evict cached data under memory pressure.

forward(*args, roundpipe_run_config=RoundPipeRunConfig(), **kwargs)

Execute a forward pass, optionally enabling gradients per call.

Parameters:

  • *args (Any, default: () ) –

    Positional arguments forwarded to the underlying model.

  • roundpipe_run_config (RoundPipeRunConfig, default: RoundPipeRunConfig() ) –

    Per-call overrides applied on top of the model-level run configuration.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to model.

Returns:

  • Any

    Output pytree produced by merging or packing all microbatches.

Raises:

  • RuntimeError

    If gradients are required but disabled globally.

forward_backward(input_args=(), input_kwargs={}, label=None, loss_fn=lambda outputs, labels: outputs, return_outputs=False, run_config=RoundPipeRunConfig())

Run a fused forward and backward pass over all microbatches.

Parameters:

  • input_args (Tuple[Any, ...], default: () ) –

    Positional forward arguments.

  • input_kwargs (Dict[str, Any], default: {} ) –

    Keyword forward arguments.

  • label (Any, default: None ) –

    Label payload aligned with loss_fn expectations.

  • loss_fn (Callable[[Any, Any], Union[Sequence[Tensor], Tensor]], default: lambda outputs, labels: outputs ) –

    Callable that consumes (outputs, labels) and produces a loss tensor or sequence of loss tensors.

  • return_outputs (bool, default: False ) –

    Whether to return the model outputs along with loss.

  • run_config (RoundPipeRunConfig, default: RoundPipeRunConfig() ) –

    Optional per-call overrides for runtime behavior.

Returns:

  • Union[Tuple[Union[List[Tensor], Tensor], Any], List[Tensor], Tensor]

    If return_outputs is False, returns the sum of loss tensor(s) across all microbatches.

    If return_outputs is True, returns a tuple of (loss_sum, merged_outputs) where merged_outputs is the output pytree produced by merging or packing all microbatches.

Raises:

  • AssertionError

    If gradients are not enabled.
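The loss semantics above can be illustrated with a minimal pure-Python sketch, assuming automatic splitting: the batch is divided into microbatches, loss_fn is applied to each microbatch's outputs and labels, and the per-microbatch losses are summed. All names here are illustrative stand-ins, not RoundPipe internals.

```python
def simulate_forward_backward(inputs, labels, model_fn, loss_fn, num_microbatch):
    """Split inputs/labels, run model_fn per microbatch, sum the losses."""
    size = len(inputs) // num_microbatch
    total_loss = 0.0
    merged_outputs = []
    for i in range(num_microbatch):
        mb_in = inputs[i * size:(i + 1) * size]
        mb_lb = labels[i * size:(i + 1) * size]
        outputs = [model_fn(x) for x in mb_in]
        total_loss += loss_fn(outputs, mb_lb)   # loss summed across microbatches
        merged_outputs.extend(outputs)          # outputs merged across microbatches
    return total_loss, merged_outputs

loss, outs = simulate_forward_backward(
    inputs=[1.0, 2.0, 3.0, 4.0],
    labels=[2.0, 4.0, 6.0, 8.0],
    model_fn=lambda x: 2.0 * x,
    loss_fn=lambda os, ls: sum((o - l) ** 2 for o, l in zip(os, ls)),
    num_microbatch=2,
)
```

With return_outputs=True the real API would return the (loss, merged_outputs) pair analogous to the tuple above.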

sync_optim_param()

Ensure optimizer updated results are copied back to parameters. This function can run in either the main thread or the optimizer thread.

RoundPipeBase

Bases: Module

Common attributes and methods of RoundPipe and AutoRoundPipe

Attributes:

  • name (str) –

    Human-readable identifier shown in traces/logs.

  • model (Module) –

    The provided module wrapped for RoundPipe execution.

  • original_model (Optional[Module]) –

    Reference to the pre-wrapped module when shimming attribute access.

  • layer_attrs (List[LayerAttribute]) –

    List of LayerAttribute storing per-layer events.

  • optim_dtype (Optional[dtype]) –

    Data type for optimizer parameters.

  • optim_updated (AnnotatedEvent) –

    Event signaling that the optimizer has updated.

__delattr__(name)

Ensure attribute deletions propagate to wrapped/original modules.

__getattr__(name)

Delegate missing attributes to the wrapped or original module.

__init__(model, name=None, optim_dtype=None)

Initialize the RoundPipe base wrapper.

Parameters:

  • model (Module) –

    Module to wrap.

  • name (Optional[str], default: None ) –

    Optional friendly identifier. Defaults to file:line.

  • optim_dtype (Optional[dtype], default: None ) –

    Data type for optimizer parameters. Defaults to the same as the parameter data type.

__setattr__(name, value)

Mirror attribute writes to wrapped/original models post-initialization.

named_parameters(prefix='', recurse=True, remove_duplicate=True)

Iterator over named parameters. Overridden to warn against direct use and to redirect to optim_named_parameters when called in an optimizer context.

optim_named_parameters(prefix='', remove_duplicate=True)

Iterator over named parameters suitable for optimizer consumption.

Parameters:

  • prefix (str, default: '' ) –

    Prefix to prepend to parameter names.

  • remove_duplicate (bool, default: True ) –

    Whether to skip duplicate parameters.

Yields:

  • Tuple[str, Parameter]

    Tuples of parameter names and their optimizer-ready tensors.

optim_parameters()

Iterator over parameters suitable for optimizer consumption.

Yields:

  • Parameter

    Parameters stored in their optimizer-ready format.

parameters(recurse=True)

Iterator over parameters. Overridden to redirect to optim_parameters when called in an optimizer context.

set_original_model(original_model)

Attach the pre-wrap model for attribute shimming.

Parameters:

  • original_model (Module) –

    Module that should mirror attribute updates.

step(step_fn, is_async=True, *args, **kwargs)

Run an optimizer step using the provided step function. The non-async version blocks until the optimizer update is complete before returning, so every training iteration uses the latest parameters; however, this greatly reduces performance and is usually not recommended. The async version returns immediately after scheduling the step function; the next training iteration then uses parameters that are one step old, which usually works fine in practice.

Warning

Data access in the step function should be limited to optimizer parameters only. Otherwise, you should be aware of potential data races.

Parameters:

  • step_fn (Callable[..., None]) –

    Callable that performs an optimization step.

  • is_async (bool, default: True ) –

    Whether to run the step asynchronously.

  • *args (Any, default: () ) –

    Positional arguments forwarded to step_fn.

  • **kwargs (Any, default: {} ) –

    Keyword arguments forwarded to step_fn.
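The sync/async contract described above can be sketched with a plain worker thread, independent of RoundPipe: an async step schedules step_fn and returns a handle immediately, while a sync step waits for completion before returning. The AsyncStepper class and its names are illustrative assumptions, not RoundPipe internals.

```python
import queue
import threading

class AsyncStepper:
    """Toy model of async optimizer stepping on a dedicated worker thread."""

    def __init__(self):
        self._jobs = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self):
        while True:
            fn, done = self._jobs.get()
            fn()
            done.set()

    def step(self, step_fn, is_async=True):
        done = threading.Event()
        self._jobs.put((step_fn, done))
        if not is_async:
            done.wait()   # sync: update is visible before returning
        return done       # async: caller may wait later (cf. sync_optim_param)

params = {"w": 0.0}
stepper = AsyncStepper()
pending = stepper.step(lambda: params.update(w=params["w"] + 1.0))
pending.wait()            # analogous to synchronizing before reading params
```

Waiting on the returned handle plays the role that sync_optim_param plays in the real API: it makes the scheduled update visible before parameters are read.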

sync_optim_param()

Ensure optimizer updated results are copied back to parameters.

synchronize()

Synchronize optimizer parameters and backward gradients back to model parameters.

run

Core runtime helpers for executing RoundPipe forward/backward passes.

RoundPipeBatchedBackward

Bases: Function

Autograd node that launches backward passes for all microbatches.

backward(ctx, _, *grad_outputs) staticmethod

Launch backward passes for all microbatches from saved state.

Parameters:

  • ctx (Any) –

    Autograd context populated in forward.

  • *grad_outputs (Any, default: () ) –

    Gradients for the outputs that required grad.

Returns:

  • Tuple[None, None, None, Unpack[Tuple[Optional[Tensor], ...]]]

    Gradients mapping back to all_inputs in the forward path.

Raises:

  • RuntimeError

    If a double backward is attempted.

forward(ctx, roundpipe_context, batch, tag, *all_inputs) staticmethod

Prepare shared backward state for all microbatches.

Parameters:

  • ctx (Any) –

    Autograd context (provided by PyTorch).

  • roundpipe_context (List[RoundPipeRunContext]) –

    Execution contexts per microbatch.

  • batch (Batch) –

    Batch object carrying flattened outputs and events.

  • tag (Tensor) –

    Gradient anchor tensor to ensure output requires grad.

  • *all_inputs (Any, default: () ) –

    Flattened tensors produced during forward.

Returns:

  • Tuple[List[Tuple[int, int]], Unpack[Tuple[Tensor, ...]]]

    Tuple with gradient indices followed by tensors requiring grad.

RoundPipeInputBackward

Bases: Function

Autograd node that reconnects RoundPipe gradients to user inputs.

backward(ctx, *grad_outputs) staticmethod

Return gradients captured in each RoundPipeRunContext.

Parameters:

  • ctx (Any) –

    Autograd context populated during forward.

  • *grad_outputs (Any, default: () ) –

    Gradients w.r.t. the dummy scalar (unused).

Returns:

  • Tuple[None, Unpack[Tuple[Optional[Tensor], ...]]]

    Tuple matching (None, *flattened_input_grads) so that upstream PyTorch graphs receive gradients for their original inputs.

forward(ctx, roundpipe_context, *all_inputs) staticmethod

Anchor upstream gradients spanning multiple microbatches.

Parameters:

  • ctx (Any) –

    Autograd context provided by PyTorch.

  • roundpipe_context (List[RoundPipeRunContext]) –

    List of contexts participating in training.

  • *all_inputs (Any, default: () ) –

    Flattened inputs to track gradients for.

Returns:

  • Tensor

    Dummy scalar tensor that participates in autograd graphs.

RoundPipeMicrobatchBackward

Bases: Function

Autograd node that launches backward pass for a single microbatch.

backward(ctx, device_id, _, *grad_outputs) staticmethod

Kick off backward recomputation for a single microbatch.

Parameters:

  • ctx (Any) –

    Autograd context populated in forward.

  • device_id (Tensor) –

    Tensor encoding which CUDA device to reuse.

  • *grad_outputs (Any, default: () ) –

    Gradients for the tracked outputs.

Returns:

  • Tuple[None, None, Tensor, Unpack[Tuple[Optional[Tensor], ...]]]

    Gradients corresponding to the saved forward inputs.

Raises:

  • RuntimeError

    If a double backward is attempted.

forward(ctx, roundpipe_context, batch, tag, *all_inputs) staticmethod

Prepare backward state for a single microbatch.

Parameters:

  • ctx (Any) –

    Autograd context (provided by PyTorch).

  • roundpipe_context (RoundPipeRunContext) –

    Execution context tied to one microbatch.

  • batch (Batch) –

    Batch with flattened outputs + events for this microbatch.

  • tag (Tensor) –

    Gradient anchor creating a dependency between microbatches, ensuring correct backward order.

  • *all_inputs (Any, default: () ) –

    Flattened arguments provided during forward.

Returns:

  • Tuple[Tensor, List[int], Unpack[Tuple[Tensor, ...]]]

    Tuple (tag, grad_indices, *output_tensors) consumed later.

RoundPipeRunContext

Per-microbatch state shared between forward and backward passes.

Attributes:

  • model (RoundPipe) –

    The running RoundPipe instance.

  • gpu_fwd_layers (List[Module]) –

    GPU-resident layers for current forward batch.

  • gpu_bwd_layers (List[Module]) –

    GPU-resident layers for current backward batch.

  • timer (IterTimer) –

    Iteration timer shared across microbatches.

  • tracker (ModelTracker) –

    Tracker describing fwd/bwd ordering across layers.

  • enable_grad (bool) –

    Whether to store data for backward pass.

  • microbatch_id (int) –

    Index of the microbatch this context tracks.

  • num_microbatches (int) –

    Total number of microbatches scheduled.

  • preserve_rng_state (bool) –

    Whether to snapshot/restore RNG streams.

  • recompute_grain (Literal[stage, layer]) –

    Backward recompute granularity.

  • device_autocast_kwargs (dict) –

    Settings applied to CUDA autocast.

  • cpu_autocast_kwargs (dict) –

    Settings applied to CPU autocast.

  • flatten_inputs (List[List[Any]]) –

    Cached flattened inputs for recompute.

  • flatten_specs (List[Optional[TreeSpec]]) –

    Tree specs that rebuild flattened inputs.

  • recompute_data (List[List[Any]]) –

    Saved data for backward recompute.

  • recompute_data_specs (List[Optional[TreeSpec]]) –

    Tree specs that rebuild recompute data.

  • saved_buffers (Dict[Tensor, Tensor]) –

    Buffers saved for recomputation. Only applied at microbatch 0.

  • download_event (List[Event]) –

    Events to signal when saved data download is done.

  • device_rng_states (List[Optional[Tensor]]) –

    Saved CUDA RNG states per layer when requested.

  • cpu_rng_states (List[Optional[Tensor]]) –

    Saved CPU RNG states per layer when requested.

  • input_backward_events (Sequence[Event]) –

    Events to record when gradients are ready.

  • output_backward_events (Sequence[Event]) –

    Events to wait on before backward compute.

  • grad_states (List[Optional[Tensor]]) –

    Gradient tensors for current backward pass.

__init__(model, gpu_fwd_layers, gpu_bwd_layers, timer, tracker, enable_grad, microbatch_id, num_microbatches, preserve_rng_state, recompute_grain)

Initialize per-microbatch caches and RNG bookkeeping.

Parameters:

  • model (RoundPipe) –

    The running RoundPipe instance.

  • gpu_fwd_layers (List[Module]) –

    A list for sharing GPU-resident layers among forward microbatches.

  • gpu_bwd_layers (List[Module]) –

    A list for sharing GPU-resident layers among backward microbatches.

  • timer (IterTimer) –

    Iteration timer shared across microbatches.

  • tracker (ModelTracker) –

    Tracker describing fwd/bwd ordering across layers.

  • enable_grad (bool) –

    Whether to store data for backward pass.

  • microbatch_id (int) –

    Microbatch index for this context.

  • num_microbatches (int) –

    Total number of microbatches in the batch.

  • preserve_rng_state (bool) –

    Whether to snapshot RNG for recomputation.

  • recompute_grain (Literal[stage, layer]) –

    Backward recompute granularity.

cut_recompute_data(layer_id)

Retrieve and clear data saved for backward recompute.

Parameters:

  • layer_id (int) –

    Layer index whose data should be retrieved.

Returns:

  • Any

    The data previously saved via save_for_recompute.

fetch_recompute_data(layer_id, device)

Load data saved for backward recompute to GPU.

Parameters:

  • layer_id (int) –

    Layer index whose data should be retrieved.

  • device (DeviceManager) –

    Device manager whose streams guard the transfer.

restore_rng_state(layer_id, device)

Restore the RNG snapshot captured during save_input.

Parameters:

  • layer_id (int) –

    Layer index whose RNG states should be restored.

  • device (DeviceManager) –

    Device manager that owns the CUDA stream.

Raises:

  • AssertionError

    If RNG state was not captured as expected.

save_buffer(layer)

Save layer buffers for recomputation. Only called at microbatch 0.

Parameters:

  • layer (Module) –

    Layer whose buffers should be saved.

save_for_recompute(layer_id, device, *data)

Save data for backward recompute. Tensors to be saved must not require gradients. This function can be called at most once from each layer. If gradients are not enabled during forward, this is a no-op.

Parameters:

  • layer_id (int) –

    Layer index whose data should be cached.

  • device (DeviceManager) –

    Device manager whose streams guard the transfer.

  • *data (Any, default: () ) –

    Data to save for recomputation.

save_input(layer_id, batch, device)

Stash flattened inputs (and optionally RNG) for backward recompute.

If gradients are not enabled or the layer is not the first layer of a backward stage, this is a no-op.

Parameters:

  • layer_id (int) –

    Layer index whose inputs should be cached.

  • batch (Batch) –

    Batch holding the flattened tensors to snapshot.

  • device (DeviceManager) –

    Device manager whose streams guard the transfer.

run_backward(device, context, layer_group_id)

Recompute saved inputs, propagate gradients, and ship grads to CPU.

Info

This function runs on a separate thread managed by the DeviceManager or PyTorch autograd. Multiple threads may run concurrently on different devices. Be aware of thread-safety when using or modifying this function. All data access must be limited to the input parameters and the specified model layers.

Parameters:

  • layer_group_id (int) –

    Index of the backward layer group to execute.

  • context (RoundPipeRunContext) –

    Microbatch-specific execution context.

  • device (DeviceManager) –

    Device manager providing streams for recompute/backward.

Returns:

  • None

    Results are written into context.grad_states in-place.

Raises:

  • RuntimeError

    If checkpoint semantics are violated by the caller.

run_forward(device, context, layer_group_id, batch)

Upload layers, execute forward compute, and copy outputs back to host.

Info

This function runs on a separate thread managed by the DeviceManager. Multiple threads may run concurrently on different devices. Be aware of thread-safety when using or modifying this function. All data access must be limited to the input parameters and the specified model layers.

Parameters:

  • layer_group_id (int) –

    Index of the layer group being executed.

  • batch (Batch) –

    Batch object containing flattened microbatch inputs.

  • context (RoundPipeRunContext) –

    Microbatch-specific execution context.

  • device (DeviceManager) –

    Device manager that streams data and compute.

Returns:

  • None

    Results are written into batch.flatten_states in-place.

run_forward_backward(device, context, batch, loss_fn, return_outputs)

Execute a fused forward/backward pass for training workloads.

Info

This function runs on a separate thread managed by the DeviceManager. Multiple threads may run concurrently on different devices. Be aware of thread-safety when using or modifying this function. All data access must be limited to the input parameters and the specified model layers.

Parameters:

  • batch (Batch) –

    Batch object containing flattened activations and labels.

  • context (RoundPipeRunContext) –

    Microbatch-specific execution context.

  • loss_fn (Callable[[Any, Any], Union[Sequence[Tensor], Tensor]]) –

    Callable that calculates loss from outputs and labels.

  • device (DeviceManager) –

    Device manager providing compute/transfer streams.

Returns:

  • None

    Results are written into batch.flatten_states, batch.loss_list and context.grad_states in-place.

run_config

Runtime configuration objects shared across RoundPipe components.

FullRoundPipeRunConfig

Resolved configuration combining model defaults and call overrides.

See RoundPipeRunConfig for parameter details.

__init__(function_run_config, model_run_config)

Merge a per-call config with the model's baseline config.

Parameters:

RoundPipeRunConfig

Shallow user-facing configuration applied per forward/train call.

__init__(requires_grad=None, output_device=None, preserve_rng_state=None, recompute_grain=None, num_microbatch=None, split_input=None, split_label=None, merge_output=None, execute_plan=None)

Configuration for running RoundPipe models. User may specify model-level configuration when initializing RoundPipe, and/or function-level configuration when calling forward().

Parameters:

  • requires_grad (Optional[bool], default: None ) –

    Whether to enable gradient computation. If None, defaults to the global setting.

  • output_device (Optional[device], default: None ) –

    The device where the output tensors will be placed. If None, defaults to CPU.

  • preserve_rng_state (Optional[bool], default: None ) –

    Whether to preserve the random number generator state. If None, defaults to True.

  • recompute_grain (Optional[Literal[stage, layer]], default: None ) –

    Backward recompute granularity. 'stage': recompute per backward stage. 'layer': recompute and backward per layer within a stage. If None, defaults to 'stage'.

  • num_microbatch (Optional[int], default: None ) –

    The number of microbatches to split the input into. If None, defaults to the number of available CUDA devices plus one.

  • split_input (-, default: None ) –

    Specifies how to split input arguments into microbatches. If None, defaults to automatic splitting.

  • split_label (-, default: None ) –

    Specifies how to split labels into microbatches. If None, defaults to automatic splitting.

  • merge_output (-, default: None ) –

    Specifies how to merge output microbatches back into a single output. If None, defaults to automatic merging.

  • execute_plan (Optional[ModelExecutePlan], default: None ) –

    An optional ModelExecutePlan to dictate execution strategy. If None, defaults to auto tuned execution.
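The model-level/per-call layering described above (cf. FullRoundPipeRunConfig) follows a simple rule: a field set on the call overrides the model default, and None means "inherit". A minimal sketch with plain dicts standing in for the config classes, under assumed field names:

```python
# Assumed model-level defaults; field names mirror RoundPipeRunConfig.
MODEL_DEFAULTS = {"requires_grad": True, "recompute_grain": "stage", "num_microbatch": 4}

def merge_run_config(function_cfg, model_cfg):
    """Per-call values win; None (or absent) falls back to the model default."""
    return {k: function_cfg.get(k) if function_cfg.get(k) is not None else v
            for k, v in model_cfg.items()}

merged = merge_run_config({"recompute_grain": "layer"}, MODEL_DEFAULTS)
```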

scheduler

Scheduling utilities that orchestrate forward/backward ordering.

Attributes:

BackwardScheduleSimulator

Mimics async backward tagging to coordinate microbatch ordering.

This class simulates the behavior of pipelined backward scheduling by maintaining a set of gradient anchor tensors (tags) that are rotated among devices. Each device uses its assigned tag to build the autograd graph for its microbatch during backward passes. By rotating the tags, we ensure that the backward passes across microbatches and layers are scheduled as desired.

Attributes:

  • tags (List[Tensor]) –

    Gradient anchor tensors tracked per device.

  • cur_device (int) –

    Index of the device used in the latest scheduling step.

  • n_devices (int) –

    Total number of CUDA devices detected.

__init__()

Pre-allocate gradient anchor tensors per CUDA device.

get_next_tag()

Return the tag tensor assigned to the next device in rotation.

reset()

Reset rotation state so unrelated runs do not share graphs.

update_current_tag(new_tag)

Cache the tag produced by the most recent backward pass.
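The rotation scheme can be modeled with a toy class in which plain integers stand in for the per-device gradient anchor tensors: get_next_tag hands out tags in device order, and update_current_tag stores the tag produced by the most recent backward pass so the next microbatch on that device chains onto it. This is an illustrative sketch, not the actual implementation.

```python
class TagRotator:
    """Toy model of per-device tag rotation for backward scheduling."""

    def __init__(self, n_devices):
        self.n_devices = n_devices
        self.tags = list(range(n_devices))  # stand-ins for anchor tensors
        self.cur_device = -1

    def get_next_tag(self):
        # Advance to the next device in rotation and hand out its tag.
        self.cur_device = (self.cur_device + 1) % self.n_devices
        return self.tags[self.cur_device]

    def update_current_tag(self, new_tag):
        # Cache the tag produced by the most recent backward pass.
        self.tags[self.cur_device] = new_tag

    def reset(self):
        # Drop rotation state so unrelated runs do not share graphs.
        self.tags = list(range(self.n_devices))
        self.cur_device = -1

rot = TagRotator(n_devices=2)
order = [rot.get_next_tag() for _ in range(4)]  # alternates between devices
```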

ModelExecutePlan

Execution plans for a RoundPipe model.

Attributes:

  • fwd_plan (List[range]) –

    List of layer ranges to execute, in order, during the forward pass.

  • bwd_plan (List[range]) –

    List of layer ranges to execute, in order, during the backward pass.

__init__()

Initialize empty execution plans.

__repr__()

Return string representation of the execution plans.

auto(run_type, /, *models, min_stages=get_num_devices(), upper_threshold=1.1, model_memory_limit=get_min_gpu_memory() * 0.6) classmethod

Generate automatic execution plans based on model timings.

Parameters:

  • run_type (Literal[infer, train, fused]) –

    Type of model run.

  • models (RoundPipe, default: () ) –

    One or more RoundPipe models to base the plans on.

  • min_stages (int, default: get_num_devices() ) –

    Minimum number of pipeline stages to use. This is a hint for the planner, and the actual number of stages could be lower depending on the model size.

  • upper_threshold (float, default: 1.1 ) –

    Upper threshold for stage balancing. This limits the maximum allowed ratio between stages and the slowest layer. Increasing this value provides more flexibility in balancing stages at the cost of consuming more GPU memory.

  • model_memory_limit (float, default: get_min_gpu_memory() * 0.6 ) –

    Estimated GPU memory (in GB) available for model parameters and grads. Increasing this value allows more flexibility in balancing stages, while decreasing it could allow larger batch sizes to run. Note that RoundPipe will prefetch model parameters to GPU memory, so the limit for each stage is model_memory_limit / 2.

Returns:

check_valid(num_layers, run_type)

Validate that the execution plans cover all layers exactly once.

Parameters:

  • num_layers (int) –

    Total number of layers in the model.

  • run_type (Literal[infer, train, fused]) –

    Type of model run.

Raises:

  • ValueError

    If the plans do not cover all layers exactly once.
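The coverage check can be sketched in a few lines, under the assumption that a plan is a list of `range` objects over layer indices: every layer must appear exactly once, otherwise a ValueError is raised. The helper name is illustrative.

```python
def check_plan_covers_all_layers(plan, num_layers):
    """Raise ValueError unless plan covers layers 0..num_layers-1 exactly once."""
    seen = sorted(i for r in plan for i in r)
    if seen != list(range(num_layers)):
        raise ValueError(f"plan {plan} does not cover {num_layers} layers exactly once")

check_plan_covers_all_layers([range(0, 3), range(3, 5)], 5)      # valid: no error
try:
    check_plan_covers_all_layers([range(0, 3), range(2, 5)], 5)  # layer 2 twice
    caught = False
except ValueError:
    caught = True
```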

ModelTracker

Tracks forward and backward execution plans for a RoundPipe model. Contains semaphores to coordinate layer groups execution.

Attributes:

  • fwd_plan (List[range]) –

    List of layer ranges executed during forward.

  • bwd_plan (List[range]) –

    List of layer ranges executed during backward.

  • fwd_sem (List[AnnotatedSemaphore]) –

    Per-layer semaphores used to gate forward progress.

  • bwd_sem (List[AnnotatedSemaphore]) –

    Per-layer semaphores used to gate backward progress.

  • fused_fwd_sem (AnnotatedSemaphore) –

    Semaphore signaling completion of the forward part of a fused forward-backward pass.

__init__(execute_plan)

Initialize ModelTracker based on the model configuration.

Parameters:

  • execute_plan (ModelExecutePlan) –

    ModelExecutePlan instance with execution plans.

backward_need_input(layer_id)

Return whether backward execution requires inputs for layer_id.

backward_notify(layer_group_id)

Signal that the given backward group completed.

backward_wait_complete(num_microbatch)

Wait for backward completion across all microbatches.

Parameters:

  • num_microbatch (int) –

    Number of microbatches that must finish.

backward_wait_for(layer_group_id)

Block until the given backward group completes.

forward_notify(layer_group_id)

Signal that the given forward group completed.

forward_wait_complete(num_microbatch)

Wait for the last forward group to finish num_microbatch times.

Parameters:

  • num_microbatch (int) –

    Number of microbatches that must finish.

forward_wait_for(layer_group_id)

Block until the previous forward group finishes.

Parameters:

  • layer_group_id (int) –

    Index of the current group; the call waits on the group before it.

fused_forward_notify()

Signal that forward part of fused forward backward completed.

fused_forward_wait_complete(num_microbatch)

Wait for the forward part of the fused forward-backward pass to complete across all microbatches.

Parameters:

  • num_microbatch (int) –

    Number of microbatches that must finish.

chunk_layer_params(tensor_pair, n_chunks)

Group tensor copies into balanced chunks for overlapped transfers.

Here we use a greedy number partitioning algorithm to distribute tensor copy work across multiple chunks. This helps balance the workload when transferring model parameters.

Parameters:

  • tensor_pair (List[Tuple[Tensor, Tensor]]) –

    List of (src, dst) tensors representing copy work.

  • n_chunks (int) –

    Number of chunks/events to distribute the work across.

Returns:

  • List[List[Tuple[Tensor, Tensor]]]

    List of chunks where each chunk is a list of tensor copy pairs.
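The greedy number-partitioning heuristic can be sketched as follows, assuming each copy's cost is its element count: sort the pairs by cost in descending order and always assign the next pair to the currently lightest chunk. Indices stand in for the (src, dst) tensor pairs.

```python
def greedy_chunks(costs, n_chunks):
    """Return (per-chunk index lists, per-chunk load sums) balancing total cost."""
    chunks = [[] for _ in range(n_chunks)]
    loads = [0] * n_chunks
    # Largest-first assignment keeps the chunk loads close to each other.
    for idx in sorted(range(len(costs)), key=lambda i: -costs[i]):
        lightest = loads.index(min(loads))
        chunks[lightest].append(idx)
        loads[lightest] += costs[idx]
    return chunks, loads

chunks, loads = greedy_chunks([8, 7, 6, 5, 4], n_chunks=2)
```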

threads

Thread helpers that keep RoundPipe worker threads observable and safe.

Attributes:

  • roundpipe_threads (List[RoundPipeThread]) –

    List of all RoundPipe threads created so far.

  • thread_exception_print_lock (LockType) –

    Lock to prevent interleaved exception prints.

  • KeyboardInterruptRoundPipeThreads (KeyboardInterrupt) –

    KeyboardInterrupt raised when RoundPipe waits for its worker threads to finish.

AnnotatedEvent

Bases: Event

Event that annotates waits for easier profiling and debugging.

Attributes:

  • name

    Friendly name to help annotate waits.

AnnotatedSemaphore

Bases: Semaphore

Semaphore that annotates waits for easier profiling and debugging.

Attributes:

  • name

    Friendly name to help annotate waits.

RoundPipeThread

Bases: Thread

Daemon thread wrapper that reports uncaught exceptions before exit.

Attributes:

  • is_active (bool) –

    Flag indicating whether the thread currently executes user work (used for debugging dumps).

__init__(target, name, **kwargs)

Wrap a target callable so crashes are surfaced immediately.

Parameters:

  • target (Callable) –

    Callable that performs the thread's work.

  • name (str) –

    Friendly name to help when dumping thread stacks.

  • **kwargs (Any, default: {} ) –

    Additional threading.Thread keyword arguments.

dump_all_active_threads()

Print trimmed stack traces for all currently active RoundPipe threads.

is_threading_internal(frame)

Return whether frame originates from Python or RoundPipe threading.

Parameters:

  • frame (FrameType) –

    Frame to inspect.

Returns:

  • bool

    True if the frame belongs to threading internals, else False.

print_trimmed_traceback(frame)

Print a traceback that omits internal threading frames.

Parameters:

  • frame (Optional[FrameType]) –

    Frame whose stack should be printed.

timer

CUDA event helpers for measuring per-layer forward/backward latency.

IterTimer

Tracks per-layer forward/recompute/backward timing events for one iteration.

Attributes:

  • parent (ModelTimer) –

    Parent ModelTimer that this IterTimer belongs to.

  • fwd_events (TimerFwdEventsType) –

    Recorded per-layer forward/recompute events.

  • bwd_events (TimerBwdEventsType) –

    Recorded per-layer-group backward events.

  • bwd_dict_lock (LockType) –

    Lock to protect access to bwd_events.

__del__()

On deletion, push results to parent's iter_results queue.

time_bwd(layer_ids, stream)

Create a context manager to time a backward layer.

Parameters:

  • layer_ids (range) –

    Range of layer indices being timed.

  • stream (Stream) –

    CUDA stream on which to record events.

Returns:

time_fwd(action, layer_idx, stream)

Create a context manager to time a forward/recompute layer.

Parameters:

  • action (Literal[fwd, re]) –

    Either 'fwd' or 're' to indicate forward or recompute.

  • layer_idx (int) –

    Index of the layer being timed.

  • stream (Stream) –

    CUDA stream on which to record events.

Returns:

LayerTimingContext

Context manager that records CUDA events around a code block.

Attributes:

  • start_event (Event) –

    CUDA event recorded on entry.

  • end_event (Event) –

    CUDA event recorded on exit.

  • stream (Stream) –

    Stream used for event recording.

__enter__()

Record start_event on the configured stream.

__exit__(*args)

Record end_event when the context exits.

__init__(start_event, end_event, stream)

Store events/stream for later use.

Parameters:

  • start_event (Event) –

    Event recorded at context entry.

  • end_event (Event) –

    Event recorded at context exit.

  • stream (Stream) –

    CUDA stream on which to record events.
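A CPU analogue of this pattern, assuming time.perf_counter timestamps in place of CUDA events and no stream argument: entering records the start, exiting records the end, and the elapsed time can be read once the block has finished. This is a sketch of the shape of the context manager, not the CUDA-event implementation.

```python
import time

class TimingContext:
    """Records timestamps around a code block, mirroring LayerTimingContext."""

    def __enter__(self):
        self.start = time.perf_counter()  # analogous to recording start_event
        return self

    def __exit__(self, *args):
        self.end = time.perf_counter()    # analogous to recording end_event

    def elapsed(self):
        return self.end - self.start

with TimingContext() as t:
    sum(range(1000))  # timed work
```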

ModelTimer

Tracks per-layer forward/recompute/backward timing using CUDA events. Before having any records, the timer uses model size-based estimates. After one warm-up run, the timer updates its estimates using an exponential moving average from 0.

Attributes:

  • SMOOTH_RATE (float) –

    Smoothing factor for time estimates.

  • BACKWARD_MULTIPLIER (float) –

    Initial multiplier to estimate backward time from forward time.

  • VERBOSE (bool) –

    Whether to print timing updates.

  • n_layers (int) –

    Number of layers being timed.

  • stage (Dict[Literal[fwd, re, bwd], Literal[0, 1, 2]]) –

    0 if no events recorded yet, 1 if first result dropped, 2 if time-based estimate has been computed. Stage of recompute and backward should be the same.

  • scale (Dict[Literal[fwd, re, bwd], float]) –

    Scaling factors for each type. Since moving average starts from 0, this tracks the ineffective weight of averages. Scale of recompute and backward should be the same.

  • estimate (Dict[Literal[fwd, re, bwd], List[float]]) –

    Smoothed time estimates for each layer and type.

  • iter_results (TimerQueueType) –

    Recorded timing events from completed iterations.

  • pending_fwd (Optional[TimerFwdEventsType]) –

    Not yet completed forward/recompute events.

  • pending_bwd (Optional[TimerBwdEventsType]) –

    Not yet completed backward events.
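The zero-initialized moving average with a scale correction can be sketched as below: the estimate starts at 0, `scale` tracks the remaining (ineffective) weight of that zero initialization, and dividing by 1 - scale removes the resulting bias. The SMOOTH_RATE value here is an assumption for illustration, not the library's constant.

```python
SMOOTH_RATE = 0.5  # assumed smoothing factor

def ema_update(estimate, scale, sample):
    """One EMA step; scale decays toward 0 as real samples accumulate."""
    estimate = (1 - SMOOTH_RATE) * estimate + SMOOTH_RATE * sample
    scale = (1 - SMOOTH_RATE) * scale   # remaining weight of the 0 init
    return estimate, scale

def corrected(estimate, scale):
    """Divide out the ineffective weight to get an unbiased estimate."""
    return estimate / (1 - scale)

est, sc = 0.0, 1.0                      # estimate starts from 0, scale from 1
for sample in (10.0, 10.0, 10.0):
    est, sc = ema_update(est, sc, sample)
value = corrected(est, sc)              # recovers 10.0 despite the 0 start
```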

get_estimate(run_type)

Get current time and memory estimates for all layers.

Parameters:

  • run_type (Literal[infer, train, fused]) –

    Type of model run.

Returns:

  • Literal[time, memory]

    Source of estimates

  • List[float]

    List of per-layer forward time estimates (ms / bytes)

  • List[float]

    List of per-layer backward time estimates (ms / bytes)

update_times()

Update time estimates based on recorded events.

transfer

Tensor transfer helpers that move activations/params between host and device.

Attributes:

  • chunked_upload (bool) –

    Whether to split large params into chunks during upload.

  • CHUNK_UPLOAD_SIZE (int) –

    Size threshold (in bytes) for chunked uploads.

PinnedUpload

Bases: Function

Autograd helper that enforces pinned host tensors before H2D copies.

backward(ctx, g) staticmethod

Move gradients back to pinned host memory.

Parameters:

  • ctx (Any) –

    Autograd context (unused).

  • g (Tensor) –

    Gradient tensor on the device.

Returns:

  • Tuple[Tensor, None]

    Gradient tensor on CPU and None for the device argument.

forward(ctx, t, d) staticmethod

Ensure t resides in pinned memory before copying to device.

Parameters:

  • ctx (Any) –

    Autograd context (unused).

  • t (Tensor) –

    Host tensor to transfer.

  • d (device) –

    Destination device.

Returns:

  • Tensor

    Tensor residing on device d.

RegisterBackwardEvent

Bases: Function

Records an event so backward consumers can synchronize on it.

backward(ctx, grad_outputs) staticmethod

Synchronize on the recorded event before returning gradients.

Parameters:

  • ctx (Any) –

    Autograd context containing the event handle.

  • grad_outputs (Tensor) –

    Incoming gradient tensor.

Returns:

  • Tuple[Tensor, None]

    Tuple containing the guarded gradient and None for event.

forward(ctx, input, event) staticmethod

Stash event so its completion gates backward consumption.

Parameters:

  • ctx (Any) –

    Autograd context storing the event handle.

  • input (Tensor) –

    Tensor passed through untouched.

  • event (Event) –

    CUDA event to signal when backward starts.

Returns:

  • Tensor

    The original input tensor.

async_d2h(device, transfer_finish_event, device_tensors, keep_requires_grad=False)

Copy tensors from device to host streams while preserving ordering.

Parameters:

  • device (DeviceManager) –

    Device manager owning the downstream stream.

  • transfer_finish_event (Iterable[Event]) –

    Events to record when the copy completes.

  • device_tensors (Iterable[Union[Tensor, Any]]) –

    Iterable of tensors/objects residing on the device.

  • keep_requires_grad (bool, default: False ) –

    Whether to preserve requires_grad flags.

Returns:

  • List[Union[Tensor, Any]]

    List of tensors/objects now resident on the host.

async_h2d(device, host_ready_event, host_tensors, keep_requires_grad=False)

Copy tensors from host to device using the device's upload stream.

Parameters:

  • device (DeviceManager) –

    Device manager orchestrating the upload.

  • host_ready_event (Iterable[Event]) –

    Events the copy stream waits on before starting, signaling that the host tensors are ready.

  • host_tensors (Iterable[Union[Tensor, Any]]) –

    Iterable of host tensors/objects.

  • keep_requires_grad (bool, default: False ) –

    Whether to preserve requires_grad flags.

Returns:

  • List[Union[Tensor, Any]]

    List of tensors/objects now resident on the device.
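The ready-event/finish-event handshake shared by async_h2d and async_d2h can be sketched in plain Python: the copy worker waits on the ready event, performs the copies in order, then records the finish event that consumers fence on. A worker thread stands in for a CUDA stream and nested lists for tensors; all names here are illustrative.

```python
import threading


def async_copy(ready_event, finish_event, sources):
    """Sketch of the async transfer pattern (not the RoundPipe API)."""
    dests = [None] * len(sources)

    def stream():                       # worker thread stands in for a CUDA stream
        ready_event.wait()              # wait until source buffers are ready
        for i, s in enumerate(sources):
            dests[i] = list(s)          # "copy" each tensor, preserving order
        finish_event.set()              # record completion for consumers

    threading.Thread(target=stream, daemon=True).start()
    return dests                        # filled asynchronously; valid once finish_event fires


ready, done = threading.Event(), threading.Event()
out = async_copy(ready, done, [[1, 2], [3]])
ready.set()   # source tensors become ready
done.wait()   # consumer fences on the finish event before reading
```

The key property mirrored here is that the function returns immediately with placeholder buffers; only the finish event makes the copies safe to read.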

create_upload_pair(tensor_pair, src, device)

Allocate destination buffers and register chunk copies when needed.

Parameters:

  • tensor_pair (List[Tuple[Tensor, Tensor]]) –

    Collector that receives (src, dst) pairs.

  • src (Tensor) –

    Source tensor currently resident in host memory.

  • device (device) –

    Target CUDA device.

Returns:

  • Tensor

    Destination tensor allocated on device.
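The collector pattern behind create_upload_pair can be sketched with byte buffers: allocate the destination up front, then register either a single (src, dst) pair or per-chunk pairs when the source exceeds a chunk size, so a transfer engine can execute them later. `chunk_bytes` and the bytearray/memoryview stand-ins are illustrative, not the real implementation.

```python
def create_copy_pairs(tensor_pairs, src, chunk_bytes=4):
    """Sketch: allocate a destination and register chunked copy pairs."""
    dst = bytearray(len(src))
    view = memoryview(dst)              # writable views stand in for tensor slices
    if len(src) <= chunk_bytes:
        tensor_pairs.append((src, view))            # small: one copy pair
    else:
        for i in range(0, len(src), chunk_bytes):   # large: register chunk copies
            tensor_pairs.append((src[i:i + chunk_bytes], view[i:i + chunk_bytes]))
    return dst                          # destination buffer, filled later


pairs = []
dst = create_copy_pairs(pairs, b"hello world")
for s, d in pairs:   # a transfer engine would execute each registered pair
    d[:] = s
```

Chunking lets long copies interleave with other stream work instead of monopolizing the copy engine with one large transfer.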

download_layer(cpu_layer, gpu_layer, layer_attr, device, with_buffer, with_grad)

Copy layer params/buffers/grads back to the host asynchronously. Note that this only issues the copies; synchronization must be handled externally.

Parameters:

  • cpu_layer (Module) –

    Original layer residing on the host.

  • gpu_layer (Module) –

    Layer copy residing on the device.

  • layer_attr (LayerAttribute) –

    Layer attribute tracking the transfer events.

  • device (DeviceManager) –

    Device manager orchestrating the transfer.

  • with_buffer (bool) –

    Whether to copy buffers back to host.

  • with_grad (bool) –

    Whether to copy gradients back to host.

upload_layers(layers, layer_attrs, device, with_grad, saved_buffers=None)

Copy layers onto the target device.

Parameters:

  • layers (List[Module]) –

    Sequence of layers to upload.

  • layer_attrs (List[LayerAttribute]) –

    Sequence of layer attributes corresponding to layers.

  • device (DeviceManager) –

    Device manager orchestrating the transfer.

  • with_grad (bool) –

    Whether to copy gradient buffers alongside parameters.

  • saved_buffers (Optional[Dict[Tensor, Tensor]], default: None ) –

    Mapping from original buffers to the saved buffers used for recomputation.

Returns:

  • List[Module]

    Layer copies now resident on the target device.
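The overlap that upload_layers and download_layer enable can be sketched as a small pipeline: a background thread stages ("uploads") the next layer while the main thread computes on the current one. Dicts stand in for modules, a bounded queue for device memory pressure, and all names are illustrative.

```python
import threading
from queue import Queue


def pipeline(layers, compute):
    """Sketch of the upload/compute overlap (not the RoundPipe API)."""
    staged = Queue(maxsize=1)  # at most one layer staged ahead of compute

    def uploader():
        for layer in layers:
            staged.put(dict(layer))   # "H2D copy": stage a device copy of the layer
        staged.put(None)              # sentinel: no more layers

    threading.Thread(target=uploader, daemon=True).start()
    results = []
    while (gpu_layer := staged.get()) is not None:
        results.append(compute(gpu_layer))  # forward on the staged copy
    return results


outs = pipeline([{"w": 2}, {"w": 3}], lambda layer: layer["w"] * 10)
```

The bounded queue is the essential design point: it caps how many layer copies are resident at once, which is what makes this pattern fit models larger than device memory.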

utils

Generic helpers shared across RoundPipe components.

get_call_location(depth)

Get the call location in 'filename:lineno' format.

Parameters:

  • depth (int) –

    The depth in the call stack to inspect.

Returns:

  • str

    A string representing the call location.
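A helper with this contract can be sketched with the standard inspect module: walk `depth` frames up the stack and format the caller's file and line. This is a plausible implementation, not necessarily RoundPipe's.

```python
import inspect
import os


def get_call_location(depth=1):
    """Sketch: return 'filename:lineno' for the frame `depth` levels up."""
    frame = inspect.stack()[depth]    # [0] is this function's own frame
    return f"{os.path.basename(frame.filename)}:{frame.lineno}"


loc = get_call_location()  # e.g. "mymodule.py:12", the line of this call
```

Such locations are handy as auto-generated names for wrapped modules, which matches how wrap_model_to_roundpipe derives a default name from its call site.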

get_model_active_size(model, recurse=True)

Return the total bytes of model parameters that require grad.

Parameters:

  • model (Module) –

    Module whose parameters are measured.

  • recurse (bool, default: True ) –

    Whether to include children recursively.

Returns:

  • int

    Total size in bytes.

get_model_size(model, recurse=True)

Return the combined parameter + buffer bytes for model.

Parameters:

  • model (Module) –

    Module whose parameters/buffers are measured.

  • recurse (bool, default: True ) –

    Whether to include children recursively.

Returns:

  • int

    Total size in bytes.
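Both size helpers reduce to summing `numel * element_size` over the relevant tensors, with get_model_active_size restricted to parameters that require grad. The sketch below uses a minimal stand-in for a torch parameter so it is self-contained; the dataclass and function bodies are illustrative, not RoundPipe's code.

```python
from dataclasses import dataclass


@dataclass
class Param:
    """Minimal stand-in for a torch parameter/buffer (illustrative)."""
    numel: int
    element_size: int          # bytes per element, e.g. 4 for float32
    requires_grad: bool = True


def get_model_size(params, buffers):
    """Sketch: combined parameter + buffer bytes."""
    return sum(p.numel * p.element_size for p in params + buffers)


def get_model_active_size(params):
    """Sketch: bytes of parameters that require grad only."""
    return sum(p.numel * p.element_size for p in params if p.requires_grad)


params = [Param(1000, 4), Param(500, 4, requires_grad=False)]
buffers = [Param(10, 4, requires_grad=False)]
# get_model_size -> 6040 bytes; get_model_active_size -> 4000 bytes
```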

wrapper

Utilities that wrap user models with RoundPipe sequential presets.

wrap_model_recursive(model, lower_threshold, upper_threshold, skip_modules, override_config, model_run_config, name, **roundpipe_kwargs)

Recursively wrap modules in RoundPipe using heuristics/presets.

Parameters:

  • model (Module) –

    Root module to evaluate.

  • lower_threshold (int) –

    Minimum size (bytes) to consider wrapping directly.

  • upper_threshold (int) –

    Maximum size before splitting submodules. Defaults to the model size divided by (num_devices + 1).

  • skip_modules (Container[Module]) –

    Modules that should remain untouched.

  • override_config (Dict[Module, RoundPipeRunConfig]) –

    Mapping from module objects to specific configs.

  • model_run_config (RoundPipeRunConfig) –

    Default run config for RoundPipe instances.

  • name (str) –

    Name of the current module.

  • **roundpipe_kwargs (Any, default: {} ) –

    Additional kwargs forwarded to RoundPipe.

Returns:

  • Module

    Either the original module (possibly modified in-place) or a RoundPipe instance wrapping the selected submodules.
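The size-threshold heuristic can be sketched on a toy module tree: modules below lower_threshold are left alone, modules between the thresholds are wrapped whole, and larger modules recurse into their children. Dicts stand in for modules and `{"wrapped": ...}` marks a would-be RoundPipe instance; all names are illustrative.

```python
def wrap_recursive(module, lower, upper):
    """Sketch of the wrap_model_recursive heuristic (not the real code)."""
    size = module["size"]
    if size < lower:
        return module                      # too small: leave untouched
    if size <= upper:
        return {"wrapped": module}         # within range: wrap directly
    # too large: split by recursing into children (modifies in place)
    module["children"] = [wrap_recursive(c, lower, upper)
                          for c in module.get("children", [])]
    return module


tree = {"size": 100, "children": [{"size": 40, "children": []},
                                  {"size": 10, "children": []}]}
out = wrap_recursive(tree, lower=20, upper=50)
```

Here the 100-byte root exceeds the upper threshold, so it is split: the 40-byte child is wrapped and the 10-byte child is left untouched.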

wrap_model_to_roundpipe(model, use_sequential_preset=None, lower_threshold=16 * 1024, upper_threshold=None, skip_modules=[], override_config={}, model_run_config=RoundPipeRunConfig(), name=None, **roundpipe_kwargs)

Wrap a model into RoundPipe instance using recursive heuristics/presets.

Parameters:

  • model (Module) –

    Root module to evaluate.

  • use_sequential_preset (Optional[bool], default: None ) –

    None/True attempts to replace with a packaged sequential preset, False skips the preset lookup.

  • lower_threshold (int, default: 16 * 1024 ) –

    Minimum size (bytes) to consider wrapping directly.

  • upper_threshold (Optional[int], default: None ) –

    Maximum size before splitting submodules. Defaults to the model size divided by (num_devices + 1).

  • skip_modules (Container[Module], default: [] ) –

    Modules that should remain untouched.

  • override_config (Dict[Module, RoundPipeRunConfig], default: {} ) –

    Mapping from module objects to specific configs.

  • model_run_config (RoundPipeRunConfig, default: RoundPipeRunConfig() ) –

    Default run config for RoundPipe instances.

  • name (Optional[str], default: None ) –

    Name of the current module. If None, a name is generated based on the call site.

  • **roundpipe_kwargs (Any, default: {} ) –

    Additional kwargs forwarded to RoundPipe.

Returns:

  • Module

    The wrapped model: either a RoundPipe instance or the original module, possibly modified in place.

Raises:

  • NotImplementedError

    If use_sequential_preset is explicitly True but no preset exists for the model type.
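The three-valued use_sequential_preset contract can be sketched as a small lookup helper: None tries the preset table and silently falls back, True requires a preset and raises NotImplementedError when none is registered, and False skips the lookup entirely. The preset table and function name below are illustrative, not RoundPipe's internals.

```python
def resolve_preset(model_type, presets, use_sequential_preset=None):
    """Sketch of the use_sequential_preset contract (illustrative names)."""
    if use_sequential_preset is False:
        return None                       # skip the preset lookup entirely
    preset = presets.get(model_type)
    if preset is None and use_sequential_preset is True:
        # Explicit request with no matching preset is an error.
        raise NotImplementedError(
            f"No sequential preset registered for {model_type!r}")
    return preset                         # may be None when the hint is None


PRESETS = {"gpt": "gpt-sequential"}       # hypothetical preset table
found = resolve_preset("gpt", PRESETS)            # preset found
fallback = resolve_preset("bert", PRESETS)        # None hint: silent fallback
```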