clusters

client

class anemoi.inference.clusters.client.ClusterClientProtocol(*args, **kwargs)

Bases: Protocol

classmethod used() bool

Check if this client is valid in the current environment.

class anemoi.inference.clusters.client.ComputeClient(world_size: int, local_rank: int, global_rank: int, master_addr: str, master_port: int, process_group: 'torch.distributed.ProcessGroup | None')

Bases: object

world_size: int
local_rank: int
global_rank: int
master_addr: str
master_port: int
process_group: ProcessGroup | None
property is_master: bool

Return True if the current process is the master process.
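The fields above can be pictured with a small stand-in dataclass. This is a minimal sketch, not the library class: `ComputeClientSketch` is a hypothetical name, and the rule that the master is the process with global rank 0 is an assumption that the text above does not state.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ComputeClientSketch:
    """Stand-in mirroring the documented ComputeClient fields (not the real class)."""

    world_size: int
    local_rank: int
    global_rank: int
    master_addr: str
    master_port: int
    process_group: Optional[Any]  # the documented field holds a ProcessGroup or None

    @property
    def is_master(self) -> bool:
        # Assumption: the master process is the one with global rank 0.
        return self.global_rank == 0


# Two processes on one machine, so local and global ranks coincide.
clients = [
    ComputeClientSketch(
        world_size=2,
        local_rank=rank,
        global_rank=rank,
        master_addr="localhost",
        master_port=12345,
        process_group=None,
    )
    for rank in range(2)
]
master_flags = [client.is_master for client in clients]
```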

class anemoi.inference.clusters.client.ComputeClientFactory

Bases: ABC

Abstract factory class for compute client creation.

priority: int = 0

Priority of the cluster client. Higher values indicate higher priority.

create_client() ComputeClient

Create and return a ComputeClient instance.

abstractmethod classmethod used() bool

Check if this client is valid in the current environment.
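One way `priority` and `used()` could work together is to keep only the factories that report themselves as usable and then take the one with the highest priority. The sketch below illustrates that idea under stated assumptions: `FactorySketch`, the three subclasses, and `pick_factory` are hypothetical names, and the actual selection logic in the library may differ.

```python
from abc import ABC, abstractmethod
from typing import Optional, Sequence, Type


class FactorySketch(ABC):
    """Stand-in exposing only the documented `priority` and `used()` members."""

    priority: int = 0

    @classmethod
    @abstractmethod
    def used(cls) -> bool:
        """Report whether this factory applies to the current environment."""


class LowPriority(FactorySketch):
    @classmethod
    def used(cls) -> bool:
        return True


class HighPriority(FactorySketch):
    priority = 10

    @classmethod
    def used(cls) -> bool:
        return True


class NotApplicable(FactorySketch):
    priority = 100

    @classmethod
    def used(cls) -> bool:
        return False


def pick_factory(
    candidates: Sequence[Type[FactorySketch]],
) -> Optional[Type[FactorySketch]]:
    # Keep the usable factories, then prefer the highest priority value.
    usable = [candidate for candidate in candidates if candidate.used()]
    if not usable:
        return None
    return max(usable, key=lambda candidate: candidate.priority)


chosen = pick_factory([LowPriority, HighPriority, NotApplicable])
```

`NotApplicable` has the highest priority but is skipped because its `used()` returns False.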

property init_method: str

Return the initialisation method string for distributed computing.

property backend: str

Return the backend for distributed computing.

create_model_comm_group() ProcessGroup | None

Create the communication group for model parallelism.

property is_master: bool

Return True if the current process is the master process.

abstract property local_rank: int

Return the local rank of the current process.

property device_index: int

Return the device index for the current process, defaults to local rank.

abstract property global_rank: int

Return the global rank of the current process.

abstract property world_size: int

Return the total number of processes in the cluster.

abstract property master_addr: str

Return the master address.

abstract property master_port: int

Return the master port.
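The properties above map naturally onto the arguments of `torch.distributed.init_process_group`, which accepts `backend`, `init_method`, `world_size` and `rank`. The sketch below only assembles those arguments and does not import torch. `init_kwargs` is a hypothetical helper, and whether the library wires things this way is an assumption.

```python
from typing import MutableMapping


def init_kwargs(
    backend: str,
    init_method: str,
    world_size: int,
    global_rank: int,
    master_addr: str,
    master_port: int,
    env: MutableMapping[str, str],
) -> dict:
    """Assemble keyword arguments for a process-group initialisation call."""
    if init_method == "env://":
        # With env:// rendezvous, the master address and port are read from
        # the MASTER_ADDR and MASTER_PORT environment variables.
        env["MASTER_ADDR"] = master_addr
        env["MASTER_PORT"] = str(master_port)
    return {
        "backend": backend,
        "init_method": init_method,
        "world_size": world_size,
        "rank": global_rank,
    }


# A plain dict stands in for os.environ so the example has no side effects.
fake_env: dict = {}
kwargs = init_kwargs("gloo", "env://", 4, 1, "node0", 29500, fake_env)
```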

distributed

class anemoi.inference.clusters.distributed.DistributedCluster

Bases: MappingCluster

Distributed cluster that uses environment variables for distributed setup.

classmethod used() bool

Check if this client is valid in the current environment.

manual

class anemoi.inference.clusters.manual.ManualSpawner(world_size: int, port: int | None = None)

Bases: ComputeSpawner

Manual cluster that uses user-defined world size for distributed setup.

Example usage

In the config:

```yaml
cluster:
  manual:
    world_size: 4
    port: 12345
```

classmethod used() bool

Check if this spawner is valid in the current environment.

spawn(fn: Callable[[Configuration, ComputeClientFactory], None], config: Configuration) None

Spawn processes for parallel execution.

Parameters:
  • fn (SPAWN_FUNCTION) – The function to run in each process. Expects to receive the configuration and compute client factory as arguments.

  • config (Configuration) – The configuration object for the runner.

teardown() None

Tear down the cluster environment and join spawned processes.
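To make the role of `world_size` and `port` concrete, the sketch below plans the arguments each spawned process might receive. It is an illustration only: `plan_clients` and `find_free_port` are hypothetical helpers, and the choices of `localhost` as master address, identical local and global ranks, and an OS-assigned port when `port` is None are all assumptions rather than documented behaviour.

```python
import socket
from typing import Optional


def find_free_port() -> int:
    # Binding to port 0 asks the operating system for an unused TCP port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def plan_clients(world_size: int, port: Optional[int] = None) -> list:
    """Return one argument dict per process to be spawned."""
    master_port = find_free_port() if port is None else port
    return [
        {
            "world_size": world_size,
            # Assumption: all processes share one machine, so ranks coincide.
            "local_rank": rank,
            "global_rank": rank,
            "master_addr": "localhost",
            "master_port": master_port,
        }
        for rank in range(world_size)
    ]


plans = plan_clients(world_size=4, port=12345)
```

The keys match the constructor parameters of `ManualClient` documented below, so each dict could in principle be unpacked into one client per process.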

class anemoi.inference.clusters.manual.ManualClient(world_size: int, local_rank: int, global_rank: int, master_addr: str, master_port: int)

Bases: ComputeClientFactory

classmethod used() bool

Check if this client is valid in the current environment.

property world_size: int

Return the total number of processes in the cluster.

property global_rank: int

Return the global rank of the current process.

property local_rank: int

Return the local rank of the current process.

property master_addr: str

Return the master address.

property master_port: int

Return the master port.

mapping

class anemoi.inference.clusters.mapping.EnvMapping(local_rank: str | list[str], global_rank: str | list[str], world_size: str | list[str], master_addr: str | list[str], master_port: str | list[str], backend: str | None = None, init_method: str = 'env://')

Bases: object

Dataclass to hold environment variable mappings for cluster configuration.

Elements can be either strings or lists of strings. If a list is provided, the first found environment variable will be used.

local_rank: str | list[str]
global_rank: str | list[str]
world_size: str | list[str]
master_addr: str | list[str]
master_port: str | list[str]
backend: str | None = None
init_method: str = 'env://'
get_env(key: str, default: Any = None)

Get the environment variable value for the given key.
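The "first found" rule for list-valued entries can be sketched as a short lookup loop. `first_set` is a hypothetical helper and not the library's `get_env`. It takes the environment as an argument so the example does not depend on the real process environment.

```python
import os
from typing import Any, List, Mapping, Optional, Union


def first_set(
    names: Union[str, List[str]],
    env: Optional[Mapping[str, str]] = None,
    default: Any = None,
) -> Any:
    """Return the value of the first variable in `names` that is present in `env`."""
    env = os.environ if env is None else env
    candidates = [names] if isinstance(names, str) else names
    for name in candidates:
        if name in env:
            return env[name]
    return default


fake_env = {"SECOND_VAR": "3", "THIRD_VAR": "7"}
# FIRST_VAR is missing, so the lookup falls through to SECOND_VAR.
value = first_set(["FIRST_VAR", "SECOND_VAR", "THIRD_VAR"], env=fake_env)
fallback = first_set("MISSING_VAR", env=fake_env, default="0")
```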

is_set(keys: list[str] | None = None) bool

Check if all environment variables for the given keys are set.

Parameters:

keys (list[str] | None, optional) – List of keys to check, by default None (checks all keys)

Returns:

True if all environment variables are set, False otherwise

Return type:

bool
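A check in the spirit of `is_set` can be sketched as follows. This is a stand-alone illustration: the free function `is_set`, the `MAPPING` dict and its variable names are hypothetical, and treating a list-valued entry as set when any one of its variables is present is an assumption that follows from the first-found rule described above.

```python
from typing import Dict, List, Mapping, Optional, Union

# Hypothetical mapping from logical keys to environment variable names.
MAPPING: Dict[str, Union[str, List[str]]] = {
    "local_rank": "LOCAL_RANK_ENV_VAR",
    "global_rank": ["GLOBAL_RANK_ENV_VAR", "ALT_GLOBAL_RANK_ENV_VAR"],
    "world_size": "WORLD_SIZE_ENV_VAR",
}


def is_set(
    mapping: Mapping[str, Union[str, List[str]]],
    env: Mapping[str, str],
    keys: Optional[List[str]] = None,
) -> bool:
    """Return True if every requested key resolves to a variable present in `env`."""
    keys = list(mapping) if keys is None else keys
    for key in keys:
        names = mapping[key]
        candidates = [names] if isinstance(names, str) else names
        # Assumption: one present variable is enough for a list-valued entry.
        if not any(name in env for name in candidates):
            return False
    return True


fake_env = {"LOCAL_RANK_ENV_VAR": "0", "ALT_GLOBAL_RANK_ENV_VAR": "0"}
all_keys_set = is_set(MAPPING, fake_env)
rank_keys_set = is_set(MAPPING, fake_env, keys=["local_rank", "global_rank"])
```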

class anemoi.inference.clusters.mapping.MappingCluster(mapping: dict | EnvMapping)

Bases: ComputeClientFactory

Custom cluster that uses user-defined environment variables for distributed setup.

Example usage

```python
from anemoi.inference.clusters.mapping import MappingCluster

cluster = MappingCluster(mapping={
    "local_rank": "LOCAL_RANK_ENV_VAR",
    "global_rank": "GLOBAL_RANK_ENV_VAR",
    "world_size": "WORLD_SIZE_ENV_VAR",
    "master_addr": "MASTER_ADDR_ENV_VAR",
    "master_port": "MASTER_PORT_ENV_VAR",
    "init_method": "env://",
})
```

property init_method: str

Return the initialisation method string for distributed computing.

property backend: str

Return the backend string for distributed computing.

property world_size: int

Return the total number of processes in the cluster.

property global_rank: int

Return the global rank of the current process.

property local_rank: int

Return the local rank of the current process.

property master_addr: str

Return the master address.

property master_port: int

Return the master port.

classmethod used() bool

Check if this client is valid in the current environment.

class anemoi.inference.clusters.mapping.CustomCluster(**kwargs)

Bases: MappingCluster

Custom cluster that uses user-defined environment variables for distributed setup.

Example usage

In the config:

```yaml
runner:
  parallel:
    cluster:
      custom:
        local_rank: LOCAL_RANK_ENV_VAR
        global_rank: GLOBAL_RANK_ENV_VAR
        world_size: WORLD_SIZE_ENV_VAR
        master_addr: MASTER_ADDR_ENV_VAR
        master_port: MASTER_PORT_ENV_VAR
        init_method: env://
```

mpi

class anemoi.inference.clusters.mpi.MPICluster(use_mpi_backend: bool = False, **kwargs)

Bases: MappingCluster

MPI cluster that uses MPI environment variables for distributed setup.

classmethod used() bool

Check if this client is valid in the current environment.

property backend: str

Return the backend string for distributed computing.

create_model_comm_group() ProcessGroup | None

Create the communication group for model parallelism.

slurm

class anemoi.inference.clusters.slurm.SlurmCluster

Bases: MappingCluster

Slurm cluster that uses SLURM environment variables for distributed setup.

priority: int = 10

Priority of the cluster client. Higher values indicate higher priority.

classmethod used() bool

Check if this client is valid in the current environment.

property master_addr: str

Return the master address.

property master_port: int

Return the master port.
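As a rough illustration of a Slurm-based setup, the sketch below reads ranks from the standard Slurm task variables `SLURM_PROCID`, `SLURM_LOCALID` and `SLURM_NTASKS`, and derives a port from the job ID. Both helpers are hypothetical. The text above does not say which variables `SlurmCluster` reads or how it computes `master_addr` and `master_port`, so the port convention in particular is an assumption shown only as one common pattern.

```python
from typing import Mapping


def slurm_ranks(env: Mapping[str, str]) -> dict:
    """Read rank information from standard Slurm task variables."""
    return {
        "global_rank": int(env["SLURM_PROCID"]),   # task index across the job
        "local_rank": int(env["SLURM_LOCALID"]),   # task index on this node
        "world_size": int(env["SLURM_NTASKS"]),    # total number of tasks
    }


def port_from_job_id(job_id: str, base: int = 10000) -> int:
    # Hypothetical convention: offset a base port by the last four digits of
    # the job ID, so all tasks of a job agree on a port without communicating.
    return base + int(job_id[-4:])


fake_env = {"SLURM_PROCID": "5", "SLURM_LOCALID": "1", "SLURM_NTASKS": "8"}
ranks = slurm_ranks(fake_env)
port = port_from_job_id("1234567")
```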

spawner

class anemoi.inference.clusters.spawner.ComputeSpawner

Bases: ABC

Abstract base class for cluster operations for parallel execution.

abstractmethod classmethod used() bool

Check if this spawner is valid in the current environment.

abstractmethod spawn(fn: Callable[[Configuration, ComputeClientFactory], None], config: Configuration) None

Spawn processes for parallel execution.

Parameters:
  • fn (SPAWN_FUNCTION) – The function to run in each process. Expects to receive the configuration and compute client factory as arguments.

  • config (Configuration) – The configuration object for the runner.

abstractmethod teardown() None

Tear down the cluster environment.
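The three abstract members form a small contract: report applicability, run a function once per process, then clean up. The sketch below shows that contract with local stand-ins, since it does not import the library. `SpawnerSketch` and `SerialSpawner` are hypothetical names, and running the function in-process with a plain dict in place of a compute client factory is a simplification for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable


class SpawnerSketch(ABC):
    """Local stand-in with the three documented abstract members."""

    @classmethod
    @abstractmethod
    def used(cls) -> bool: ...

    @abstractmethod
    def spawn(self, fn: Callable[[Any, Any], None], config: Any) -> None: ...

    @abstractmethod
    def teardown(self) -> None: ...


class SerialSpawner(SpawnerSketch):
    """Hypothetical spawner that calls fn in-process, once per rank."""

    def __init__(self, world_size: int) -> None:
        self.world_size = world_size
        self.finished = False

    @classmethod
    def used(cls) -> bool:
        return True

    def spawn(self, fn: Callable[[Any, Any], None], config: Any) -> None:
        for rank in range(self.world_size):
            # A real spawner would start a separate process and pass a compute
            # client factory; a plain dict stands in for that factory here.
            fn(config, {"global_rank": rank, "world_size": self.world_size})

    def teardown(self) -> None:
        self.finished = True


seen = []
spawner = SerialSpawner(world_size=3)
spawner.spawn(lambda config, factory: seen.append(factory["global_rank"]), config={})
spawner.teardown()
```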