Distributed

graph

anemoi.models.distributed.graph.shard_tensor(input_: Tensor, dim: int, shapes: tuple, mgroup: ProcessGroup, gather_in_backward: bool = True) → Tensor

Shard tensor.

Keeps only the part of the tensor that is relevant for the current rank.

Parameters:
  • input_ (Tensor) – Input tensor

  • dim (int) – dimension along which to shard

  • shapes (tuple) – Shapes of sharded Tensors

  • mgroup (ProcessGroup) – model communication group

  • gather_in_backward (bool) – perform gather in backward, default True

Returns:

Sharded tensor.

Return type:

Tensor
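
A minimal usage sketch (illustrative, with assumed names and sizes): it assumes torch.distributed is already initialised, e.g. by torchrun, that all ranks belong to one model communication group, and that the shard shapes come from get_shape_shards in the shapes module.

   import torch
   import torch.distributed as dist

   from anemoi.models.distributed.graph import shard_tensor
   from anemoi.models.distributed.shapes import get_shape_shards

   # Assumption: the default process group was initialised beforehand (e.g. by torchrun).
   model_comm_group = dist.new_group(ranks=list(range(dist.get_world_size())))

   x = torch.randn(1, 40320, 128)  # (batch, grid points, channels), identical on all ranks
   shard_shapes = get_shape_shards(x, 1, model_comm_group)  # per-rank shard shapes along dim 1

   # Keep only this rank's slice of the grid-point dimension.
   x_local = shard_tensor(x, 1, shard_shapes, model_comm_group)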

anemoi.models.distributed.graph.gather_tensor(input_: Tensor, dim: int, shapes: tuple, mgroup: ProcessGroup) → Tensor

Gather tensor.

Gathers tensor shards from ranks.

Parameters:
  • input_ (Tensor) – Input tensor

  • dim (int) – dimension along which to gather

  • shapes (tuple) – Shapes of sharded Tensors

  • mgroup (ProcessGroup) – model communication group

Returns:

Gathered tensor.

Return type:

Tensor
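
Continuing the sketch above (same model_comm_group, shard_shapes and x_local), the shards can be reassembled on every rank:

   from anemoi.models.distributed.graph import gather_tensor

   # Reassemble the full tensor from the per-rank shards along dim 1.
   x_full = gather_tensor(x_local, 1, shard_shapes, model_comm_group)
   # x_full has the original shape (1, 40320, 128) on every rank.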

anemoi.models.distributed.graph.reduce_tensor(input_: Tensor, mgroup: ProcessGroup) → Tensor

Reduce tensor.

Reduces tensor across ranks.

Parameters:
  • input_ (Tensor) – Input tensor

  • mgroup (ProcessGroup) – model communication group

Returns:

Reduced tensor.

Return type:

Tensor
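
A short sketch, with the reduction operation (summation) assumed rather than stated on this page; model_comm_group is set up as in the sharding sketch above.

   import torch

   from anemoi.models.distributed.graph import reduce_tensor

   # Each rank holds a partial result of identical shape.
   partial = torch.randn(1, 40320, 128)
   # Combine the partial results across the model communication group.
   total = reduce_tensor(partial, model_comm_group)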

anemoi.models.distributed.graph.sync_tensor(input_: Tensor, dim: int, shapes: tuple, mgroup: ProcessGroup) → Tensor

Sync tensor.

Performs a gather in the forward pass and, in the backward pass, an allreduce followed by a split.

Parameters:
  • input_ (Tensor) – Input tensor

  • dim (int) – dimension along which to gather

  • shapes (tuple) – Shapes of sharded Tensors

  • mgroup (ProcessGroup) – model communication group

Returns:

Synced tensor.

Return type:

Tensor
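
A hedged sketch of the typical pattern: a shard is expanded to the full tensor for an operation that needs the whole dimension, while the backward pass keeps gradients consistent with the sharded layout. x_local, shard_shapes and model_comm_group are as in the sharding sketch above.

   from anemoi.models.distributed.graph import sync_tensor

   # Forward: gather the full tensor on every rank.
   # Backward: gradients are all-reduced and split back to this rank's shard.
   x_full = sync_tensor(x_local, 1, shard_shapes, model_comm_group)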

anemoi.models.distributed.graph.reduce_shard_tensor(input_: Tensor, dim: int, shapes: tuple, mgroup: ProcessGroup) → Tensor

Reduces and then shards tensor.

Performs an allreduce followed by a split in the forward pass, and a gather in the backward pass.

Parameters:
  • input_ (Tensor) – Input tensor

  • dim (int) – dimension along which to split (gathered along in the backward pass)

  • shapes (tuple) – Shapes of sharded Tensors

  • mgroup (ProcessGroup) – model communication group

Returns:

Reduced sharded tensor.

Return type:

Tensor
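
The complementary sketch to sync_tensor, under the same assumptions: each rank produces a full-size partial result, which is reduced across ranks and immediately re-sharded. shard_shapes and model_comm_group are as in the sharding sketch above.

   import torch

   from anemoi.models.distributed.graph import reduce_shard_tensor

   # Full-size partial result on this rank (e.g. aggregated from its local edges).
   y_partial = torch.randn(1, 40320, 128)
   # Sum the partial results across ranks, then keep only this rank's shard along dim 1.
   y_local = reduce_shard_tensor(y_partial, 1, shard_shapes, model_comm_group)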

khop_edges

anemoi.models.distributed.khop_edges.get_k_hop_edges(nodes: Tensor, edge_attr: Tensor, edge_index: Tensor | SparseTensor, num_hops: int = 1) → tuple[Tensor | SparseTensor, Tensor]

Return the k-hop subgraph (1 hop by default).

Parameters:
  • nodes (Tensor) – destination nodes

  • edge_attr (Tensor) – edge attributes

  • edge_index (Adj) – edge index

  • num_hops (int, optional) – number of hops, by default 1

Returns:

K-hop subgraph of edge index and edge attributes

Return type:

tuple[Adj, Tensor]
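
A self-contained toy example (graph and feature sizes are made up): a directed path 0 → 1 → 2 → 3, asking for the edges within two hops of destination node 3; under the destination-node semantics described above, the result should be the edges (1, 2) and (2, 3).

   import torch

   from anemoi.models.distributed.khop_edges import get_k_hop_edges

   # Directed path graph 0 -> 1 -> 2 -> 3 with one 4-dimensional feature per edge.
   edge_index = torch.tensor([[0, 1, 2], [1, 2, 3]])
   edge_attr = torch.randn(3, 4)

   # Subgraph of edges reaching node 3 within two hops.
   sub_edge_index, sub_edge_attr = get_k_hop_edges(
       nodes=torch.tensor([3]), edge_attr=edge_attr, edge_index=edge_index, num_hops=2
   )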

anemoi.models.distributed.khop_edges.sort_edges_1hop_sharding(num_nodes: int | tuple[int, int], edge_attr: Tensor, edge_index: Tensor | SparseTensor, mgroup: ProcessGroup | None = None) → tuple[Tensor | SparseTensor, Tensor, list, list]

Rearranges edges into 1-hop neighbourhoods for sharding across GPUs.

Parameters:
  • num_nodes (Union[int, tuple[int, int]]) – Number of (target) nodes in the graph

  • edge_attr (Tensor) – edge attributes

  • edge_index (Adj) – edge index

  • mgroup (ProcessGroup) – model communication group

Returns:

Edges sorted into 1-hop neighbourhoods, edge attributes of the sorted edges, shapes of the edge-index shards for partitioning between GPUs, and shapes of the edge-attribute shards for partitioning between GPUs

Return type:

tuple[Adj, Tensor, list, list]
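
A hedged sketch with a toy graph (sizes made up); it assumes an initialised model communication group as in the graph sketches above.

   import torch

   from anemoi.models.distributed.khop_edges import sort_edges_1hop_sharding

   # Toy graph: 4 nodes, 5 directed edges, one 8-dimensional feature per edge.
   edge_index = torch.tensor([[0, 1, 1, 2, 3], [1, 0, 2, 3, 0]])
   edge_attr = torch.randn(5, 8)

   # model_comm_group: model communication group set up as in the graph sketches above.
   # Regroup edges by the 1-hop neighbourhoods of each rank's node shard and
   # return the shard shapes needed to partition them between GPUs.
   edge_index_sorted, edge_attr_sorted, index_shapes, attr_shapes = sort_edges_1hop_sharding(
       num_nodes=4, edge_attr=edge_attr, edge_index=edge_index, mgroup=model_comm_group
   )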

anemoi.models.distributed.khop_edges.sort_edges_1hop_chunks(num_nodes: int | tuple[int, int], edge_attr: Tensor, edge_index: Tensor | SparseTensor, num_chunks: int) → tuple[list[Tensor], list[Tensor | SparseTensor]]

Rearranges edges into 1-hop neighbourhood chunks.

Parameters:
  • num_nodes (Union[int, tuple[int, int]]) – Number of (target) nodes in the graph; a tuple for a bipartite graph

  • edge_attr (Tensor) – edge attributes

  • edge_index (Adj) – edge index

  • num_chunks (int) – number of chunks the edges are split into

Returns:

list of sorted edge attribute chunks, list of sorted edge_index chunks

Return type:

tuple[list[Tensor], list[Adj]]
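
The chunked variant needs no process group, so the sketch below is self-contained; the numbers are illustrative.

   import torch

   from anemoi.models.distributed.khop_edges import sort_edges_1hop_chunks

   # Same toy graph: 4 nodes, 5 directed edges, one 8-dimensional feature per edge.
   edge_index = torch.tensor([[0, 1, 1, 2, 3], [1, 0, 2, 3, 0]])
   edge_attr = torch.randn(5, 8)

   # Split the edges into 2 chunks of 1-hop neighbourhoods of the target nodes.
   edge_attr_chunks, edge_index_chunks = sort_edges_1hop_chunks(
       num_nodes=4, edge_attr=edge_attr, edge_index=edge_index, num_chunks=2
   )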

shapes

anemoi.models.distributed.shapes.get_shape_shards(tensor: Tensor, dim: int, model_comm_group: ProcessGroup | None = None) → list

Get shape of tensor shards.

anemoi.models.distributed.shapes.change_channels_in_shape(shape_list: list, channels: int) → list

Change the number of channels in the tensor shape definition list.
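
A small sketch of the two helpers together; treating the tensor as a single shard when no communication group is passed is an assumption.

   import torch

   from anemoi.models.distributed.shapes import change_channels_in_shape, get_shape_shards

   x = torch.randn(1, 40320, 128)  # (batch, grid points, channels)

   # Shapes of the per-rank shards along the grid-point dimension
   # (pass a model_comm_group when actually sharding across ranks).
   shard_shapes = get_shape_shards(x, 1)

   # Reuse the same partitioning for a tensor with a different channel count.
   shard_shapes_256 = change_channels_in_shape(shard_shapes, 256)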

transformer

anemoi.models.distributed.transformer.shard_heads(input_: Tensor, shapes: list, mgroup: ProcessGroup) → Tensor

Shard heads.

Gathers e.g. the query, key or value tensor along the sequence dimension via all-to-all communication and shards it along the head dimension for parallel self-attention computation. The expected format is (batch_size, … heads, sequence_length, channels).

Parameters:
  • input_ (Tensor) – Input tensor

  • shapes (list) – shapes of shards

  • mgroup (ProcessGroup) – model communication group

Returns:

Sharded heads.

Return type:

Tensor
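
A hedged sketch of the attention use case; it assumes an initialised model communication group and that `shapes` holds the per-rank sequence-shard shapes (here simply derived from the local tensor for illustration).

   import torch
   import torch.distributed as dist

   from anemoi.models.distributed.shapes import get_shape_shards
   from anemoi.models.distributed.transformer import shard_heads

   # Assumption: the default process group was initialised beforehand (e.g. by torchrun).
   model_comm_group = dist.new_group(ranks=list(range(dist.get_world_size())))

   # This rank's sequence shard of the query: (batch, heads, local sequence, channels).
   query = torch.randn(1, 16, 1024, 64)
   shapes = get_shape_shards(query, 2, model_comm_group)  # assumed: per-rank sequence-shard shapes

   # Full sequence on every rank, heads split across ranks -> parallel self-attention.
   query_heads = shard_heads(query, shapes=shapes, mgroup=model_comm_group)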

anemoi.models.distributed.transformer.shard_sequence(input_: Tensor, shapes: list, mgroup: ProcessGroup) → Tensor

Shard sequence.

Gathers e.g. the query, key or value tensor along the head dimension via all-to-all communication and shards it along the sequence dimension for parallel MLP and LayerNorm computation. The expected format is (batch_size, … heads, sequence_length, channels).

Parameters:
  • input_ (Tensor) – Input tensor

  • shapes (list) – shapes of shards

  • mgroup (ProcessGroup) – model communication group

Returns:

Sharded sequence.

Return type:

Tensor
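
Continuing the sketch above, shard_sequence undoes the head sharding after attention so that the MLP and LayerNorm again operate on this rank's sequence shard; the same assumptions about `shapes` apply.

   from anemoi.models.distributed.transformer import shard_sequence

   # All heads back on every rank, sequence split across ranks again.
   out_local = shard_sequence(query_heads, shapes=shapes, mgroup=model_comm_group)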

utils

anemoi.models.distributed.utils.get_memory_format(tensor: Tensor)

Helper routine to get the memory format.
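
A minimal sketch; the exact return value is an assumption based on PyTorch's standard memory formats.

   import torch

   from anemoi.models.distributed.utils import get_memory_format

   x = torch.randn(2, 3, 4, 5).to(memory_format=torch.channels_last)
   fmt = get_memory_format(x)  # expected (assumption): torch.channels_last for this tensor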