mellea.helpers.async_helpers
Async helper functions for managing concurrent model output thunks.
Provides send_to_queue, which feeds a backend response coroutine or async iterator
into an asyncio.Queue (including sentinel and error forwarding); wait_for_all_mots,
which gathers multiple ModelOutputThunk computations in a single asyncio.gather
call; and get_current_event_loop, a safe wrapper that returns None instead of
raising when no event loop is running. These utilities are used internally by backends
that operate in async contexts.
Functions
FUNC send_to_queue
send_to_queue(co: Coroutine[Any, Any, AsyncIterator | Any] | AsyncIterator, aqueue: asyncio.Queue) -> None
Processes the output of an async chat request by sending the output to an async queue.
Args:
co: A coroutine or async iterator producing the backend response.aqueue: The async queue to send results to. A sentinelNoneis appended on completion; an exception instance is appended on error.
FUNC wait_for_all_mots
wait_for_all_mots(mots: list[ModelOutputThunk]) -> None
Helper function to make waiting for multiple ModelOutputThunks to be computed easier.
All ModelOutputThunks must be from the same event loop. This should always be the case in sampling functions, session functions, and top-level mellea functions.
Args:
mots: List ofModelOutputThunkobjects to await concurrently.
FUNC get_current_event_loop
get_current_event_loop() -> None | asyncio.AbstractEventLoop
Get the current event loop without having to catch exceptions.
Returns:
- The running event loop, or
Noneif no loop is running.
Classes
CLASS ClientCache
A simple LRU cache.
Used to keep track of clients for backends where the client is tied to a specific event loop.
Args:
capacity: Maximum number of entries to hold before evicting the least recently used.
Attributes:
cache: Ordered dictionary storing cached key-value pairs in LRU order; always initialised empty at construction.
Methods:
FUNC current_size
current_size(self) -> int
Just return the size of the key set. This isn't necessarily safe.
Returns:
- Number of entries currently in the cache.
FUNC get
get(self, key: int) -> Any | None
Gets a value from the cache.
Args:
key: Integer cache key.
Returns:
- The cached value, or
Noneif the key is not present.
FUNC put
put(self, key: int, value: Any) -> None
Put a value into the cache.
Args:
key: Integer cache key.value: Value to store.