outputs

grib

class anemoi.inference.outputs.grib.HindcastOutput(reference_year: int)

Bases: object

Hindcast output class.

class anemoi.inference.outputs.grib.Matching(config)

Bases: object

patch(variable, request)

class anemoi.inference.outputs.grib.PatchOutput(*patches)

Bases: object

anemoi.inference.outputs.grib.modifier_factory(modifiers: list) → list

Create a list of modifier instances.

Parameters:

modifiers (list) – A list of modifier configurations.

Returns:

A list of modifier instances.

Return type:

list

class anemoi.inference.outputs.grib.BaseGribOutput(context: dict, metadata: Metadata, *, post_processors: list[str | dict[str, Any]] | None = None, encoding: dict[str, Any] | None = None, templates: list[str] | str | None = None, grib1_keys: dict[str, Any] | None = None, grib2_keys: dict[str, Any] | None = None, modifiers: list[str] | None = None, variables: list[str] | None = None, output_frequency: int | None = None, write_initial_state: bool | None = None, negative_step_mode: Literal['error', 'write', 'skip'] = 'error')

Bases: Output

Base class for GRIB outputs. Subclasses implement write_message.

write_initial_state(state: dict[str, Any]) → None

Write the initial step of the state.

Parameters:

state (State) – The state object.

write_step(state: dict[str, Any]) → None

Write a step of the state.

Parameters:

state (State) – The state object.

abstractmethod write_message(message: ndarray[tuple[Any, ...], dtype[Any]], *args: Any, **kwargs: Any) → None

Write a message to the GRIB file.

Parameters:
  • message (FloatArray) – The message array.

  • *args (Any) – Additional arguments.

  • **kwargs (Any) – Additional keyword arguments.
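The abstract write_message contract can be illustrated with a minimal sketch. It uses a stand-in base class rather than the real BaseGribOutput, whose constructor needs a context and metadata. The names SketchGribOutput and ListGribOutput are hypothetical, and the assumption that each entry of state["fields"] becomes one message is made for illustration only.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchGribOutput(ABC):
    """Hypothetical stand-in for BaseGribOutput; not the real class."""

    def write_step(self, state: dict[str, Any]) -> None:
        # Assumption: each field in the state becomes one message.
        for name, values in state["fields"].items():
            self.write_message(values, name=name)

    @abstractmethod
    def write_message(self, message: Any, *args: Any, **kwargs: Any) -> None:
        """Subclasses decide where the encoded message goes."""


class ListGribOutput(SketchGribOutput):
    """Hypothetical subclass that records messages in a list."""

    def __init__(self) -> None:
        self.written: list[tuple[str, Any]] = []

    def write_message(self, message: Any, *args: Any, **kwargs: Any) -> None:
        self.written.append((kwargs["name"], message))


output = ListGribOutput()
output.write_step({"fields": {"2t": [280.0, 281.5], "msl": [101325.0]}})
print([name for name, _ in output.written])
```

Because write_message is abstract, the base class cannot be instantiated directly. Only subclasses that provide it, as GribIoOutput does, can be used as outputs.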

template(state: dict[str, Any], name: str) → object

Get the template for a variable.

Parameters:
  • state (State) – The state object.

  • name (str) – The variable name.

Returns:

The template object.

Return type:

object

template_lookup(name: str) → dict

Look up the template for a variable.

Parameters:

name (str) – The variable name.

Returns:

The template dictionary.

Return type:

dict

gribfile

class anemoi.inference.outputs.gribfile.ArchiveCollector

Bases: object

Collects archive requests.

UNIQUE = {'date', 'expver', 'hdate', 'referenceDate', 'stream', 'time', 'type'}

add(field: dict[str, Any]) → None

Add a field to the archive request.

Parameters:

field (Dict[str, Any]) – The field dictionary.

property request: dict[str, Any]

Get the archive request.
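The following sketch shows one way a collector of this shape could work. It is a hypothetical re-implementation, not the library's code: it assumes that every key accumulates the set of values seen so far, and that keys listed in UNIQUE may hold only a single value across all added fields. The class name SketchArchiveCollector and the sample field keys are illustrative.

```python
from collections import defaultdict
from typing import Any


class SketchArchiveCollector:
    """Hypothetical re-implementation for illustration only."""

    UNIQUE = {"date", "expver", "hdate", "referenceDate", "stream", "time", "type"}

    def __init__(self) -> None:
        self._values: dict[str, set[str]] = defaultdict(set)

    def add(self, field: dict[str, Any]) -> None:
        for key, value in field.items():
            self._values[key].add(str(value))
            # Assumption: keys in UNIQUE may hold only one value per request.
            if key in self.UNIQUE and len(self._values[key]) > 1:
                raise ValueError(f"Key {key!r} has more than one value")

    @property
    def request(self) -> dict[str, list[str]]:
        # Sort the values so the request is deterministic.
        return {key: sorted(values) for key, values in self._values.items()}


collector = SketchArchiveCollector()
collector.add({"date": 20240101, "param": "2t", "step": 6})
collector.add({"date": 20240101, "param": "msl", "step": 6})
print(collector.request)
```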

class anemoi.inference.outputs.gribfile.GribIoOutput(context: Context, metadata: Metadata, *, out: Path | IOBase, post_processors: list[str | dict[str, Any]] | None = None, encoding: dict[str, Any] | None = None, archive_requests: dict[str, Any] | None = None, check_encoding: bool = True, templates: list[str] | str | None = None, grib1_keys: dict[str, Any] | None = None, grib2_keys: dict[str, Any] | None = None, modifiers: list[str] | None = None, variables: list[str] | None = None, output_frequency: int | None = None, write_initial_state: bool | None = None, split_output: bool = True, negative_step_mode: Literal['error', 'write', 'skip'] = 'error', missing_value: int | float = -9999)

Bases: BaseGribOutput

Output class for GRIB I/O.

This class writes GRIB messages and collects archive requests. It extends the BaseGribOutput class and implements the write_message method.

write_message(message: ndarray[tuple[Any, ...], dtype[Any]], template: Field, **keys: dict[str, Any]) → None

Write a message to the GRIB file.

Parameters:
  • message (FloatArray) – The message array.

  • template (ekd.Field) – An ekd.Field used as a template for GRIB encoding.

  • **keys (Dict[str, Any]) – Additional keys for the message.

collect_archive_requests(written: tuple, template: object, **keys: Any) → None

Collect archive requests.

Parameters:
  • written (tuple) – The written tuple.

  • template (object) – The template object.

  • **keys (Any) – Additional keys for the archive requests.

close() → None

Close the GRIB file.

gribmemory

class anemoi.inference.outputs.gribmemory.GribMemoryOutput(context: Context, metadata: Metadata, *, out: IOBase, post_processors: list[str | dict[str, Any]] | None = None, encoding: dict[str, Any] | None = None, archive_requests: dict[str, Any] | None = None, check_encoding: bool = True, templates: list[str] | str | None = None, grib1_keys: dict[str, Any] | None = None, grib2_keys: dict[str, Any] | None = None, modifiers: list[str] | None = None, variables: list[str] | None = None, output_frequency: int | None = None, write_initial_state: bool | None = None)

Bases: GribIoOutput

Handles GRIB files in memory.

netcdf

none

class anemoi.inference.outputs.none.NoneOutput(context: Context, metadata: Metadata, *, variables: list[str] | None = None, post_processors: list[str | dict[str, Any]] | None = None, output_frequency: int | None = None, write_initial_state: bool | None = None)

Bases: Output

None output class.

write_step(state: dict[str, Any]) → None

Write a step of the state.

Parameters:

state (State) – The state dictionary.

parallel

class anemoi.inference.outputs.parallel.MessageType(value)

Bases: str, Enum

Types of messages sent from the main process to the writer processes. Used for logging and control flow in the writer loop.

TERMINATE = 'Terminate'
INITIAL_STATE = 'InitialState'
STATE = 'State'
OPEN = 'Open'

class anemoi.inference.outputs.parallel.ParallelOutput(context: Context, metadata: Metadata, *, output: Output | Any | None = None, num_writers: int = 1, **kwargs: Any)

Bases: Output

Wraps another Output and offloads write_state calls to one or more forked writer processes. The output is split along the field dimension and each chunk is sent to a different writer process via multiprocessing queues. Each writer process writes the initial state into its own file.

When writing a file output, a suffix ‘_{writer_id}’ is appended to the file name to avoid conflicts between writers.

Usage in YAML:

output:
  parallel:
    num_writers: 2
    output:
      grib:
        path: output.grib

This YAML configuration results in the following outputs being written:
  • output_w0.grib

  • output_w1.grib
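The splitting and file naming described above can be sketched as follows. This is a minimal illustration, not the library's implementation: the helper names split_fields and writer_path are hypothetical, the round-robin assignment of fields to writers is an assumption (the reference only says the output is split along the field dimension), and the '_w<id>' naming is inferred from the example file names.

```python
from pathlib import Path
from typing import Any


def split_fields(fields: dict[str, Any], num_writers: int) -> list[dict[str, Any]]:
    """Deal fields round-robin into num_writers chunks (assumed strategy)."""
    chunks: list[dict[str, Any]] = [{} for _ in range(num_writers)]
    for index, (name, values) in enumerate(fields.items()):
        chunks[index % num_writers][name] = values
    return chunks


def writer_path(path: str, writer_id: int) -> str:
    """Insert '_w<writer_id>' before the file extension."""
    p = Path(path)
    return str(p.with_name(f"{p.stem}_w{writer_id}{p.suffix}"))


fields = {"2t": [280.0], "msl": [101325.0], "10u": [3.2]}
for writer_id, chunk in enumerate(split_fields(fields, num_writers=2)):
    print(writer_path("output.grib", writer_id), sorted(chunk))
```

Each writer sees only its own chunk, so the per-writer files together hold the full set of fields.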

open(state: dict[str, Any]) → None

Spawn the writer processes during open() instead of __init__() to ensure they have access to the full context.

Pass the open() message to the writers so that they can call the output-appropriate open() method.

write_step(state: dict[str, Any]) → None

Write a step of the state.

Parameters:

state (State) – The state to write.

write_state(state: dict[str, Any], message=MessageType.STATE) → None

Write the state, dispatching to writer processes when enabled.

dispatch_state_to_writers(state: dict[str, Any], message=MessageType.STATE) → None

Send the state to each writer process via multiprocessing queues.

Takes an optional ‘message’ argument to indicate the type of message being sent, which is used for control flow in the writer loop.
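The control flow driven by the message type can be sketched as a small writer loop. To keep the example short and portable, it uses a thread and queue.Queue in place of the forked processes and multiprocessing queues that ParallelOutput uses. The function writer_loop and the log list are hypothetical. The enum is redefined locally with the same members as MessageType.

```python
import queue
import threading
from enum import Enum
from typing import Any


class MessageType(str, Enum):
    TERMINATE = "Terminate"
    INITIAL_STATE = "InitialState"
    STATE = "State"
    OPEN = "Open"


def writer_loop(inbox: queue.Queue, log: list[str]) -> None:
    """Consume (message_type, state) pairs until TERMINATE arrives."""
    while True:
        message, state = inbox.get()
        if message is MessageType.TERMINATE:
            log.append("closed")
            break
        # A real writer would call open(), write_initial_state() or
        # write_step() on the wrapped output here.
        log.append(f"{message.value}:{sorted(state['fields'])}")


inbox: queue.Queue = queue.Queue()
log: list[str] = []
worker = threading.Thread(target=writer_loop, args=(inbox, log))
worker.start()

state: dict[str, Any] = {"fields": {"2t": [280.0]}}
for message in (MessageType.OPEN, MessageType.INITIAL_STATE, MessageType.STATE):
    inbox.put((message, state))
inbox.put((MessageType.TERMINATE, None))
worker.join()
print(log)
```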

close() → None

Terminate writer processes, then close the wrapped output.

print_summary(depth: int = 0) → None

Print a summary of the output configuration.

Parameters:

depth (int, optional) – The indentation depth for the summary, by default 0.

write_initial_state(state: dict[str, Any]) → None

Write the initial state.

plot

anemoi.inference.outputs.plot.fix(lons: ndarray[tuple[Any, ...], dtype[Any]]) → ndarray[tuple[Any, ...], dtype[Any]]

Fix longitudes greater than 180 degrees.

Parameters:

lons (FloatArray) – Array of longitudes.

Returns:

Fixed array of longitudes.

Return type:

FloatArray
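The reference does not spell out how the longitudes are fixed. A common approach, assumed here, is to subtract 360 from every value above 180 so that a 0 to 360 grid maps onto the -180 to 180 range used by many plotting tools. The function name fix_sketch is hypothetical.

```python
import numpy as np


def fix_sketch(lons: np.ndarray) -> np.ndarray:
    """Shift longitudes above 180 degrees to their negative equivalents."""
    # Assumption: values above 180 are shifted down by a full turn.
    return np.where(lons > 180, lons - 360, lons)


lons = np.array([0.0, 90.0, 180.0, 270.0, 359.0])
print(fix_sketch(lons))
```

Values at or below 180 are returned unchanged, so 180 itself is left alone.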

printer

anemoi.inference.outputs.printer.print_state(state: dict[str, Any], print: Callable[[...], None] = print, max_lines: int = 4, variables: list[str] | Literal['all'] | None = None) → None

Print the state.

Parameters:
  • state (State) – The state dictionary.

  • print (function, optional) – The print function to use, by default print.

  • max_lines (int, optional) – The maximum number of lines to print, by default 4.

  • variables (list or 'all', optional) – The variables to print, given as a list of names or the literal 'all', by default None.

raw

tee

class anemoi.inference.outputs.tee.TeeOutput(context: Context, metadata: Metadata, *args, outputs: Sequence | None = None, **kwargs: Any)

Bases: Output

TeeOutput class to manage multiple outputs.

write_initial_state(state: dict[str, Any]) → None

Write the initial state to all outputs.

Parameters:

state (State) – The state dictionary.

write_state(state: dict[str, Any]) → None

Write the state to all outputs.

Parameters:

state (State) – The state dictionary.

write_step(state: dict[str, Any]) → None

Raise NotImplementedError as TeeOutput does not support write_step.

Parameters:

state (State) – The state dictionary.

open(state: dict[str, Any]) → None

Open all outputs.

Parameters:

state (State) – The state dictionary.

close() → None

Close all outputs.

print_summary(depth: int = 0) → None

Print the summary of all outputs.

Parameters:

depth (int, optional) – The depth of the summary.
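The fan-out behaviour of TeeOutput can be illustrated with a small stand-alone sketch. The classes ListOutput and SketchTee are hypothetical stand-ins that mirror only the method names documented above. The "step" key in the state is also an assumption made for the example.

```python
from typing import Any


class ListOutput:
    """Hypothetical output that records the calls it receives."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def open(self, state: dict[str, Any]) -> None:
        self.events.append("open")

    def write_state(self, state: dict[str, Any]) -> None:
        self.events.append(f"state:{state['step']}")

    def close(self) -> None:
        self.events.append("close")


class SketchTee:
    """Hypothetical tee that forwards every call to all wrapped outputs."""

    def __init__(self, outputs: list[ListOutput]) -> None:
        self.outputs = list(outputs)

    def open(self, state: dict[str, Any]) -> None:
        for output in self.outputs:
            output.open(state)

    def write_state(self, state: dict[str, Any]) -> None:
        for output in self.outputs:
            output.write_state(state)

    def close(self) -> None:
        for output in self.outputs:
            output.close()


first, second = ListOutput(), ListOutput()
tee = SketchTee([first, second])
tee.open({"step": 0})
tee.write_state({"step": 6})
tee.close()
print(first.events, second.events)
```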

truth

zarr

anemoi.inference.outputs.zarr.create_zarr_array(store: StoreLike, name: str, shape: tuple, dtype: str, dimensions: tuple[str, ...], chunks: tuple[int, ...] | str | bool, fill_value: float | None = None) → Any

Create a Zarr array with the given parameters.

Parses the Zarr version to handle differences in API between versions 2 and 3.
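As a rough sketch of version-dependent handling, the helpers below parse a version string such as the one exposed by zarr.__version__ and choose where dimension names would be recorded. Both helper names are hypothetical, and how create_zarr_array actually branches is not shown in this reference. The sketch assumes Zarr 3 takes dimension names as array metadata, while with Zarr 2 the xarray convention stores them in the _ARRAY_DIMENSIONS attribute.

```python
def zarr_major_version(version: str) -> int:
    """Return the major component of a version string such as '3.0.8'."""
    return int(version.split(".")[0])


def dimension_metadata(version: str, dimensions: tuple[str, ...]) -> dict:
    """Describe where dimension names would go for a given Zarr major version."""
    if zarr_major_version(version) >= 3:
        # Zarr 3 arrays accept dimension names as array metadata.
        return {"dimension_names": dimensions}
    # With Zarr 2, the xarray convention stores them in an array attribute.
    return {"attributes": {"_ARRAY_DIMENSIONS": list(dimensions)}}


print(dimension_metadata("2.18.3", ("time", "values")))
print(dimension_metadata("3.0.8", ("time", "values")))
```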