Streaming generation with per-chunk validation. Provides :func:stream_with_chunking, the core orchestration primitive that consumes a streaming :class:~mellea.core.base.ModelOutputThunk, applies a :class:~mellea.stdlib.chunking.ChunkingStrategy to produce semantic chunks, and runs :meth:~mellea.core.requirement.Requirement.stream_validate on each chunk in parallel. Higher-level streaming APIs build on this function. The orchestrator emits typed :class:StreamEvent objects that consumers can observe via :meth:StreamChunkingResult.events. Raw validated chunks remain available via :meth:StreamChunkingResult.astream.

Functions

FUNC stream_with_chunking

stream_with_chunking(action: Component[Any] | CBlock, backend: Backend, ctx: Context) -> StreamChunkingResult
Generate a streaming response with per-chunk validation.

Starts a backend generation with streaming enabled, consumes the :class:~mellea.core.base.ModelOutputThunk’s async stream in a single background task, splits the accumulated text using chunking, and runs :meth:~mellea.core.requirement.Requirement.stream_validate on each new chunk in parallel across all requirements.

For each new complete chunk produced by the chunking strategy, stream_validate is called once per active requirement (in parallel via :func:asyncio.gather), receiving that single chunk. Multiple chunks produced from one astream() iteration are validated sequentially in order, so early exit on a "fail" result prevents later chunks in the same batch from being validated or emitted to the consumer.

If any requirement returns "fail", the generation is cancelled immediately (via :meth:~mellea.core.base.ModelOutputThunk.cancel_generation) and :attr:StreamChunkingResult.completed is set to False. The failing chunk is not emitted to the consumer; use :attr:StreamChunkingResult.streaming_failures to inspect what failed.

When the stream ends naturally, any trailing fragment withheld by the chunking strategy (see :meth:~mellea.stdlib.chunking.ChunkingStrategy.flush) is released as a final chunk and run through stream_validate on the same terms as the regular chunks. On early exit, the trailing fragment is discarded because the generation was cancelled mid-token.

After the stream ends naturally, validate() is called on every requirement that did not return "fail"; both "pass" and "unknown" trigger final validation. On early exit, no validate() call is made; :attr:StreamChunkingResult.final_validations remains empty.

Requirements are cloned (copy(req)) before backend generation begins, so the originals are never mutated and a raising __copy__ cannot leak an in-flight backend task.

The orchestrator emits typed :class:StreamEvent objects throughout execution. Consume them via :meth:StreamChunkingResult.events in parallel with or instead of :meth:StreamChunkingResult.astream.

Requirements that need context beyond the current chunk should accumulate it themselves across stream_validate calls (e.g. self._seen = self._seen + chunk). They must not read mot.astream() directly, because this orchestrator is the single consumer of the MOT stream.

Args:
  • action: The component or content block to generate from.
  • backend: The backend used for generation and final validation.
  • ctx: The generation context.
  • requirements: Sequence of requirements to validate against each chunk during streaming. None disables streaming validation (chunks are still produced; validate() is not called at stream end).
  • chunking: Chunking strategy — either a :class:~mellea.stdlib.chunking.ChunkingStrategy instance or one of the string aliases "sentence" (default), "word", or "paragraph".
  • validation_backend: Optional alternate backend for both stream_validate and final validate calls. When None, backend is used for validation.
Returns:
  • A result object providing :meth:~StreamChunkingResult.astream for incremental chunk consumption, :meth:~StreamChunkingResult.events for typed streaming events, and :meth:~StreamChunkingResult.acomplete for blocking until done.
Raises:
  • ValueError: If chunking is a string that does not match any known alias ("sentence", "word", "paragraph").
  • RuntimeError: If the backend returns an already-computed :class:~mellea.core.base.ModelOutputThunk instead of a streaming one. This indicates the backend is not honouring ModelOption.STREAM.
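The control flow described above can be illustrated without the library. The following is a minimal sketch, not mellea's implementation: `split_sentences`, `BannedWordCheck`, `RunningLengthCheck`, `chunk_passes`, and `run_stream` are hypothetical stand-ins, the "stream" is a plain list of text pieces, and each check returns a bare string rather than a PartialValidationResult. It shows parallel validation of each chunk, sequential processing of chunks in order, early exit on "fail", flushing of the withheld trailing fragment, and a stateful check that accumulates its own context.

```python
import asyncio
import copy
import re
from dataclasses import dataclass


def split_sentences(buffer: str) -> tuple[list[str], str]:
    """Split into complete sentences plus a withheld trailing fragment."""
    parts = re.split(r"(?<=[.!?])\s+", buffer)
    # The last part may be an unfinished sentence, so it is held back.
    return parts[:-1], parts[-1]


@dataclass
class BannedWordCheck:
    """Hypothetical stateless check: fails any chunk containing `word`."""
    word: str

    async def stream_validate(self, chunk: str) -> str:
        return "fail" if self.word in chunk.lower() else "pass"


@dataclass
class RunningLengthCheck:
    """Hypothetical stateful check that accumulates its own context."""
    limit: int
    seen: str = ""

    async def stream_validate(self, chunk: str) -> str:
        # Context is built up across calls; the raw stream is never read here.
        self.seen = self.seen + chunk
        return "fail" if len(self.seen) > self.limit else "unknown"


async def chunk_passes(chunk: str, checks: list) -> bool:
    """Run every check on one chunk in parallel; True if none failed."""
    if not checks:
        return True
    results = await asyncio.gather(*(c.stream_validate(chunk) for c in checks))
    return "fail" not in results


async def run_stream(pieces: list[str], requirements: list) -> tuple[list[str], bool]:
    """Return (emitted chunks, completed flag) for a simulated stream."""
    checks = [copy.copy(req) for req in requirements]  # originals stay untouched
    emitted: list[str] = []
    buffer = ""
    for piece in pieces:
        await asyncio.sleep(0)  # stand-in for awaiting the next streamed piece
        buffer += piece
        chunks, buffer = split_sentences(buffer)
        for chunk in chunks:  # validated one at a time, in order
            if not await chunk_passes(chunk, checks):
                return emitted, False  # early exit: fragment in buffer is dropped
            emitted.append(chunk)
    if buffer:  # natural end: release the withheld fragment as a final chunk
        if not await chunk_passes(buffer, checks):
            return emitted, False
        emitted.append(buffer)
    return emitted, True


if __name__ == "__main__":
    pieces = ["Hello wor", "ld. This is ", "fine. Now a secret", " appears. Tail"]
    print(asyncio.run(run_stream(pieces, [BannedWordCheck("secret")])))
```

In this sketch the failing chunk and everything after it never reach the `emitted` list, which mirrors the documented rule that a failing chunk is not delivered to the consumer.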

Classes

CLASS StreamEvent

Base class for all streaming events emitted by :func:stream_with_chunking. The timestamp field is auto-populated at instantiation time; callers do not set it. Because timestamp has init=False it is never part of __init__, so subclasses may declare additional fields in any order without conflict. Any new init=False fields on subclasses must also use field(..., init=False). Attributes:
  • timestamp: Unix timestamp (seconds) at the moment the event was created.
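The field-ordering remark above follows from how Python dataclasses build `__init__`. The sketch below uses hypothetical classes (`BaseEvent`, `TextEvent`), not the library's own definitions, and assumes the base event is a standard dataclass. Because the defaulted `timestamp` field is excluded from `__init__`, a subclass can add a required field after it without triggering the usual "non-default argument follows default argument" error.

```python
import time
from dataclasses import dataclass, field


@dataclass
class BaseEvent:
    """Hypothetical base event: timestamp is filled in automatically."""
    # init=False keeps timestamp out of __init__, so callers cannot pass it.
    timestamp: float = field(default_factory=time.time, init=False)


@dataclass
class TextEvent(BaseEvent):
    """Hypothetical subclass mixing required, defaulted, and init=False fields."""
    text: str        # required field after a defaulted base field: allowed
    index: int = 0
    # A further non-init field on the subclass also goes through field(...).
    sequence: int = field(default=0, init=False)


if __name__ == "__main__":
    event = TextEvent("hello", index=2)
    print(event.text, event.index, event.timestamp > 0)
```

Passing `timestamp=` to the constructor raises `TypeError`, since the parameter does not exist in the generated `__init__`.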

CLASS ChunkEvent

Emitted after each validated chunk is delivered to the consumer. Fired after all active requirements’ stream_validate calls return non-"fail" for this chunk and the chunk has been placed on the consumer queue. Args:
  • text: The chunk text that was validated and emitted.
  • chunk_index: Zero-based position of this chunk in the stream.
  • attempt: Sampling attempt number (always 1 in v1).

CLASS QuickCheckEvent

Emitted after each per-chunk streaming validation batch. One event per chunk, covering all active requirements in parallel. Not emitted when there are no requirements. Args:
  • chunk_index: Zero-based position of the chunk that was validated.
  • attempt: Sampling attempt number (always 1 in v1).
  • passed: True if all active requirements returned non-"fail" for this chunk.
  • results: :class:~mellea.core.requirement.PartialValidationResult from each active requirement, in the same order as the active slice of requirements.

CLASS StreamingDoneEvent

Emitted after all chunks have been validated and delivered to the consumer. Fired after the regular token stream and any trailing fragment released by :meth:~mellea.stdlib.chunking.ChunkingStrategy.flush have both been processed. Only emitted on natural completion — not on early exit (a requirement returned "fail") or on exception. Args:
  • attempt: Sampling attempt number (always 1 in v1).
  • full_text: Complete accumulated text at stream end.

CLASS FullValidationEvent

Emitted after the final :meth:~mellea.core.requirement.Requirement.validate calls complete. Only emitted when at least one requirement did not fail during streaming and the stream completed naturally. Not emitted on early exit. Args:
  • attempt: Sampling attempt number (always 1 in v1).
  • passed: True if all final :class:~mellea.core.requirement.ValidationResult objects passed.
  • results: :class:~mellea.core.requirement.ValidationResult from each non-failed requirement, in requirement order.

CLASS RetryEvent

Reserved for future use. Defined for API completeness — RetryEvent is not emitted by the v1 orchestrator because v1 retry is caller-driven re-invocation of :func:stream_with_chunking. When orchestrator-side retry is added, this event will fire before each re-attempt. Args:
  • attempt: Attempt number being started (1-based).
  • reason: Human-readable reason for the retry.

CLASS CompletedEvent

Emitted when the orchestrator exits, including early-exit cases. Always the last event before :meth:StreamChunkingResult.events terminates. success reflects :attr:StreamChunkingResult.completed. Args:
  • success: True if the stream completed normally (no "fail" result and no unhandled exception); False otherwise.
  • full_text: Complete accumulated text. On early exit or exception, reflects whatever was accumulated before cancellation.
  • attempts_used: Number of orchestrator invocations (always 1 in v1).

CLASS ErrorEvent

Emitted when an unhandled exception occurs in the orchestrator. Args:
  • exception_type: Python class name of the exception (e.g. "ValueError").
  • detail: String representation of the exception. If cancel_generation() also raised during cleanup, the cleanup error is appended.

CLASS StreamChunkingResult

Result of a :func:stream_with_chunking operation. Provides async iteration over validated text chunks as they complete (:meth:astream), typed :class:StreamEvent objects via :meth:events, a blocking :meth:acomplete for awaiting the full result including final validation, and :attr:as_thunk for wrapping the output as a :class:~mellea.core.base.ModelOutputThunk. Instances are created by :func:stream_with_chunking; do not instantiate directly. Args:
  • mot: The :class:~mellea.core.base.ModelOutputThunk from the backend generation call.
  • ctx: The generation context returned alongside the MOT.
Attributes:
  • completed: False if the stream exited early because a requirement returned "fail" during streaming; True otherwise.
  • full_text: The generated text available after streaming completes. On natural completion, the full accumulated text. On early exit (a requirement returned "fail"), only the validated and emitted portion — i.e. what consumers received via :meth:astream. Available after :meth:acomplete returns.
  • final_validations: :class:~mellea.core.requirement.ValidationResult objects from the final :meth:~mellea.core.requirement.Requirement.validate calls on all non-failed requirements. Available after :meth:acomplete returns.
  • streaming_failures: (Requirement, PartialValidationResult) pairs for every requirement that returned "fail" during streaming.
Methods:

FUNC astream

astream(self) -> AsyncIterator[str]
Yield validated text chunks as they complete. Each yielded string is a chunk that has passed per-chunk streaming validation (or the stream had no requirements). Iteration ends when all chunks have been yielded, whether the stream completed normally or was cancelled early on a "fail" result.

Single-consumer. Chunks are delivered via an :class:asyncio.Queue that this method drains; calling astream() a second time on the same result blocks indefinitely because the queue is empty and the terminating None sentinel has already been consumed. If you need the chunks after iteration, capture them into a list during the first pass or use :attr:full_text after :meth:acomplete.

Raises:
  • Exception: Propagates any error from the background orchestration task.
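The single-consumer behaviour can be reproduced with a plain asyncio.Queue. This is a sketch under the assumption that delivery works as described above (a queue terminated by a None sentinel); `drain`, `first_item`, and `queue_demo` are hypothetical helpers, not part of the library. The first pass captures the chunks into a list, and a second pass is bounded with a timeout to show that it would otherwise wait forever.

```python
import asyncio
from collections.abc import AsyncIterator


async def drain(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Yield items until the None sentinel is consumed."""
    while True:
        item = await queue.get()
        if item is None:
            return
        yield item


async def first_item(queue: asyncio.Queue):
    """Try to read one item from a fresh drain of the same queue."""
    async for item in drain(queue):
        return item
    return None


async def queue_demo() -> tuple[list[str], bool]:
    queue: asyncio.Queue = asyncio.Queue()
    for chunk in ("First sentence.", "Second sentence."):
        queue.put_nowait(chunk)
    queue.put_nowait(None)  # sentinel marking the end of the stream

    # First pass: capture the chunks while iterating.
    captured = [chunk async for chunk in drain(queue)]

    # Second pass: queue is empty and the sentinel is gone, so this would block.
    try:
        await asyncio.wait_for(first_item(queue), timeout=0.05)
        second_pass_blocked = False
    except asyncio.TimeoutError:
        second_pass_blocked = True
    return captured, second_pass_blocked


if __name__ == "__main__":
    print(asyncio.run(queue_demo()))
```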

FUNC events

events(self) -> AsyncIterator[StreamEvent]
Yield typed streaming events as they are emitted by the orchestrator. Each yielded object is a :class:StreamEvent subclass describing a point in the orchestration lifecycle. Consumers can dispatch on type:

```python
async for event in result.events():
    match event:
        case ChunkEvent():
            print(f"chunk {event.chunk_index}: {event.text!r}")
        case QuickCheckEvent(passed=False):
            print(f"chunk {event.chunk_index} failed validation")
        case CompletedEvent():
            print(f"done - success={event.success}")
```

Typical event order (natural completion with requirements):
  1. :class:QuickCheckEvent / :class:ChunkEvent pairs, one per chunk (validation fires first; the chunk is released to the consumer only after passing). Includes any trailing fragment released by the chunking strategy’s flush() method.
  2. :class:StreamingDoneEvent — all chunks (including flush) delivered.
  3. :class:FullValidationEvent — final validate() calls returned.
  4. :class:CompletedEvent — orchestrator is exiting.
On early exit: :class:QuickCheckEvent (passed=False) is the last validation event, followed by :class:CompletedEvent. No :class:StreamingDoneEvent or :class:FullValidationEvent is emitted.

On exception: :class:ErrorEvent followed by :class:CompletedEvent.

Single-consumer. Events are delivered via a queue that this method drains; calling events() a second time raises :exc:RuntimeError.

Raises:
  • RuntimeError: If called more than once on the same result.

FUNC acomplete

acomplete(self) -> None
Await full completion, including final validation. After this method returns, :attr:full_text, :attr:completed, :attr:final_validations, and :attr:streaming_failures are all populated. If :meth:astream has already been consumed to exhaustion, this call is effectively a no-op. Raises:
  • Exception: Propagates the orchestrator exception if :meth:astream has not yet consumed it (raise-once — only one of astream or acomplete raises, whichever drains the failure marker first).
  • asyncio.CancelledError: If the orchestration task was externally cancelled (e.g. via :func:asyncio.wait_for timeout).

FUNC as_thunk

as_thunk(self) -> ModelOutputThunk[str]
Wrap the output as a computed :class:~mellea.core.base.ModelOutputThunk. Returns a new thunk with value set to :attr:full_text and generation metadata copied from the original MOT. Safe to call on early-exit results; value reflects the validated and emitted portion (same as :attr:full_text — see its docstring). Returns:
  • ModelOutputThunk[str]: A computed thunk containing the streamed output.
Raises:
  • RuntimeError: If called before :meth:acomplete has returned.