runner

class anemoi.inference.runner.RunnerClasses(*, tensor_handler: type[TensorHandler] = <class 'anemoi.inference.tensors.TensorHandler'>, checkpoint: type[Checkpoint] = <class 'anemoi.inference.checkpoint.Checkpoint'>, metadata: type[Metadata] = <class 'anemoi.inference.metadata.Metadata'>)

Bases: BaseModel

Configurable class types used by the Runner. Child runners can override these with different classes.

model_config = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class anemoi.inference.runner.Runner(config: RunConfiguration, *, classes: RunnerClasses | None = None)

Bases: Context

A runner is responsible for running a model. This class provides the default forecaster implementation with rollout.

property checkpoint: Checkpoint

Returns:

The checkpoint object.

Return type:

Checkpoint

run(*, input_states: dict[str, dict[str, Any]], lead_time: str | int | timedelta, return_numpy: bool = True) Generator[dict[str, dict[str, Any]], None, None]

Run the model.

Parameters:
  • input_states (dict[str, State]) – The input states for each dataset.

  • lead_time (Union[str, int, datetime.timedelta]) – The lead time.

  • return_numpy (bool, optional) – Whether to return the output state fields as numpy arrays, by default True. Otherwise, it will return torch tensors.

Returns:

The forecasted states.

Return type:

Generator[dict[str, State], None, None]
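Because lead_time may be given as a string, an integer or a timedelta, a caller may find it useful to see how such values could be normalised. The helper below is a minimal sketch, not the library's parser: the name to_timedelta_sketch is hypothetical, and it assumes that a bare integer means hours and that strings are an integer followed by "h" or "d".

```python
import re
from datetime import timedelta


def to_timedelta_sketch(lead_time):
    """Normalise a str, int or timedelta lead time (illustrative only)."""
    if isinstance(lead_time, timedelta):
        return lead_time
    if isinstance(lead_time, int):
        # Assumption: a bare integer is a number of hours.
        return timedelta(hours=lead_time)
    # Assumption: strings look like "6h" or "10d".
    match = re.fullmatch(r"(\d+)([hd])", lead_time.strip().lower())
    if match is None:
        raise ValueError(f"Unsupported lead time: {lead_time!r}")
    value, unit = int(match.group(1)), match.group(2)
    return timedelta(hours=value) if unit == "h" else timedelta(days=value)
```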

initial_constant_forcings_providers(constant_forcings_providers: list[Forcings]) list[Forcings]

Modify the constant forcings providers for the first step.

initial_dynamic_forcings_providers(dynamic_forcings_providers: list[Forcings]) list[Forcings]

Modify the dynamic forcings providers for the initial step of the inference process.

This method provides a hook to adjust the list of dynamic forcings before the first inference step is executed. By default, it returns the inputs unchanged, but subclasses can override this method to implement custom preprocessing or initialization logic.
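The override pattern described above might look like the following sketch. RunnerSketch and FilteringRunnerSketch are hypothetical stand-ins, and providers are represented by plain strings rather than real Forcings objects.

```python
class RunnerSketch:
    """Hypothetical stand-in for Runner; only the hook is modelled."""

    def initial_dynamic_forcings_providers(self, dynamic_forcings_providers):
        # Default behaviour: return the inputs unchanged.
        return dynamic_forcings_providers


class FilteringRunnerSketch(RunnerSketch):
    """Hypothetical subclass that drops some providers for the first step."""

    def initial_dynamic_forcings_providers(self, dynamic_forcings_providers):
        # Providers are plain strings here; the real hook receives Forcings objects.
        return [p for p in dynamic_forcings_providers if not p.startswith("boundary")]


providers = ["computed:insolation", "boundary:sst"]
default_result = RunnerSketch().initial_dynamic_forcings_providers(providers)
filtered_result = FilteringRunnerSketch().initial_dynamic_forcings_providers(providers)
```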

prepare_output_state(output: Generator[dict[str, dict[str, Any]], None, None], return_numpy: bool) Generator[dict[str, dict[str, Any]], None, None]

Prepare the output state.

Parameters:
  • output (Generator[dict[str, State], None, None]) – Output state generator. Expects a dictionary of states keyed by dataset name. Expects fields in each state to be torch tensors with shape (values, variables).

  • return_numpy (bool) – Whether to return the output state fields as numpy arrays.

Yields:

Generator[dict[str, State], None, None] – The prepared output state.

property autocast: dtype | str

The autocast precision.

property model: Module

Returns:

The loaded model.

Return type:

torch.nn.Module

predict_step(model: Module, input_tensors_torch: dict[str, Tensor], **kwargs: Any) dict[str, Tensor]

Predict the next step.

Parameters:
  • model (torch.nn.Module) – The model.

  • input_tensors_torch (dict[str, torch.Tensor]) – The input tensors for each dataset.

  • **kwargs (Any) – Additional keyword arguments that will be passed to the model’s predict_step method.

Returns:

The predicted step for each dataset.

Return type:

dict[str, torch.Tensor]

forecast_stepper(start_date: datetime, lead_time: timedelta) Generator[tuple[timedelta, list[datetime], list[datetime], bool], None, None]

Generate step and date variables for the forecast autoregressive loop.

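Parameters:
  • start_date (datetime.datetime) – Start date of the forecast

  • lead_time (datetime.timedelta) – Lead time of the forecast

Returns: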

  • step (datetime.timedelta) – Time delta since beginning of forecast

  • valid_date (list[datetime.datetime]) – Date of the forecast

  • next_date (list[datetime.datetime]) – Date used to prepare the next input tensor

  • is_last_step (bool) – True if it’s the last step of the forecast
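The values listed above can be pictured with a simplified generator. This is a sketch under stated assumptions, not the method's actual code: it uses a hypothetical timestep argument for the model step, yields single dates instead of lists of dates, and assumes the next input date equals the valid date.

```python
from datetime import datetime, timedelta


def forecast_stepper_sketch(start_date, lead_time, timestep):
    """Yield (step, valid_date, next_date, is_last_step) for each model step."""
    steps = lead_time // timestep  # number of whole model steps in the lead time
    for s in range(steps):
        step = (s + 1) * timestep       # time since the start of the forecast
        valid_date = start_date + step  # date the forecast state is valid for
        next_date = valid_date          # assumption: next input is built from this date
        is_last_step = s == steps - 1
        yield step, valid_date, next_date, is_last_step


stepper_output = list(
    forecast_stepper_sketch(
        datetime(2024, 1, 1), timedelta(hours=12), timedelta(hours=6)
    )
)
```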

forecast(lead_time: str, input_tensors_numpy: dict[str, ndarray[tuple[Any, ...], dtype[Any]]], input_states: dict[str, dict[str, Any]]) Generator[dict[str, dict[str, Any]], None, None]

Forecast the future states.

Parameters:
  • lead_time (str) – The lead time.

  • input_tensors_numpy (dict[str, FloatArray]) – The input tensors for each dataset, as numpy arrays with shape (multi_step_input, variables, values).

  • input_states (dict[str, State]) – The input states for each dataset.

Returns:

The forecasted states for each dataset.

Return type:

Generator[dict[str, State], None, None]

input_state_hook(input_state: dict[str, Any]) None

Hook used by coupled runners to send the input state.

output_state_hook(state: dict[str, Any]) None

Hook used by coupled runners to send the output state.

complete_forecast_hook() None

Hook called at the end of the forecast.
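To show how the three hooks could fit together, the sketch below records each call in a list. HookedRunnerSketch is a hypothetical stand-in, states are reduced to a dictionary with an integer "date", and the call order (input hook once, output hook per step, completion hook last) is an assumption made for illustration.

```python
class HookedRunnerSketch:
    """Hypothetical stand-in showing when the three hooks could fire."""

    def __init__(self):
        self.events = []

    def input_state_hook(self, input_state):
        self.events.append(("input", input_state["date"]))

    def output_state_hook(self, state):
        self.events.append(("output", state["date"]))

    def complete_forecast_hook(self):
        self.events.append(("complete", None))

    def run(self, input_state, n_steps):
        # Assumed order: input hook once, output hook per step, completion last.
        self.input_state_hook(input_state)
        for step in range(1, n_steps + 1):
            self.output_state_hook({"date": input_state["date"] + step})
        self.complete_forecast_hook()


runner = HookedRunnerSketch()
runner.run({"date": 0}, n_steps=2)
```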

execute() None

Execute the runner.

runners

default

class anemoi.inference.runners.default.DefaultRunner(config: RunConfiguration, *, classes: RunnerClasses | None = None)

Bases: Runner

The default runner is a forecaster.

external_graph

anemoi.inference.runners.external_graph.contains_any(key, specifications)

anemoi.inference.runners.external_graph.update_state_dict(model, external_state_dict, keywords='', ignore_mismatched_layers=False, ignore_additional_layers=False)

Update the model’s state_dict with entries from an external state_dict. Only entries whose keys contain the specified keywords are considered.
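The keyword filtering can be sketched with plain dictionaries standing in for torch state dicts. Both helper names below are hypothetical, the substring test is an assumed reading of "contain the specified keywords", and the shape and missing-layer checks controlled by the ignore_* flags are left out.

```python
def contains_any_sketch(key, specifications):
    """Return True if any of the given substrings occurs in key."""
    return any(spec in key for spec in specifications)


def update_state_dict_sketch(state_dict, external_state_dict, keywords):
    """Copy entries from external_state_dict whose keys match any keyword.

    Plain dicts stand in for torch state dicts; no shape checks are done.
    """
    updated = dict(state_dict)
    for key, value in external_state_dict.items():
        if contains_any_sketch(key, keywords):
            updated[key] = value
    return updated


current = {"encoder.weight": 1, "decoder.weight": 2}
external = {"encoder.weight": 10, "decoder.weight": 20}
result = update_state_dict_sketch(current, external, keywords=["encoder"])
```

Only the entry matching "encoder" is replaced; the decoder entry keeps its original value.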

anemoi.inference.runners.external_graph.get_updated_supporting_arrays(update_supporting_arrays: dict[Literal['graph', 'file'], dict[str, str]], graph: Any) dict

Get an update dict for the supporting arrays.

Allows the supporting arrays in the checkpoint metadata to be updated from the graph data and from files, which are loaded using torch.load() or np.load().

parallel

anemoi.inference.runners.parallel.create_parallel_runner(config: Configuration, client_factory: ComputeClientFactory) None

Creates and runs a parallel runner.

Parameters:
  • config (Configuration) – The configuration object for the runner.

  • client_factory (ComputeClientFactory) – The compute client factory to use for distributed inference.

class anemoi.inference.runners.parallel.NoOp

Bases: object

No operation class used when returning after spawning processes.

execute(*a, **k) None

class anemoi.inference.runners.parallel.ParallelRunnerMixin(config: Any, compute_client: ComputeClient | None = None, **kwargs)

Bases: Runner

Runner which splits a model over multiple devices. Should be mixed in with a base runner class.

seed(comm_group: ProcessGroup | None) None

Seed all processes in the cluster to ensure reproducibility.

predict_step(model: Any, input_tensor_torch: Tensor, **kwargs: Any) Tensor

Performs a prediction step.

Parameters:
  • model (Any) – The model to use for prediction.

  • input_tensor_torch (torch.Tensor) – The input tensor for the model.

  • **kwargs (Any) – Additional arguments for the prediction step.

Returns:

The prediction result.

Return type:

torch.Tensor

complete_forecast_hook() None

Hook called at the end of the forecast.

create_output(*args, **kwargs) Output

Creates the real output on rank 0 and a “none” output on the others.
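The rank-based choice can be pictured as below. This is a sketch only: the two output classes and the global_rank argument are hypothetical stand-ins, and the real method builds Output objects from the runner configuration.

```python
class RealOutputSketch:
    """Hypothetical output that actually writes states."""

    def write(self, state):
        return f"wrote {state}"


class NoneOutputSketch:
    """Hypothetical output that discards states."""

    def write(self, state):
        return None


def create_output_sketch(global_rank):
    # Only rank 0 gets an output that writes; other ranks get a do-nothing one.
    return RealOutputSketch() if global_rank == 0 else NoneOutputSketch()


outputs = [create_output_sketch(rank) for rank in range(3)]
```

Giving every rank an object with the same interface means the calling code does not need rank checks around each write.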

simple

class anemoi.inference.runners.simple.SimpleTensorHandler(context: Runner, metadata: Metadata, constant_forcings_input: Input, dynamic_forcings_input: Input, boundary_forcings_input: Input, trace_path: str | None = None)

Bases: TensorHandler

This tensor handler only supports computed forcings.

create_constant_coupled_forcings(variables: list[str], mask: ndarray[tuple[Any, ...], dtype[Any]]) list[Forcings]

create_dynamic_coupled_forcings(variables: list[str], mask: ndarray[tuple[Any, ...], dtype[Any]]) list[Forcings]

class anemoi.inference.runners.simple.SimpleRunner(checkpoint: str, **kwargs)

Bases: Runner

Runner for the low-level API. The user provides a prepared State object and directly calls the Runner.run generator. The input State must contain all fields expected by the model, except for computed forcings, which will be loaded by the runner. The user is responsible for providing any constant and dynamic forcings in the input State and during rollout.

execute() None

Execute the runner.

run(*, input_states: dict[str, dict[str, Any]] | dict[str, Any], **kwargs) Generator[dict[str, dict[str, Any]] | dict[str, Any], None, None]

Run the model.

Parameters:
  • input_states (dict[str, State]) – The input states for each dataset.

  • lead_time (Union[str, int, datetime.timedelta]) – The lead time.

  • return_numpy (bool, optional) – Whether to return the output state fields as numpy arrays, by default True. Otherwise, it will return torch tensors.

Returns:

The forecasted states.

Return type:

Generator[dict[str, State], None, None]

temporal_downscaler

class anemoi.inference.runners.temporal_downscaler.TemporalDownscalerMetadata(metadata: dict[str, Any], supporting_arrays: dict[str, ndarray[tuple[Any, ...], dtype[Any]]] = {})

Bases: Metadata

property lagged: list[timedelta]

Return the list of steps for the multi_step_input fields. A temporal downscaler looks ahead instead of backwards.

class anemoi.inference.runners.temporal_downscaler.TemporalDownscalerMultiOutRunner(config: RunConfiguration)

Bases: Runner

A runner for inference with a trained temporal downscaler that produces multiple output steps. Unlike the single-output case, all temporal downscaling is done in one step. It can be applied directly to analysis or forecast data without being coupled to a forecasting model.

This runner makes the following assumptions:
  • The model was trained with two input states: (t and t+temporal_downscaling_window)

  • The output states are between these two states and are set by “frequency” in the config
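The two assumptions above can be illustrated by listing the output dates for one window. The function name is hypothetical, and excluding both endpoints (t and t + window) is an assumption of this sketch rather than documented behaviour.

```python
from datetime import datetime, timedelta


def interpolated_dates_sketch(t, window, frequency):
    """List dates strictly between t and t + window, spaced by frequency."""
    dates = []
    current = t + frequency
    while current < t + window:
        dates.append(current)
        current += frequency
    return dates


dates = interpolated_dates_sketch(
    datetime(2024, 1, 1, 0), timedelta(hours=6), timedelta(hours=1)
)
```

With a 6-hour window and an hourly frequency, this yields the five hourly dates between the two input states.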

patch_data_request(request: dict, dataset_name: str) dict

Set sensible defaults when this runner is used with the retrieve command.

execute() None

Execute the temporal downscaler runner with support for multiple temporal downscaling periods.

temporal_downscaler_stepper(start_date: datetime) Generator[tuple[timedelta, datetime], None, None]

Generate step and date variables for the temporal downscaling loop.

Parameters:

start_date (datetime.datetime) – Input start date

Returns:

  • step (datetime.timedelta) – Time delta between the target index date and the start date

  • date (datetime.datetime) – Date of the zeroth index of the input tensor

forecast(lead_time: timedelta, input_tensors_numpy: dict[str, ndarray[tuple[Any, ...], dtype[Any]]], input_states: dict[str, dict[str, Any]]) Generator[dict[str, dict[str, Any]], None, None]

Temporally downscale between the current and future state in the input tensor.

Parameters:
  • lead_time (datetime.timedelta) – Unused. This method processes one temporal downscaling window at a time.

  • input_tensors_numpy (dict[str, FloatArray]) – The input tensors for each dataset, as numpy arrays with shape (multi_step_input, variables, values).

  • input_states (dict[str, State]) – The input states for each dataset, containing both input dates defined by explicit_times.input in the config.

Returns:

The temporally downscaled states for each dataset.

Return type:

Generator[dict[str, State], None, None]

testing

class anemoi.inference.runners.testing.NoModelMixing

Bases: object

property model: Module

class anemoi.inference.runners.testing.NoModelRunner(config: RunConfiguration, *, classes: RunnerClasses | None = None)

Bases: NoModelMixing, DefaultRunner

Runner for running tests.

Inherits from DefaultRunner.