Distributed

graph

anemoi.models.distributed.graph.ensure_sharded(x: Tensor, dim: int, shard_sizes: list[int] | None, model_comm_group: ProcessGroup | None = None) → tuple[Tensor, list[int] | None]

Ensure that the input tensor is sharded along the specified dimension.

If shard_sizes is not None, the tensor is assumed to be sharded already and a consistency check is performed. Otherwise, the tensor is sharded using balanced partitioning and the resulting sizes are returned.

Parameters:
  • x (Tensor) – Input tensor.

  • dim (int) – Dimension along which to shard.

  • shard_sizes (ShardSizes) – Per-rank partition sizes, or None if the tensor is replicated.

  • model_comm_group (ProcessGroup, optional) – Model communication group.

Returns:

The (possibly sharded) tensor and the shard sizes.

Return type:

tuple[Tensor, ShardSizes]
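
The two branches described above can be pictured with a small sketch. This is not the library's implementation: balanced_shard_sizes and check_local_size are hypothetical helpers, the rule that leading ranks absorb the remainder is an assumption (it is the convention torch.tensor_split uses), and the exact consistency check performed by ensure_sharded may differ.

```python
def balanced_shard_sizes(length: int, world_size: int) -> list[int]:
    """Split `length` elements into `world_size` near-equal shard sizes."""
    base, remainder = divmod(length, world_size)
    # Assumption: the first `remainder` ranks each take one extra element.
    return [base + 1 if rank < remainder else base for rank in range(world_size)]


def check_local_size(local_length: int, shard_sizes: list[int], rank: int) -> None:
    """Hypothetical consistency check for a tensor that is already sharded."""
    if local_length != shard_sizes[rank]:
        raise ValueError(
            f"rank {rank} holds {local_length} elements, expected {shard_sizes[rank]}"
        )


sizes = balanced_shard_sizes(10, 4)
print(sizes)  # [3, 3, 2, 2]
check_local_size(3, sizes, rank=0)  # passes silently
```

The sizes always sum to the full extent of the dimension, which is what lets a later gather reassemble the tensor.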

anemoi.models.distributed.graph.shard_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup, gather_in_backward: bool = True) → Tensor

Shard tensor.

Keeps only the part of the tensor that is relevant for the current rank.

Parameters:
  • input_ (Tensor) – Input

  • dim (int) – dimension along which to shard

  • sizes (ShardSizes) – Per-rank shard sizes

  • mgroup (ProcessGroup) – model communication group

  • gather_in_backward (bool) – perform gather in backward, default True

Returns:

Sharded tensor.

Return type:

Tensor
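
As an illustration of what "the part relevant for the current rank" means, the sketch below slices a plain Python list using per-rank sizes. local_shard is a hypothetical helper, the list stands in for one dimension of a tensor, and no communication or autograd behaviour is modelled.

```python
from itertools import accumulate


def local_shard(values: list, sizes: list[int], rank: int) -> list:
    """Return the contiguous slice of `values` owned by `rank`."""
    offsets = [0, *accumulate(sizes)]  # start offset of every rank, plus the end
    return values[offsets[rank]:offsets[rank + 1]]


full = list(range(10))
sizes = [3, 3, 2, 2]
shards = [local_shard(full, sizes, rank) for rank in range(len(sizes))]
print(shards)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]

# Concatenating the shards in rank order restores the original sequence.
restored = [value for shard in shards for value in shard]
print(restored == full)  # True
```

Concatenation in rank order is the inverse of this slicing, which is the relationship between sharding and gathering along the same dimension.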

anemoi.models.distributed.graph.gather_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup) → Tensor

Gather tensor.

Gathers tensor shards from ranks.

Parameters:
  • input_ (Tensor) – Input

  • dim (int) – dimension along which to gather

  • sizes (ShardSizes) – Per-rank shard sizes

  • mgroup (ProcessGroup) – model communication group

Returns:

Gathered tensor.

Return type:

Tensor

anemoi.models.distributed.graph.reduce_tensor(input_: Tensor, mgroup: ProcessGroup) → Tensor

Reduce tensor.

Reduces tensor across ranks.

Parameters:
  • input_ (Tensor) – Input

  • mgroup (ProcessGroup) – model communication group

Returns:

Reduced tensor.

Return type:

Tensor

anemoi.models.distributed.graph.sync_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup, gather_in_fwd: bool = True) → Tensor

Sync tensor.

Perform a gather in the forward pass and an allreduce followed by a split in the backward pass.

Parameters:
  • input_ (Tensor) – Input

  • dim (int) – dimension along which to gather

  • sizes (ShardSizes) – Per-rank shard sizes

  • mgroup (ProcessGroup) – model communication group

  • gather_in_fwd (bool) – perform gather in forward, default True

Returns:

Synced tensor.

Return type:

Tensor
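
The forward/backward pairing described above can be simulated on a single process. In this sketch each "rank" is an entry in a Python list; forward_gather and backward_allreduce_split are hypothetical names, and the allreduce is assumed to be a sum.

```python
from itertools import accumulate


def forward_gather(shards: list[list[int]]) -> list[list[int]]:
    """Forward pass: every rank ends up with the concatenation of all shards."""
    full = [value for shard in shards for value in shard]
    return [list(full) for _ in shards]


def backward_allreduce_split(grads_per_rank: list[list[int]], sizes: list[int]) -> list[list[int]]:
    """Backward pass: sum the full-size gradients over ranks (assumed sum
    reduction), then keep only each rank's own slice."""
    summed = [sum(column) for column in zip(*grads_per_rank)]
    offsets = [0, *accumulate(sizes)]
    return [summed[offsets[r]:offsets[r + 1]] for r in range(len(sizes))]


shards = [[1, 2], [3]]  # rank 0 holds two elements, rank 1 holds one
sizes = [2, 1]
gathered = forward_gather(shards)
print(gathered)  # [[1, 2, 3], [1, 2, 3]]

# Each rank computes its own gradient with respect to the full tensor.
grads = [[1, 2, 3], [10, 20, 30]]
print(backward_allreduce_split(grads, sizes))  # [[11, 22], [33]]
```

The reduction is needed because every rank used the full gathered tensor, so each local shard receives gradient contributions from all ranks.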

anemoi.models.distributed.graph.reduce_shard_tensor(input_: Tensor, dim: int, sizes: list[int] | None, mgroup: ProcessGroup) → Tensor

Reduces and then shards tensor.

Perform an allreduce followed by a split in the forward pass and a gather in the backward pass.

Parameters:
  • input_ (Tensor) – Input

  • dim (int) – dimension along which to shard (split in the forward pass, gather in the backward pass)

  • sizes (ShardSizes) – Per-rank shard sizes

  • mgroup (ProcessGroup) – model communication group

Returns:

Reduced sharded tensor.

Return type:

Tensor

anemoi.models.distributed.graph.all_to_all_transpose(input_: Tensor, dim_split: int, split_sizes: list[int] | None, dim_concat: int, concat_sizes: list[int] | None, mgroup: ProcessGroup) → Tensor

All-to-all transpose.

Convert a tensor sharded along dim_concat into a tensor sharded along dim_split via an all-to-all transpose. The backward pass applies the reverse all-to-all.

Parameters:
  • input_ (Tensor) – Input tensor to be transposed.

  • dim_split (int) – Dimension along which to split the input tensor.

  • split_sizes (ShardSizes) – Per-rank sizes of the split tensors along dim_split.

  • dim_concat (int) – Dimension along which to concatenate the transposed tensors.

  • concat_sizes (ShardSizes) – Per-rank sizes of the concatenated tensors along dim_concat.

  • mgroup (ProcessGroup) – Model communication group.

Returns:

Transposed tensor.

Return type:

Tensor
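
A single-process simulation can make the change of sharding dimension concrete. The sketch below uses nested lists as 2-D tensors with dim_split = 0 (rows) and dim_concat = 1 (columns). All function names are hypothetical, and concat_sizes is not needed here because nothing has to be pre-allocated.

```python
from itertools import accumulate


def split_rows(block: list[list[int]], sizes: list[int]) -> list[list[list[int]]]:
    """Split a local block along dim 0 into one piece per destination rank."""
    offsets = [0, *accumulate(sizes)]
    return [block[offsets[i]:offsets[i + 1]] for i in range(len(sizes))]


def concat_cols(pieces: list[list[list[int]]]) -> list[list[int]]:
    """Concatenate received pieces along dim 1, in source-rank order."""
    return [sum((piece[r] for piece in pieces), []) for r in range(len(pieces[0]))]


def all_to_all_transpose_sim(local_blocks, split_sizes):
    """Simulate the exchange: rank `src` sends its piece `dst` to rank `dst`."""
    world = len(local_blocks)
    send = [split_rows(block, split_sizes) for block in local_blocks]
    return [concat_cols([send[src][dst] for src in range(world)]) for dst in range(world)]


full = [[10 * r + c for c in range(3)] for r in range(4)]  # 4 rows, 3 columns
# Input: every rank holds all rows but only some columns (sharded along dim 1).
column_sharded = [[row[0:2] for row in full], [row[2:3] for row in full]]
row_sharded = all_to_all_transpose_sim(column_sharded, split_sizes=[2, 2])
print(row_sharded[0])  # [[0, 1, 2], [10, 11, 12]]
print(row_sharded[1])  # [[20, 21, 22], [30, 31, 32]]
```

After the exchange every rank holds complete rows, so operations that need the whole of dim 1 can run locally.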

khop_edges

anemoi.models.distributed.khop_edges.sort_edge_index_by_dst(edge_index: Tensor | SparseTensor, max_value: int = None) → Tuple[Tensor | SparseTensor, Tensor]

Sort edge indices by destination node.

anemoi.models.distributed.khop_edges.is_edge_index_dst_sorted(edge_index: Tensor | SparseTensor) → bool

Check whether edge_index is sorted by destination node (edge_index[1]).
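
The two helpers above can be sketched with plain lists, where edge_index is a pair [src, dst]. The function names are hypothetical, and treating the second return value of the sort as the permutation that was applied is an assumption based on the return type.

```python
def sort_edges_by_dst(edge_index: list[list[int]]) -> tuple[list[list[int]], list[int]]:
    """Stable sort of edges by destination node; also returns the permutation."""
    src, dst = edge_index
    perm = sorted(range(len(dst)), key=dst.__getitem__)  # sorted() is stable
    return [[src[i] for i in perm], [dst[i] for i in perm]], perm


def is_dst_sorted(edge_index: list[list[int]]) -> bool:
    """True if the destination row (edge_index[1]) is non-decreasing."""
    dst = edge_index[1]
    return all(a <= b for a, b in zip(dst, dst[1:]))


edges = [[0, 1, 2, 3], [2, 0, 1, 0]]
print(is_dst_sorted(edges))  # False
sorted_edges, perm = sort_edges_by_dst(edges)
print(sorted_edges)  # [[1, 3, 2, 0], [0, 0, 1, 2]]
print(perm)  # [1, 3, 2, 0]

# The same permutation keeps per-edge attributes aligned with the edges.
edge_attr = ["e0", "e1", "e2", "e3"]
print([edge_attr[i] for i in perm])  # ['e1', 'e3', 'e2', 'e0']
```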

class anemoi.models.distributed.khop_edges.GraphPartition(num_nodes: tuple[int, int], num_edges: int, num_parts: int, dst_splits: list[int], edge_splits: list[int])

Bases: object

Precomputed partitioning metadata for a graph with dst-sorted edges.

Enables O(1) slicing for both distributed sharding and local chunking by exploiting the fact that edges are already sorted by destination node.

Parameters:
  • num_nodes (tuple[int, int]) – Number of (src, dst) nodes in the full graph.

  • num_edges (int) – Total number of edges.

  • num_parts (int) – Number of partitions (the world size for distributed sharding, or the number of chunks for local chunking).

  • dst_splits (list[int]) – Per-partition destination node counts.

  • edge_splits (list[int]) – Per-partition edge counts (derived from dst-sorted edge structure).

materialise(partition_id: int, x: Tuple[Tensor, Tensor], edge_attr: Tensor, edge_index: Tensor | SparseTensor, cond: Tuple[Tensor, Tensor] | None = None) → tuple[Tuple[Tensor, Tensor], Tensor, Tensor | SparseTensor, Tensor, Tuple[Tensor, Tensor] | None]

Materialise a single partition by slicing nodes, edges and conditioning.

Pure local operation with no communication. Suitable for chunking within a single device.

Parameters:
  • partition_id (int) – The partition to materialise.

  • x (PairTensor) – Node features (src, dst).

  • edge_attr (Tensor) – Edge attributes.

  • edge_index (Adj) – Edge indices (assumed dst-sorted).

  • cond (tuple[Tensor, Tensor], optional) – Conditioning tensors (cond_src, cond_dst).

Returns:

(x_src_subset, x_dst_subset), edge_attr_subset, edge_index_relabeled, cond subset (or None).

Return type:

tuple[PairTensor, Tensor, Adj, Optional[PairTensor]]

anemoi.models.distributed.khop_edges.build_graph_partition(edge_index: Tensor | SparseTensor, num_parts: int, num_nodes: tuple[int, int]) → GraphPartition

Build graph partitioning information from a dst-sorted edge_index.

Parameters:
  • edge_index (Adj) – The edge index tensor (must be sorted by destination node).

  • num_parts (int) – The number of chunks to partition the graph into.

  • num_nodes (tuple[int, int]) – The number of (src, dst) nodes in the graph.

Returns:

The graph partitioning information.

Return type:

GraphPartition
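
To show how dst-sorted edges make the per-partition edge counts cheap to derive, here is a minimal sketch over plain lists. partition_splits is a hypothetical helper, and balanced splitting of the destination nodes is an assumption about how dst_splits is chosen.

```python
from bisect import bisect_left
from itertools import accumulate


def partition_splits(dst_sorted: list[int], num_dst: int, num_parts: int) -> tuple[list[int], list[int]]:
    """Derive (dst_splits, edge_splits) from a non-decreasing destination row."""
    base, remainder = divmod(num_dst, num_parts)
    # Assumption: destination nodes are split as evenly as possible.
    dst_splits = [base + 1 if p < remainder else base for p in range(num_parts)]
    dst_bounds = [0, *accumulate(dst_splits)]
    # Because dst is sorted, each node boundary maps to one edge offset.
    edge_bounds = [bisect_left(dst_sorted, bound) for bound in dst_bounds]
    edge_splits = [hi - lo for lo, hi in zip(edge_bounds, edge_bounds[1:])]
    return dst_splits, edge_splits


dst = [0, 0, 1, 2, 2, 2, 3]  # destination row of a dst-sorted edge_index
dst_splits, edge_splits = partition_splits(dst, num_dst=4, num_parts=2)
print(dst_splits)   # [2, 2]
print(edge_splits)  # [3, 4]
```

With the splits known, the edges of any partition form one contiguous slice, which is what allows slicing without searching the edge list again.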

anemoi.models.distributed.khop_edges.build_graph_partition_from_shard_info(edge_index: Tensor | SparseTensor, x: Tuple[Tensor, Tensor], shard_info: BipartiteGraphShardInfo, model_comm_group: ProcessGroup | None = None) → GraphPartition

Build a GraphPartition for distributed sharding from current shard metadata.

Derives num_nodes from shard_info and tensor shapes, and sets num_parts to the communication group size.

Parameters:
  • edge_index (Adj) – The edge index tensor (must be sorted by destination node).

  • x (PairTensor) – Node features (src, dst), used to infer sizes when not sharded.

  • shard_info (BipartiteGraphShardInfo) – Current shard metadata.

  • model_comm_group (ProcessGroup, optional) – Model communication group.

Returns:

The graph partitioning information.

Return type:

GraphPartition

anemoi.models.distributed.khop_edges.ensure_edges_are_dst_sorted(edge_attr: Tensor, edge_index: Tensor | SparseTensor, *, num_dst: int, edges_are_sharded: bool, model_comm_group: ProcessGroup | None = None, edges_are_dst_sorted: bool = True) → tuple[Tensor, Tensor | SparseTensor]

Ensure edge tensors are dst-sorted before GraphTransformer attention.

anemoi.models.distributed.khop_edges.shard_edges_1hop(edge_attr: Tensor, edge_index: Tensor | SparseTensor, src_size: int, dst_size: int, model_comm_group: ProcessGroup | None, edges_are_dst_sorted: bool = True) → tuple[Tensor, Tensor | SparseTensor, list[int] | None]

Sort and shard edges for 1-hop sharding.

Parameters:
  • edge_attr (Tensor) – Edge attributes.

  • edge_index (Adj) – Edge index.

  • src_size (int) – Number of source nodes.

  • dst_size (int) – Number of destination nodes.

  • model_comm_group (ProcessGroup, optional) – Model communication group.

  • edges_are_dst_sorted (bool, optional) – Whether edge_index and edge_attr are already ordered by destination node. Edges from graph providers already are. Pass False for custom full-graph edges that are not ordered this way. If edges are already sharded, each rank is expected to already have the right edges for its local destination nodes.

Returns:

Sharded edge_attr, sharded edge_index, and edge_shard_sizes.

Return type:

tuple[Tensor, Adj, ShardSizes]

anemoi.models.distributed.khop_edges.shard_graph_to_local(partition: GraphPartition, x: Tuple[Tensor, Tensor], edge_attr: Tensor, edge_index: Tensor | SparseTensor, shard_info: BipartiteGraphShardInfo, model_comm_group: ProcessGroup | None = None, cond: Tuple[Tensor, Tensor] | None = None) → tuple[Tuple[Tensor, Tensor], Tensor, Tensor | SparseTensor, BipartiteGraphShardInfo, Tuple[Tensor, Tensor] | None]

Shard graph tensors to the local rank using precomputed partition metadata.

Handles all communication (sync src, shard dst/edges) and returns the local subgraph with updated shard metadata.

Parameters:
  • partition (GraphPartition) – Precomputed partition metadata.

  • x (PairTensor) – Node features (src, dst).

  • edge_attr (Tensor) – Edge attributes.

  • edge_index (Adj) – Edge indices (assumed dst-sorted).

  • shard_info (BipartiteGraphShardInfo) – Current shard metadata.

  • model_comm_group (ProcessGroup, optional) – Model communication group.

  • cond (tuple[Tensor, Tensor], optional) – Conditioning tensors (cond_src, cond_dst).

Returns:

Sharded (x_src_local, x_dst), edge_attr, edge_index, updated shard_info, cond subset (or None).

Return type:

tuple[PairTensor, Tensor, Adj, BipartiteGraphShardInfo, Optional[PairTensor]]

anemoi.models.distributed.khop_edges.sort_edges_1hop_chunks(num_nodes: int | tuple[int, int], edge_attr: Tensor, edge_index: Tensor | SparseTensor, num_chunks: int, edges_are_dst_sorted: bool = True) → tuple[list[Tensor], list[Tensor | SparseTensor]]

Split edges into 1-hop neighbourhood chunks.

Supports two paths:

  • Fast path (edges_are_dst_sorted=True): O(1) slicing using precomputed partition splits.

  • Slow path (edges_are_dst_sorted=False): explicit subgraph extraction per chunk.

Parameters:
  • num_nodes (Union[int, tuple[int, int]]) – Number of target nodes in the graph, or tuple (src, dst) for a bipartite graph.

  • edge_attr (Tensor) – Edge attributes.

  • edge_index (Adj) – Edge index.

  • num_chunks (int) – Number of chunks to split into.

  • edges_are_dst_sorted (bool, optional) – Whether edge_index and edge_attr are already ordered by destination node. Edges from graph providers already are. Pass False for custom full-graph edges that are not ordered this way.

Returns:

List of edge attribute chunks and edge index chunks.

Return type:

tuple[list[Tensor], list[Adj]]
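
For edges that are not dst-sorted, per-chunk extraction has to scan the whole edge list. The sketch below illustrates that idea with plain lists. chunk_edges_by_dst is a hypothetical helper, node indices are left unrelabelled, and balanced destination ranges per chunk are an assumption, so the library's slow path may differ in detail.

```python
def chunk_edges_by_dst(edge_attr: list, edge_index: list[list[int]], num_dst: int, num_chunks: int):
    """Group edges by the destination-node range of each chunk."""
    src, dst = edge_index
    base, remainder = divmod(num_dst, num_chunks)
    attr_chunks, index_chunks = [], []
    lo = 0
    for chunk in range(num_chunks):
        hi = lo + base + (1 if chunk < remainder else 0)
        # Full scan: keep every edge whose destination lies in [lo, hi).
        keep = [i for i, d in enumerate(dst) if lo <= d < hi]
        attr_chunks.append([edge_attr[i] for i in keep])
        index_chunks.append([[src[i] for i in keep], [dst[i] for i in keep]])
        lo = hi
    return attr_chunks, index_chunks


edge_index = [[0, 1, 2, 3, 0], [3, 0, 2, 1, 1]]  # dst row is not sorted
edge_attr = ["e0", "e1", "e2", "e3", "e4"]
attr_chunks, index_chunks = chunk_edges_by_dst(edge_attr, edge_index, num_dst=4, num_chunks=2)
print(attr_chunks)   # [['e1', 'e3', 'e4'], ['e0', 'e2']]
print(index_chunks)  # [[[1, 3, 0], [0, 1, 1]], [[0, 2], [3, 2]]]
```

Each chunk costs a pass over all edges here, whereas dst-sorted edges reduce every chunk to a contiguous slice.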

anemoi.models.distributed.khop_edges.drop_unconnected_src_nodes(x_src: Tensor, edge_index: Tensor | SparseTensor, num_nodes: tuple[int, int]) → tuple[Tensor, Tensor | SparseTensor, Tensor]

Drop unconnected nodes from x_src and relabel edges.

Parameters:
  • x_src (Tensor) – Source node features.

  • edge_index (Adj) – Edge index.

  • num_nodes (tuple[int, int]) – Number of nodes in graph (src, dst).

Returns:

Reduced node features, relabeled edge index, and indices of connected source nodes.

Return type:

tuple[Tensor, Adj, Tensor]
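
The drop-and-relabel step can be sketched with plain lists as follows. drop_unconnected_src is a hypothetical helper, and keeping the connected source nodes in ascending index order is an assumption.

```python
def drop_unconnected_src(x_src: list, edge_index: list[list[int]]):
    """Keep only source nodes that appear in an edge and renumber them 0..k-1."""
    src, dst = edge_index
    connected = sorted(set(src))  # assumption: ascending index order is kept
    relabel = {old: new for new, old in enumerate(connected)}
    x_reduced = [x_src[i] for i in connected]
    relabeled = [[relabel[s] for s in src], list(dst)]
    return x_reduced, relabeled, connected


x_src = ["a", "b", "c", "d", "e"]
edge_index = [[4, 1, 4], [0, 1, 2]]  # only source nodes 1 and 4 are used
x_reduced, relabeled, kept = drop_unconnected_src(x_src, edge_index)
print(x_reduced)  # ['b', 'e']
print(relabeled)  # [[1, 0, 1], [0, 1, 2]]
print(kept)       # [1, 4]
```

The returned indices allow results computed on the reduced source set to be mapped back to the original node numbering.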

shapes

class anemoi.models.distributed.shapes.GraphShardInfo(nodes: list[int] | None = None, edges: list[int] | None = None)

Bases: object

class anemoi.models.distributed.shapes.BipartiteGraphShardInfo(src_nodes: list[int] | None = None, dst_nodes: list[int] | None = None, edges: list[int] | None = None)

Bases: object

anemoi.models.distributed.shapes.get_shard_sizes(tensor: Tensor, dim: int, model_comm_group: ProcessGroup | None = None) → list[int] | None

Get per-rank shard sizes for a tensor split along a specific dimension.

anemoi.models.distributed.shapes.expand_shard_sizes_to_shapes(tensor: Tensor, dim: int, shard_sizes_dim: list[int]) → list[list[int]]

Expand per-dimension shard sizes to full per-rank tensor shapes.
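
As a sketch of that expansion, the hypothetical helper below takes a shape as a list of ints instead of a tensor and substitutes each rank's size at the sharded dimension, on the assumption that all other dimensions are identical across ranks.

```python
def expand_sizes_to_shapes(shape: list[int], dim: int, shard_sizes_dim: list[int]) -> list[list[int]]:
    """Build one full shape per rank from the sizes along a single dimension."""
    shapes = []
    for size in shard_sizes_dim:
        rank_shape = list(shape)  # copy so every rank gets its own list
        rank_shape[dim] = size    # only the sharded dimension differs
        shapes.append(rank_shape)
    return shapes


print(expand_sizes_to_shapes([10, 4, 8], dim=0, shard_sizes_dim=[3, 3, 2, 2]))
# [[3, 4, 8], [3, 4, 8], [2, 4, 8], [2, 4, 8]]
```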

utils

anemoi.models.distributed.utils.model_is_distributed(model_comm_group: ProcessGroup | None = None) → bool

Return whether a model communication group spans multiple ranks.

anemoi.models.distributed.utils.get_memory_format(tensor: Tensor)

Helper routine that returns the memory format of the given tensor.