Model APIs
roundpipe.RoundPipe
class roundpipe.RoundPipe(
model: nn.Module,
optim_dtype: Optional[torch.dtype] = None,
name: Optional[str] = None,
model_run_config: RoundPipeRunConfig = RoundPipeRunConfig(),
pin_model: Literal["alloc", "register", "off"] = "alloc",
)
Wraps an nn.Module with RoundPipe's pipelined execution runtime. This is the core class of RoundPipe, responsible for splitting a model into layers, managing parameter transfers between CPU and GPU, and coordinating pipelined forward and backward passes.
Parameters:
model: The model to wrap. Can be nn.Sequential or an arbitrary model. An nn.Sequential model is split directly into individual layers; a non-Sequential model is treated as a single layer.
optim_dtype: Data type for optimizer parameters. Defaults to the same dtype as the model parameters. A common setup keeps model parameters in torch.float16 and uses torch.float32 for optimizer parameters to ensure numerical stability.
name: Optional identifier for display in logs and traces. Defaults to an auto-generated string based on the call site (format: filename:line).
model_run_config: Model-level default run configuration, used as the baseline for each call to forward() or forward_backward(). See Run Config for details.
pin_model: Memory pinning strategy for model parameters:
- "alloc": Use PyTorch's pin_memory to allocate pinned memory. This is the default and usually provides the best host-to-device transfer performance, but may cause up to 2x CPU memory usage because PyTorch pads all allocations to a power of 2.
- "register": Use cudaHostRegister to pin memory. Reduces CPU memory usage for very large models, but host-to-device transfer performance degrades by approximately 10%. Only available with CUDA.
- "off": Do not pin memory. Suitable for LoRA fine-tuning of models that exceed CPU memory. Combined with mmap during model loading, Linux can load data on demand from disk and automatically evict already-used data when memory runs low.
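The mixed-precision setup that optim_dtype enables can be sketched in plain PyTorch. RoundPipe maintains these optimizer-side copies internally; the snippet below only illustrates the idea, and uses bfloat16 so it runs on CPU (float16 is the more common choice on GPU, as described above).

```python
import torch
import torch.nn as nn

# Low-precision model parameters, full-precision optimizer copies.
model = nn.Linear(4, 4).to(torch.bfloat16)

# "Master" copies in float32, analogous to what optim_parameters()
# yields when optim_dtype=torch.float32.
master = [p.detach().to(torch.float32).requires_grad_() for p in model.parameters()]
opt = torch.optim.SGD(master, lr=0.1)

# Produce some gradients on the low-precision model parameters.
loss = sum(p.sum() for p in model.parameters())
loss.backward()

# Copy gradients to the float32 copies, step, then write the updated
# values back into the low-precision model parameters.
for p, m in zip(model.parameters(), master):
    m.grad = p.grad.to(torch.float32)
opt.step()
with torch.no_grad():
    for p, m in zip(model.parameters(), master):
        p.copy_(m.to(torch.bfloat16))
```

Keeping the optimizer state in float32 avoids the precision loss that accumulates when small updates are applied directly to low-precision weights.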
RoundPipe.forward
RoundPipe.forward(
*args: Any,
roundpipe_run_config: RoundPipeRunConfig = RoundPipeRunConfig(),
**kwargs: Any,
) -> Any
Execute a forward pass. The input data is automatically split into microbatches, executed in parallel through the pipeline, and the outputs from all microbatches are merged before being returned.
Parameters:
*args: Positional arguments forwarded to the underlying model.
roundpipe_run_config: Per-call run configuration that overrides the model-level defaults.
**kwargs: Keyword arguments forwarded to the underlying model.
Returns:
- The merged output. The specific merging behavior is determined by the merge_output configuration.
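The split-and-merge behavior can be illustrated in plain PyTorch. The chunk/cat pair below is only a conceptual stand-in; the actual splitting and merging are controlled by the run config.

```python
import torch
import torch.nn as nn

model = nn.Linear(8, 3)
x = torch.randn(6, 8)

# Conceptually what forward() does: split the batch into microbatches,
# run each through the pipeline, then merge the outputs.
microbatches = torch.chunk(x, 2, dim=0)
merged = torch.cat([model(mb) for mb in microbatches], dim=0)

# For a plain batch-dimension split, the merged result matches a
# single full-batch forward.
assert torch.allclose(merged, model(x), atol=1e-6)
```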
RoundPipe.forward_backward
RoundPipe.forward_backward(
input_args: Tuple[Any, ...] = (),
input_kwargs: Dict[str, Any] = {},
label: Any = None,
loss_fn: Callable[[Any, Any], Union[Sequence[torch.Tensor], torch.Tensor]] = lambda outputs, labels: outputs,
return_outputs: bool = False,
run_config: RoundPipeRunConfig = RoundPipeRunConfig(),
) -> Union[Tuple[Union[List[torch.Tensor], torch.Tensor], Any], List[torch.Tensor], torch.Tensor]
Execute a fused forward and backward pass. Compared to calling forward() followed by a manual backward(), this method uses an optimized scheduling strategy that allows forward and backward passes to execute concurrently in the pipeline, eliminating pipeline bubbles.
Parameters:
input_args: Positional arguments for the forward pass.
input_kwargs: Keyword arguments for the forward pass.
label: Label data, provided in the format expected by loss_fn.
loss_fn: Loss function. Takes (outputs, labels) as arguments and returns a loss tensor or a sequence of loss tensors.
return_outputs: Whether to also return model outputs.
run_config: Per-call run configuration.
Returns:
- If return_outputs=False (the default), returns the sum of losses across all microbatches.
- If return_outputs=True, returns a (loss_sum, merged_outputs) tuple.
Calling this function computes the loss and performs backward for each microbatch separately. This is equivalent to using the sum of the per-microbatch losses as the loss of the input training sample. Specifically, the semantics of this function are:
def forward_backward(
    input_args: Tuple[Any, ...] = (),
    input_kwargs: Dict[str, Any] = {},
    label: Any = None,
    loss_fn: Callable[[Any, Any], Union[Sequence[torch.Tensor], torch.Tensor]] = lambda outputs, labels: outputs,
    return_outputs: bool = False,
    run_config: RoundPipeRunConfig = RoundPipeRunConfig(),
) -> Union[Tuple[Union[List[torch.Tensor], torch.Tensor], Any], List[torch.Tensor], torch.Tensor]:
    split_input_args, split_input_kwargs = split_input(
        input_args, input_kwargs
    )
    split_labels = split_label(label)
    losses, outputs = [], []
    for input_args_mb, input_kwargs_mb, label_mb in zip(
        split_input_args, split_input_kwargs, split_labels
    ):
        output_mb = model.forward(*input_args_mb, **input_kwargs_mb)
        loss_mb = loss_fn(output_mb, label_mb)
        torch.autograd.backward(loss_mb)
        # When loss_mb is a single tensor, the line above is
        # equivalent to loss_mb.backward()
        losses.append(loss_mb)
        outputs.append(output_mb)
    if return_outputs:
        return sum(losses), merge_output(outputs)
    else:
        return sum(losses)
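Because each microbatch runs its own backward and gradients accumulate in .grad, the result matches a single backward on the summed loss, provided loss_fn uses a sum (not mean) reduction over the batch. A runnable plain-PyTorch check of this equivalence:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(4, 1)
x, y = torch.randn(8, 4), torch.randn(8, 1)
loss_fn = lambda out, lab: ((out - lab) ** 2).sum()  # sum reduction

# Per-microbatch backward, as in the reference semantics above:
# gradients accumulate in .grad across microbatches.
model.zero_grad()
micro_losses = []
for xb, yb in zip(torch.chunk(x, 4), torch.chunk(y, 4)):
    loss_mb = loss_fn(model(xb), yb)
    loss_mb.backward()
    micro_losses.append(loss_mb.detach())
micro_grad = model.weight.grad.clone()

# Equivalent single full-batch backward on the summed loss.
model.zero_grad()
full_loss = loss_fn(model(x), y)
full_loss.backward()

assert torch.allclose(sum(micro_losses), full_loss.detach(), atol=1e-5)
assert torch.allclose(micro_grad, model.weight.grad, atol=1e-5)
```

With a mean-reduced loss_fn, each microbatch would be averaged over fewer samples, so the summed microbatch loss would no longer equal the full-batch loss.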
RoundPipe.optim_parameters
RoundPipe.optim_parameters() -> Iterator[torch.nn.Parameter]
Return an iterator over parameters suitable for optimizer consumption.
RoundPipe internally manages the storage location and data type of model parameters. Parameters obtained through this method are stored in the format required by the optimizer (as specified by optim_dtype), and can be passed directly to an optimizer.
Returns:
- A parameter iterator, where each parameter is an optimizer-specific copy.
RoundPipe.optim_named_parameters
RoundPipe.optim_named_parameters(
prefix: str = "",
remove_duplicate: bool = True,
) -> Iterator[Tuple[str, torch.nn.Parameter]]
Return an iterator over named optimizer parameters.
Parameters:
prefix: Prefix to prepend to parameter names.
remove_duplicate: Whether to skip duplicate parameters.
Returns:
- An iterator yielding (name, parameter) tuples.
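A typical consumer of a named-parameter iterator is per-group optimizer configuration (for example, disabling weight decay for biases). The sketch below uses a plain nn.Module's named_parameters() as a stand-in for a RoundPipe instance's optim_named_parameters():

```python
import torch
import torch.nn as nn

model = nn.Sequential(nn.Linear(4, 4), nn.LayerNorm(4))

# Split parameters into decay / no-decay groups by name.
decay = [p for n, p in model.named_parameters() if not n.endswith("bias")]
no_decay = [p for n, p in model.named_parameters() if n.endswith("bias")]

opt = torch.optim.AdamW([
    {"params": decay, "weight_decay": 0.01},
    {"params": no_decay, "weight_decay": 0.0},
])
```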
RoundPipe.step
RoundPipe.step(
step_fn: Callable[..., None],
is_async: bool = True,
*args: Any,
**kwargs: Any,
) -> None
Execute an optimizer update using the provided step function.
In the default asynchronous mode, step returns immediately and the optimizer update runs in a background thread. Training iterations will use parameters that are one step behind, which typically does not affect convergence in practice. Synchronous mode waits for the optimizer update to complete before returning, ensuring each iteration uses the latest parameters, but significantly reduces performance and is generally not recommended.
Data Races
Data access in step_fn should be limited to optimizer parameters only. Accessing other data requires awareness of potential data race issues.
Parameters:
step_fn: A callable that performs one optimization step.
is_async: Whether to execute asynchronously. Defaults to True.
*args: Positional arguments forwarded to step_fn.
**kwargs: Keyword arguments forwarded to step_fn.
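The asynchronous mode can be sketched with a plain background thread. RoundPipe manages the worker thread and synchronization itself; this snippet only shows the shape of a step_fn and why the caller can proceed with one-step-stale parameters.

```python
import threading
import torch

param = torch.zeros(3, requires_grad=True)
param.grad = torch.ones(3)
opt = torch.optim.SGD([param], lr=0.5)

def step_fn(optimizer):
    # Touch only optimizer parameters here, per the data-race note above.
    optimizer.step()
    optimizer.zero_grad()

# Roughly the is_async=True semantics: the update runs on a background
# thread and the caller returns immediately.
worker = threading.Thread(target=step_fn, args=(opt,))
worker.start()
# ...the next training iteration could begin here, using parameters
# that are one step behind...
worker.join()
```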
RoundPipe.synchronize
RoundPipe.synchronize() -> None
Synchronize optimizer parameters and backward gradients back to model parameters.
After calling this method, model parameters will reflect the latest optimizer update results, and the .grad attribute of parameters will contain the accumulated gradients. This is useful when you need to inspect parameter values or gradients (e.g., before evaluation or saving a checkpoint).
roundpipe.wrap_model_to_roundpipe
roundpipe.wrap_model_to_roundpipe(
model: nn.Module,
use_sequential_preset: Optional[bool] = None,
lower_threshold: int = 16 * 1024,
upper_threshold: Optional[int] = None,
skip_modules: Container[nn.Module] = [],
override_config: Dict[nn.Module, RoundPipeRunConfig] = {},
model_run_config: RoundPipeRunConfig = RoundPipeRunConfig(),
name: Optional[str] = None,
**roundpipe_kwargs: Any,
) -> Union[RoundPipe, AutoRoundPipe]
Automatically wrap a model into a RoundPipe instance using recursive heuristics or built-in presets.
This function attempts the following strategies:
- If use_sequential_preset is not False, it first tries to use a built-in model preset to convert the model into an equivalent Sequential structure.
- If no preset is available, it recursively traverses the model's submodules, deciding how to wrap each one based on size thresholds and module types.
- For models that ultimately cannot be split into a Sequential structure, it returns an AutoRoundPipe instance.
Parameters:
model: The root module to wrap.
use_sequential_preset: Whether to use a built-in Sequential preset. None or True attempts to use a preset; False skips preset lookup. When set to None, a message is printed if a matching preset is found.
lower_threshold: Minimum module size (in bytes) for direct wrapping as a RoundPipe. Modules smaller than this threshold are wrapped with num_microbatch=1.
upper_threshold: Maximum size threshold for splitting submodules. Defaults to the total model size divided by (number of GPUs + 1).
skip_modules: A list of modules that should not be wrapped.
override_config: Run configuration overrides for specific modules.
model_run_config: Default run configuration for RoundPipe instances.
name: Module name. If None, auto-generated based on the call site.
**roundpipe_kwargs: Additional keyword arguments forwarded to the RoundPipe constructor.
Wrapping Strategy Details:
When no matching preset is found, the function recursively traverses the model's submodule tree. For each module encountered, it decides the action based on the following priority:
- Skip: If the module is in the skip_modules list, the original module is returned without any wrapping. This module will execute on CPU.
- Wrap directly as a multi-layer RoundPipe: If the module is an nn.Sequential, it is wrapped directly as a RoundPipe, with each child module as an independent pipeline layer.
- Wrap directly as a single-layer RoundPipe: If the module implements its own forward method and either of the following conditions is met, the entire module is wrapped as a single-layer RoundPipe:
  - The module's own parameters (non-recursive) are >= lower_threshold (default 16 KB)
  - The module's total size (recursive) is <= upper_threshold
  Special case: If the module's total size is < lower_threshold, it is wrapped with num_microbatch set to 1, since splitting extremely small modules into microbatches provides no performance benefit.
- Wrap nn.ModuleList element-wise: If the module is an nn.ModuleList, each element is wrapped as a separate RoundPipe instance. By default, ModuleList modules are assumed to be called sequentially, so merge_output is set to False for non-final elements to enable pipelined parallel computation. If you need to access intermediate layer outputs, you can set merge_output=True for the corresponding module in override_config, but this introduces additional synchronization and prevents the framework from parallelizing computation.
- Recurse into submodules: If none of the above conditions are met (i.e., the module is too large, not a Sequential/ModuleList, and does not have enough of its own parameters), the function recurses into named_children(), repeating the above checks for each child. The module's own forward will execute on CPU.
- Other special handling: For HuggingFace PreTrainedModel instances, the loss_function is additionally wrapped.
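The size checks in the traversal above can be sketched as follows. The helper functions own_param_bytes and total_param_bytes are illustrative, not RoundPipe API; they mirror the non-recursive and recursive sizes the heuristic compares against the thresholds.

```python
import torch.nn as nn

LOWER_THRESHOLD = 16 * 1024  # default lower_threshold: 16 KB

def own_param_bytes(module: nn.Module) -> int:
    # Size of the module's own parameters, excluding children.
    return sum(p.numel() * p.element_size() for p in module.parameters(recurse=False))

def total_param_bytes(module: nn.Module) -> int:
    # Recursive size, including all submodules.
    return sum(p.numel() * p.element_size() for p in module.parameters())

big = nn.Linear(1024, 1024)   # ~4 MB of float32 parameters
tiny = nn.Linear(4, 4)        # well under 16 KB

# big has enough of its own parameters to be wrapped as a
# single-layer RoundPipe...
assert own_param_bytes(big) >= LOWER_THRESHOLD
# ...while tiny falls under the special case and gets num_microbatch=1.
assert total_param_bytes(tiny) < LOWER_THRESHOLD
```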
Returns:
- If the model is successfully converted using a preset or recursive wrapping, returns a RoundPipe instance; otherwise returns an AutoRoundPipe instance. AutoRoundPipe is a placeholder class indicating the model has been wrapped but could not be split into a Sequential structure. Users can still use RoundPipe's forward pass and optimizer features, but cannot benefit from layer-level splitting performance gains and cannot call forward_backward().
Raises:
NotImplementedError: If use_sequential_preset=True but no preset exists for the model type.
RoundPipe.set_original_model
RoundPipe.set_original_model(original_model: nn.Module) -> None
Set a reference to the original model for proxied attribute access.
When a model is converted into a Sequential structure by wrap_model_to_roundpipe, attributes of the original model (such as a HuggingFace model's vocab_size or config) can still be accessed through this mechanism. wrap_model_to_roundpipe calls this method automatically, so manual invocation is typically unnecessary.
Parameters:
original_model: The original unwrapped model.
Attribute Forwarding
RoundPipe overrides __getattr__, __setattr__, and __delattr__ to provide transparent attribute access to the wrapped model:
- Read: When accessing an attribute that does not exist on the RoundPipe instance, it is looked up on original_model if set, otherwise on the model used to create the RoundPipe instance.
- Write: After initialization, attribute writes on the RoundPipe instance are forwarded to original_model or model.
- Delete: Attribute deletions are similarly propagated to original_model or model.
This mechanism allows a RoundPipe-wrapped model to be used just like the original model, without needing to be aware of the wrapping layer.
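A minimal sketch of this forwarding pattern in plain Python (the Wrapped class is illustrative, not RoundPipe's actual implementation):

```python
class Wrapped:
    """Illustrative stand-in for RoundPipe's attribute forwarding."""

    def __init__(self, model):
        # Bypass __setattr__ while the wrapper is still initializing.
        object.__setattr__(self, "_model", model)
        object.__setattr__(self, "_initialized", True)

    def __getattr__(self, name):
        # Invoked only when the attribute is missing on the wrapper,
        # so lookups fall through to the wrapped model.
        return getattr(self.__dict__["_model"], name)

    def __setattr__(self, name, value):
        if self.__dict__.get("_initialized"):
            # After init, writes are forwarded to the wrapped model.
            setattr(self.__dict__["_model"], name, value)
        else:
            object.__setattr__(self, name, value)

class Inner:
    def __init__(self):
        self.vocab_size = 32000

w = Wrapped(Inner())
assert w.vocab_size == 32000   # read falls through to the model
w.vocab_size = 50000           # write is forwarded as well
assert w.vocab_size == 50000
```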