Distributed
graph
- anemoi.models.distributed.graph.ensure_sharded(x: Tensor, dim: int, shard_sizes: list[int] | None, model_comm_group: ProcessGroup | None = None) tuple[Tensor, list[int] | None]
Ensure that the input tensor is sharded along the specified dimension.
If shard_sizes is not None, the tensor is assumed to already be sharded and a consistency check is performed. Otherwise, the tensor is sharded using balanced partitioning and the resulting sizes are returned.
- Parameters:
x (Tensor) – Input tensor.
dim (int) – Dimension along which to shard.
shard_sizes (ShardSizes) – Per-rank partition sizes, or None if the tensor is replicated.
model_comm_group (ProcessGroup, optional) – Model communication group.
- Returns:
The (possibly sharded) tensor and the shard sizes.
- Return type:
tuple[Tensor, ShardSizes]
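The two branches of ensure_sharded can be illustrated with a small plain-Python sketch. It assumes that "balanced partitioning" gives the first (length % world_size) ranks one extra element, as torch.tensor_split does with an integer number of sections. The helper names balanced_shard_sizes and check_shard_sizes are hypothetical and are not part of anemoi.

```python
def balanced_shard_sizes(length: int, world_size: int) -> list[int]:
    # Assumed rule: the first (length % world_size) ranks get one extra element,
    # mirroring torch.tensor_split with an integer number of sections.
    base, extra = divmod(length, world_size)
    return [base + 1 if rank < extra else base for rank in range(world_size)]


def check_shard_sizes(local_length: int, rank: int, shard_sizes: list[int]) -> None:
    # Consistency check for an already-sharded tensor: the local size along the
    # shard dimension must match the size recorded for this rank.
    if local_length != shard_sizes[rank]:
        raise ValueError(
            f"rank {rank}: local size {local_length} != expected {shard_sizes[rank]}"
        )


sizes = balanced_shard_sizes(10, 4)
print(sizes)  # [3, 3, 2, 2]
check_shard_sizes(2, 3, sizes)  # rank 3 holds 2 rows, so this passes silently
```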
- anemoi.models.distributed.graph.shard_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup, gather_in_backward: bool = True) Tensor
Shard tensor.
Keeps only the part of the tensor that is relevant for the current rank.
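The forward behaviour of shard_tensor amounts to slicing by prefix offsets. The following single-process sketch stands in a 1-D Python list for the tensor and passes the rank explicitly instead of a ProcessGroup. The helper local_shard is hypothetical, and the sketch does not model gather_in_backward. That flag's name suggests the backward pass gathers gradients, but that is an assumption.

```python
from itertools import accumulate


def local_shard(values: list, sizes: list[int], rank: int) -> list:
    # Start offset of every rank's slice: [0, s0, s0 + s1, ...].
    offsets = [0, *accumulate(sizes)]
    if offsets[-1] != len(values):
        raise ValueError("shard sizes must sum to the full length")
    # Keep only the part of the replicated input that this rank owns.
    return values[offsets[rank]:offsets[rank + 1]]


full = list(range(10))
sizes = [3, 3, 2, 2]
print([local_shard(full, sizes, r) for r in range(4)])
# [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```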
- anemoi.models.distributed.graph.gather_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup) Tensor
Gather tensor.
Gathers tensor shards from ranks.
- Parameters:
input_ (Tensor) – Input tensor.
dim (int) – Dimension along which to gather.
sizes (ShardSizes) – Per-rank shard sizes.
mgroup (ProcessGroup) – Model communication group.
- Returns:
Gathered tensor.
- Return type:
Tensor
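Gathering is the inverse of sharding: the shards are concatenated in rank order along dim. In this plain-Python simulation, shards[r] plays the role of rank r's local tensor, and a size check stands in for the sizes argument. gather_shards is a hypothetical name.

```python
from __future__ import annotations


def gather_shards(shards: list[list], sizes: list[int] | None = None) -> list:
    # shards[r] is the local piece held by rank r of the group.
    if sizes is not None:
        for rank, (shard, size) in enumerate(zip(shards, sizes)):
            if len(shard) != size:
                raise ValueError(f"rank {rank} holds {len(shard)} items, expected {size}")
    # Concatenate in rank order to rebuild the full tensor.
    return [item for shard in shards for item in shard]


print(gather_shards([[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]], [3, 3, 2, 2]))
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```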
- anemoi.models.distributed.graph.reduce_tensor(input_: Tensor, mgroup: ProcessGroup) Tensor
Reduce tensor.
Reduces tensor across ranks.
- Parameters:
input_ (Tensor) – Input tensor.
mgroup (ProcessGroup) – Model communication group.
- Returns:
Reduced tensor.
- Return type:
Tensor
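The following simulation shows what reduce_tensor computes, assuming a sum reduction. Sum is the default op of torch.distributed.all_reduce, but which op anemoi uses is not stated here. Each inner list is the same-shaped tensor held by one rank. all_reduce_sum is a hypothetical helper.

```python
def all_reduce_sum(per_rank: list[list[float]]) -> list[list[float]]:
    # Element-wise sum of the tensors held by every rank (assumed SUM op).
    total = [sum(column) for column in zip(*per_rank)]
    # After an all-reduce, every rank holds an identical copy of the result.
    return [list(total) for _ in per_rank]


print(all_reduce_sum([[1.0, 2.0], [10.0, 20.0], [100.0, 200.0]]))
# [[111.0, 222.0], [111.0, 222.0], [111.0, 222.0]]
```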
- anemoi.models.distributed.graph.sync_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup, gather_in_fwd: bool = True) Tensor
Sync tensor.
Perform a gather in the forward pass and an allreduce followed by a split in the backward pass.
- Parameters:
input_ (Tensor) – Input tensor.
dim (int) – Dimension along which to gather.
sizes (ShardSizes) – Per-rank shard sizes.
mgroup (ProcessGroup) – Model communication group.
- Returns:
Synced tensor.
- Return type:
Tensor
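The forward/backward pairing of sync_tensor follows from autograd. After the gather, every rank uses the full tensor, so each rank's gradient with respect to it is only a partial contribution. These contributions must be summed (all-reduce) and then split back to the shard owners. The single-process sketch below assumes a sum all-reduce and uses 1-D lists. It does not model gather_in_fwd, and all function names are hypothetical.

```python
from __future__ import annotations

from itertools import accumulate


def split(values: list[float], sizes: list[int]) -> list[list[float]]:
    # Cut a flat list into consecutive per-rank pieces of the given sizes.
    offsets = [0, *accumulate(sizes)]
    return [values[offsets[r]:offsets[r + 1]] for r in range(len(sizes))]


def sync_forward(local_shards: list[list[float]]) -> list[list[float]]:
    # Forward: gather the shards so that every rank holds the full tensor.
    full = [v for shard in local_shards for v in shard]
    return [list(full) for _ in local_shards]


def sync_backward(full_grads: list[list[float]], sizes: list[int]) -> list[list[float]]:
    # Backward: each rank produced a gradient for the full tensor, so sum them
    # across ranks (all-reduce) and hand each rank back its own slice.
    total = [sum(col) for col in zip(*full_grads)]
    return split(total, sizes)


print(sync_forward([[1.0, 2.0], [3.0]]))
# [[1.0, 2.0, 3.0], [1.0, 2.0, 3.0]]
print(sync_backward([[1.0, 1.0, 1.0], [0.0, 2.0, 4.0]], [2, 1]))
# [[1.0, 3.0], [5.0]]
```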
- anemoi.models.distributed.graph.reduce_shard_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup) Tensor
Reduces and then shards tensor.
Perform an allreduce followed by a split in the forward pass and a gather in the backward pass.
- Parameters:
input_ (Tensor) – Input tensor.
dim (int) – Dimension along which to split in the forward pass (and to gather in the backward pass).
sizes (ShardSizes) – Per-rank shard sizes.
mgroup (ProcessGroup) – Model communication group.
- Returns:
Reduced sharded tensor.
- Return type:
Tensor
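reduce_shard_tensor is the mirror image of sync_tensor. The forward pass sums full-size partial results across ranks and keeps each rank's slice. In the backward pass, the per-rank gradient slices are gathered, because the gradient of a sum with respect to each addend is the full upstream gradient. This is a plain-Python sketch assuming a sum reduction. The function names are hypothetical.

```python
from __future__ import annotations

from itertools import accumulate


def split(values: list[float], sizes: list[int]) -> list[list[float]]:
    # Cut a flat list into consecutive per-rank pieces of the given sizes.
    offsets = [0, *accumulate(sizes)]
    return [values[offsets[r]:offsets[r + 1]] for r in range(len(sizes))]


def reduce_shard_forward(partials: list[list[float]], sizes: list[int]) -> list[list[float]]:
    # Forward: sum the full-size partial results of all ranks (all-reduce) ...
    total = [sum(col) for col in zip(*partials)]
    # ... then each rank keeps only its own slice along the shard dimension.
    return split(total, sizes)


def reduce_shard_backward(grad_shards: list[list[float]]) -> list[list[float]]:
    # Backward: gather the per-rank gradient slices into the full gradient,
    # which every rank needs because it contributed a full-size addend.
    full = [g for shard in grad_shards for g in shard]
    return [list(full) for _ in grad_shards]


print(reduce_shard_forward([[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]], [2, 1]))
# [[11.0, 22.0], [33.0]]
print(reduce_shard_backward([[1.0, 1.0], [2.0]]))
# [[1.0, 1.0, 2.0], [1.0, 1.0, 2.0]]
```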
- anemoi.models.distributed.graph.all_to_all_transpose(input_: Tensor, dim_split: int, split_sizes: list[int] | None, dim_concat: int, concat_sizes: list[int] | None, mgroup: ProcessGroup) Tensor
All-to-all transpose.
Switch the tensor from being sharded along dim_concat to being sharded along dim_split via an all-to-all transpose; the reverse all-to-all is applied in the backward pass.
- Parameters:
input (Tensor) – Input tensor to be transposed.
dim_split (int) – Dimension along which to split the input tensor.
split_sizes (ShardSizes) – Per-rank sizes of the pieces split along dim_split.
dim_concat (int) – Dimension along which to concatenate the transposed tensors.
concat_sizes (ShardSizes) – Per-rank sizes of the pieces concatenated along dim_concat.
mgroup (ProcessGroup) – Model communication group.
- Returns:
Transposed tensor.
- Return type:
Tensor
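The all-to-all transpose can be simulated on a 2-D matrix held as row blocks, one block per rank. Here dim_concat is the row dimension and dim_split is the column dimension. Each rank splits its block by columns and sends piece s to rank s, which stacks the received pieces by rows. The function names are hypothetical. concat_sizes is implied by the row-block sizes here, so the sketch does not take it. The backward pass described above would apply the same exchange with the roles of the two dimensions swapped.

```python
from __future__ import annotations

from itertools import accumulate


def split_cols(m: list[list[int]], sizes: list[int]) -> list[list[list[int]]]:
    # Split every row of m into consecutive column pieces of the given sizes.
    offsets = [0, *accumulate(sizes)]
    return [[row[offsets[s]:offsets[s + 1]] for row in m] for s in range(len(sizes))]


def all_to_all_rows_to_cols(row_shards: list[list[list[int]]], col_sizes: list[int]) -> list[list[list[int]]]:
    # Step 1: every rank r splits its row block along dim_split (columns)
    # into one piece per destination rank.
    pieces = [split_cols(shard, col_sizes) for shard in row_shards]
    # Step 2: rank s receives piece s from every rank r and concatenates the
    # pieces along dim_concat (rows), in rank order.
    world = len(row_shards)
    return [[row for r in range(world) for row in pieces[r][s]] for s in range(world)]


full = [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9, 10, 11]]
col_shards = all_to_all_rows_to_cols([full[:2], full[2:]], [2, 1])
print(col_shards)
# [[[0, 1], [3, 4], [6, 7], [9, 10]], [[2], [5], [8], [11]]]
```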
khop_edges
- anemoi.models.distributed.khop_edges.sort_edge_index_by_dst(edge_index: Tensor | SparseTensor, max_value: int = None) Tuple[Tensor | SparseTensor, Tensor]
Sort edge indices by destination node.
- anemoi.models.distributed.khop_edges.is_edge_index_dst_sorted(edge_index: Tensor | SparseTensor) bool
Check whether edge_index is sorted by destination node (edge_index[1]).
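Sorting by destination and checking sortedness can be sketched on plain lists, with src and dst as the two rows of edge_index. This sketch assumes that the second value returned by sort_edge_index_by_dst is the permutation applied to the edges, which would also be used to reorder edge_attr. It does not model max_value. sort_by_dst and is_dst_sorted are hypothetical names.

```python
from __future__ import annotations


def sort_by_dst(src: list[int], dst: list[int]) -> tuple[list[int], list[int], list[int]]:
    # Stable argsort by destination: Python's sorted is stable, so edges with
    # the same destination keep their original relative order.
    perm = sorted(range(len(dst)), key=lambda e: dst[e])
    return [src[e] for e in perm], [dst[e] for e in perm], perm


def is_dst_sorted(dst: list[int]) -> bool:
    # "Sorted by destination" means the dst row is non-decreasing.
    return all(a <= b for a, b in zip(dst, dst[1:]))


s_src, s_dst, perm = sort_by_dst([0, 1, 2, 3], [2, 0, 1, 0])
print(s_src, s_dst, perm)  # [1, 3, 2, 0] [0, 0, 1, 2] [1, 3, 2, 0]
```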
- class anemoi.models.distributed.khop_edges.GraphPartition(num_nodes: tuple[int, int], num_edges: int, num_parts: int, dst_splits: list[int], edge_splits: list[int])
Bases:
object
Precomputed partitioning metadata for a graph with dst-sorted edges.
Enables O(1) slicing for both distributed sharding and local chunking by exploiting the fact that edges are already sorted by destination node.
- Parameters:
num_nodes (tuple[int, int]) – Number of (src, dst) nodes in the full graph.
num_edges (int) – Total number of edges.
num_parts (int) – Number of partitions (the communication group size for distributed sharding, or the number of chunks for local chunking).
dst_splits (list[int]) – Per-partition destination node counts.
edge_splits (list[int]) – Per-partition edge counts (derived from dst-sorted edge structure).
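The following sketch shows one way to derive dst_splits and edge_splits from a dst-sorted edge list. Because the edges are sorted, the edges of partition p are exactly those whose destination lies in that partition's node range, and bisect locates each boundary. The balanced node split and the use of bisect are assumptions made for illustration. The real derivation may differ, and partition_splits is a hypothetical name.

```python
from __future__ import annotations

from bisect import bisect_left
from itertools import accumulate


def partition_splits(dst: list[int], num_dst: int, num_parts: int) -> tuple[list[int], list[int]]:
    # Assumed balanced destination-node split (first parts get the remainder).
    base, extra = divmod(num_dst, num_parts)
    dst_splits = [base + (1 if p < extra else 0) for p in range(num_parts)]
    # With dst sorted, part p owns the edges whose destination falls in
    # [dst_offsets[p], dst_offsets[p + 1]); bisect finds each edge boundary.
    dst_offsets = [0, *accumulate(dst_splits)]
    edge_offsets = [bisect_left(dst, boundary) for boundary in dst_offsets]
    edge_splits = [edge_offsets[p + 1] - edge_offsets[p] for p in range(num_parts)]
    return dst_splits, edge_splits


print(partition_splits([0, 0, 1, 2, 2, 2, 3, 4], num_dst=5, num_parts=2))
# ([3, 2], [6, 2])
```

Once these counts are stored, prefix sums give the start of each partition, so fetching a partition's nodes or edges is a single contiguous slice.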
- materialise(partition_id: int, x: Tuple[Tensor, Tensor], edge_attr: Tensor, edge_index: Tensor | SparseTensor, cond: Tuple[Tensor, Tensor] | None = None) tuple[Tuple[Tensor, Tensor], Tensor, Tensor | SparseTensor, Tensor, Tuple[Tensor, Tensor] | None]
Materialise a single partition by slicing nodes, edges and conditioning.
This is a purely local operation with no communication, suitable for chunking within a single device.
- Parameters:
- Returns:
(x_src_subset, x_dst_subset), edge_attr_subset, edge_index_relabeled, cond subset (or None).
- Return type:
tuple[PairTensor, Tensor, Adj, Optional[PairTensor]]
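The O(1) slicing behind materialise can be sketched with prefix offsets over the stored splits. This plain-Python version only slices the destination side and the edges, and relabels destination indices to be local to the partition. How the source subset is chosen is not described here, so the sketch leaves source indices global. That choice, the omission of cond, and the name materialise_part are all assumptions.

```python
from __future__ import annotations

from itertools import accumulate


def materialise_part(part: int, dst_splits: list[int], edge_splits: list[int],
                     x_dst: list, edge_attr: list, src: list[int], dst: list[int]):
    # Prefix sums turn per-part counts into start offsets.
    dst_off = [0, *accumulate(dst_splits)]
    edge_off = [0, *accumulate(edge_splits)]
    n0, n1 = dst_off[part], dst_off[part + 1]
    e0, e1 = edge_off[part], edge_off[part + 1]
    # Contiguous slices: valid only because edges are sorted by destination.
    x_dst_part = x_dst[n0:n1]
    attr_part = edge_attr[e0:e1]
    # Relabel destinations to local ids; sources stay global (assumption).
    index_part = (src[e0:e1], [d - n0 for d in dst[e0:e1]])
    return x_dst_part, attr_part, index_part


dst = [0, 0, 1, 2, 2, 2, 3, 4]
src = [5, 1, 0, 2, 3, 4, 0, 1]
print(materialise_part(1, [3, 2], [6, 2], list("abcde"), list(range(8)), src, dst))
# (['d', 'e'], [6, 7], ([0, 1], [0, 1]))
```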
- anemoi.models.distributed.khop_edges.build_graph_partition(edge_index: Tensor | SparseTensor, num_parts: int, num_nodes: tuple[int, int]) GraphPartition
Build graph partitioning information from a dst-sorted edge_index.
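- Parameters:
edge_index (Adj) – Edge index (must be sorted by destination node).
num_parts (int) – Number of partitions.
num_nodes (tuple[int, int]) – Number of (src, dst) nodes in the full graph.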
- Returns:
The graph partitioning information.
- Return type:
GraphPartition
- anemoi.models.distributed.khop_edges.build_graph_partition_from_shard_info(edge_index: Tensor | SparseTensor, x: Tuple[Tensor, Tensor], shard_info: BipartiteGraphShardInfo, model_comm_group: ProcessGroup | None = None) GraphPartition
Build a GraphPartition for distributed sharding from current shard metadata.
Derives num_nodes from shard_info and tensor shapes, and sets num_parts to the communication group size.
- Parameters:
edge_index (Adj) – The edge index tensor (must be sorted by destination node).
x (PairTensor) – Node features (src, dst), used to infer sizes when not sharded.
shard_info (BipartiteGraphShardInfo) – Current shard metadata.
model_comm_group (ProcessGroup, optional) – Model communication group.
- Returns:
The graph partitioning information.
- Return type:
GraphPartition
- anemoi.models.distributed.khop_edges.ensure_edges_are_dst_sorted(edge_attr: Tensor, edge_index: Tensor | SparseTensor, *, num_dst: int, edges_are_sharded: bool, model_comm_group: ProcessGroup | None = None, edges_are_dst_sorted: bool = True) tuple[Tensor, Tensor | SparseTensor]
Ensure edge tensors are dst-sorted before GraphTransformer attention.
- anemoi.models.distributed.khop_edges.shard_edges_1hop(edge_attr: Tensor, edge_index: Tensor | SparseTensor, src_size: int, dst_size: int, model_comm_group: ProcessGroup | None, edges_are_dst_sorted: bool = True) tuple[Tensor, Tensor | SparseTensor, list[int] | None]
Sort and shard edges for 1-hop sharding.
- Parameters:
edge_attr (Tensor) – Edge attributes.
edge_index (Adj) – Edge index.
src_size (int) – Number of source nodes.
dst_size (int) – Number of destination nodes.
model_comm_group (ProcessGroup, optional) – Model communication group.
edges_are_dst_sorted (bool, optional) – Whether edge_index and edge_attr are already ordered by destination node. Edges from graph providers already are. Pass False for custom full-graph edges that are not ordered this way. If edges are already sharded, each rank is expected to already have the right edges for its local destination nodes.
- Returns:
Sharded edge_attr, sharded edge_index, and edge_shard_sizes.
- Return type:
tuple[Tensor, Adj, ShardSizes]
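The steps of 1-hop edge sharding are: sort by destination when the edges are not already sorted, assign each rank a range of destination nodes, count the edges falling in each range, and slice out the local block. The plain-Python sketch below follows these steps. The balanced destination split is an assumption, and the sketch keeps destination indices global rather than relabeling them. It does not use src_size, since only destinations drive the partition here. shard_edges_by_dst is a hypothetical name.

```python
from __future__ import annotations

from itertools import accumulate


def shard_edges_by_dst(edge_attr: list, src: list[int], dst: list[int], dst_size: int,
                       world_size: int, rank: int, edges_are_dst_sorted: bool = True):
    if not edges_are_dst_sorted:
        # Stable sort so that each rank's edges form one contiguous block.
        perm = sorted(range(len(dst)), key=lambda e: dst[e])
        edge_attr = [edge_attr[e] for e in perm]
        src = [src[e] for e in perm]
        dst = [dst[e] for e in perm]
    # Assumed balanced destination ranges per rank.
    base, extra = divmod(dst_size, world_size)
    dst_off = [0, *accumulate(base + (1 if r < extra else 0) for r in range(world_size))]
    # Edge shard sizes: number of edges whose destination lies in each range.
    edge_shard_sizes = [sum(dst_off[r] <= d < dst_off[r + 1] for d in dst) for r in range(world_size)]
    e_off = [0, *accumulate(edge_shard_sizes)]
    lo, hi = e_off[rank], e_off[rank + 1]
    return edge_attr[lo:hi], (src[lo:hi], dst[lo:hi]), edge_shard_sizes


attr = ["e0", "e1", "e2", "e3"]
print(shard_edges_by_dst(attr, [0, 1, 2, 3], [2, 0, 1, 0], 3, 2, 1, edges_are_dst_sorted=False))
# (['e0'], ([0], [2]), [3, 1])
```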
- anemoi.models.distributed.khop_edges.shard_graph_to_local(partition: GraphPartition, x: Tuple[Tensor, Tensor], edge_attr: Tensor, edge_index: Tensor | SparseTensor, shard_info: BipartiteGraphShardInfo, model_comm_group: ProcessGroup | None = None, cond: Tuple[Tensor, Tensor] | None = None) tuple[Tuple[Tensor, Tensor], Tensor, Tensor | SparseTensor, BipartiteGraphShardInfo, Tuple[Tensor, Tensor] | None]
Shard graph tensors to the local rank using precomputed partition metadata.
Handles all communication (sync src, shard dst/edges) and returns the local subgraph with updated shard metadata.
- Parameters:
partition (GraphPartition) – Precomputed partition metadata.
x (PairTensor) – Node features (src, dst).
edge_attr (Tensor) – Edge attributes.
edge_index (Adj) – Edge indices (assumed dst-sorted).
shard_info (BipartiteGraphShardInfo) – Current shard metadata.
model_comm_group (ProcessGroup, optional) – Model communication group.
cond (tuple[Tensor, Tensor], optional) – Conditioning tensors (cond_src, cond_dst).
- Returns:
Sharded (x_src_local, x_dst), edge_attr, edge_index, updated shard_info, cond subset (or None).
- Return type:
tuple[PairTensor, Tensor, Adj, BipartiteGraphShardInfo, Optional[PairTensor]]
- anemoi.models.distributed.khop_edges.sort_edges_1hop_chunks(num_nodes: int | tuple[int, int], edge_attr: Tensor, edge_index: Tensor | SparseTensor, num_chunks: int, edges_are_dst_sorted: bool = True) tuple[list[Tensor], list[Tensor | SparseTensor]]
Split edges into 1-hop neighbourhood chunks.
Supports two paths:
- Fast path (edges_are_dst_sorted=True): O(1) slicing using precomputed partition splits.
- Slow path (edges_are_dst_sorted=False): explicit subgraph extraction per chunk.
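The two paths can be compared in a plain-Python sketch that returns, for each chunk, the ids of the edges it contains. The balanced destination ranges and the names chunks_fast, chunks_slow and chunk_bounds are assumptions. The fast path computes the counts once and then takes contiguous slices, which is only correct when edges are dst-sorted. The slow path filters edges explicitly and works for any ordering.

```python
from __future__ import annotations

from itertools import accumulate


def chunk_bounds(num_dst: int, num_chunks: int) -> list[int]:
    # Assumed balanced destination-node ranges, as prefix offsets.
    base, extra = divmod(num_dst, num_chunks)
    return [0, *accumulate(base + (1 if c < extra else 0) for c in range(num_chunks))]


def chunks_fast(dst: list[int], num_dst: int, num_chunks: int) -> list[list[int]]:
    # Fast path: count edges per chunk once, then take contiguous slices.
    b = chunk_bounds(num_dst, num_chunks)
    counts = [sum(b[c] <= d < b[c + 1] for d in dst) for c in range(num_chunks)]
    off = [0, *accumulate(counts)]
    return [list(range(off[c], off[c + 1])) for c in range(num_chunks)]


def chunks_slow(dst: list[int], num_dst: int, num_chunks: int) -> list[list[int]]:
    # Slow path: no ordering assumed, so select matching edges per chunk.
    b = chunk_bounds(num_dst, num_chunks)
    return [[e for e, d in enumerate(dst) if b[c] <= d < b[c + 1]] for c in range(num_chunks)]


print(chunks_fast([0, 0, 1, 2, 2, 3], 4, 2))  # [[0, 1, 2], [3, 4, 5]]
print(chunks_slow([0, 0, 1, 2, 2, 3], 4, 2))  # [[0, 1, 2], [3, 4, 5]]
print(chunks_fast([2, 0, 1, 0], 3, 2))        # [[0, 1, 2], [3]]  (wrong: unsorted)
print(chunks_slow([2, 0, 1, 0], 3, 2))        # [[1, 2, 3], [0]]
```

The last two lines show why edges_are_dst_sorted must be False for unordered custom edges. On unsorted input, the fast path silently assigns edge 0 (destination 2) to the first chunk.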
- Parameters:
num_nodes (Union[int, tuple[int, int]]) – Number of target nodes in the graph, or tuple (src, dst) for a bipartite graph.
edge_attr (Tensor) – Edge attributes.
edge_index (Adj) – Edge index.
num_chunks (int) – Number of chunks to split into.
edges_are_dst_sorted (bool, optional) – Whether edge_index and edge_attr are already ordered by destination node. Edges from graph providers already are. Pass False for custom full-graph edges that are not ordered this way.
- Returns:
List of edge attribute chunks and edge index chunks.
- Return type:
tuple[list[Tensor], list[Adj]]
shapes
- class anemoi.models.distributed.shapes.GraphShardInfo(nodes: list[int] | None = None, edges: list[int] | None = None)
Bases:
object
- class anemoi.models.distributed.shapes.BipartiteGraphShardInfo(src_nodes: list[int] | None = None, dst_nodes: list[int] | None = None, edges: list[int] | None = None)
Bases:
object
utils
- anemoi.models.distributed.utils.model_is_distributed(model_comm_group: ProcessGroup | None = None) bool
Return whether a model communication group spans multiple ranks.
- anemoi.models.distributed.utils.get_memory_format(tensor: Tensor)
Helper routine to get the memory format.