Streaming generation with per-chunk validation. Provides :func:stream_with_chunking, the core orchestration primitive that
consumes a streaming :class:~mellea.core.base.ModelOutputThunk, applies a
:class:~mellea.stdlib.chunking.ChunkingStrategy to produce semantic chunks,
and runs :meth:~mellea.core.requirement.Requirement.stream_validate on each
chunk, checking all requirements in parallel. Higher-level streaming APIs build on this function.
The orchestrator emits typed :class:StreamEvent objects that consumers can
observe via :meth:StreamChunkingResult.events. Raw validated chunks remain
available via :meth:StreamChunkingResult.astream.
Functions
FUNC stream_with_chunking
Consumes a :class:~mellea.core.base.ModelOutputThunk’s async stream in a single
background task, splits the accumulated text using chunking, and runs
:meth:~mellea.core.requirement.Requirement.stream_validate on each new
chunk in parallel across all requirements.
For each new complete chunk produced by the chunking strategy,
stream_validate is called once per active requirement (in parallel
via :func:asyncio.gather), receiving that single chunk. Multiple
chunks produced from one astream() iteration are validated
sequentially in order, so early exit on a "fail" result prevents
later chunks in the same batch from being validated or emitted to the
consumer.
If any requirement returns "fail", the generation is cancelled
immediately (via
:meth:~mellea.core.base.ModelOutputThunk.cancel_generation) and
:attr:StreamChunkingResult.completed is set to False. The
failing chunk is not emitted to the consumer; use
:attr:StreamChunkingResult.streaming_failures to inspect what failed.
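The per-chunk loop described above can be sketched in plain asyncio. This is a toy model under stated assumptions, not Mellea's implementation: PartialResult, MaxWordsReq, ToyOutcome and validate_batch are hypothetical names, validation results are reduced to a status string, and the real orchestrator's call to cancel_generation() on failure is omitted.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class PartialResult:
    # Hypothetical stand-in for PartialValidationResult: "pass", "fail" or "unknown".
    status: str


class MaxWordsReq:
    """Hypothetical requirement: fails any chunk longer than `limit` words."""

    def __init__(self, limit: int):
        self.limit = limit

    async def stream_validate(self, chunk: str) -> PartialResult:
        await asyncio.sleep(0)  # pretend this is an async model or I/O call
        return PartialResult("fail" if len(chunk.split()) > self.limit else "pass")


@dataclass
class ToyOutcome:
    emitted: list = field(default_factory=list)
    failures: list = field(default_factory=list)  # (requirement, result) pairs
    completed: bool = True


async def validate_batch(chunks, requirements, outcome: ToyOutcome) -> bool:
    """Validate one batch of new chunks; return False on early exit."""
    for chunk in chunks:  # chunks within a batch are handled sequentially, in order
        # Every requirement sees this single chunk concurrently.
        results = await asyncio.gather(*(r.stream_validate(chunk) for r in requirements))
        failed = [(r, res) for r, res in zip(requirements, results) if res.status == "fail"]
        if failed:
            outcome.failures.extend(failed)
            outcome.completed = False
            # The real orchestrator would cancel generation here (omitted in this toy).
            return False  # the failing chunk and later chunks are never emitted
        outcome.emitted.append(chunk)
    return True
```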
When the stream ends naturally, any trailing fragment withheld by the
chunking strategy (see :meth:~mellea.stdlib.chunking.ChunkingStrategy.flush)
is released as a final chunk and run through stream_validate on the
same terms as the regular chunks. On early exit, the trailing fragment
is discarded because the generation was cancelled mid-token.
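To illustrate the withhold-then-flush behaviour, here is a hypothetical sentence chunker. Only the idea of a flush step comes from the text above; the ToySentenceChunker class, its split/flush signatures and the regex boundary rule are assumptions made for this sketch.

```python
import re


class ToySentenceChunker:
    """Hypothetical chunker: emits complete sentences, withholds the trailing fragment."""

    # Split on whitespace that follows sentence-ending punctuation.
    _BOUNDARY = re.compile(r"(?<=[.!?])\s+")

    def __init__(self) -> None:
        self._emitted = 0  # number of complete sentences already released

    def split(self, accumulated: str) -> list[str]:
        parts = self._BOUNDARY.split(accumulated)
        complete = parts[:-1]  # the last part may still be growing, so hold it back
        new = complete[self._emitted:]
        self._emitted = len(complete)
        return new

    def flush(self, accumulated: str) -> list[str]:
        # Only meaningful when the stream ends naturally: release what was withheld.
        parts = self._BOUNDARY.split(accumulated)
        return [p for p in parts[self._emitted:] if p.strip()]
```

Feeding the chunker the accumulated text after each token batch yields only finished sentences; the final "Fine" fragment appears only via flush().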
After the stream ends naturally, validate() is called on every
requirement that did not return "fail" — both "pass" and
"unknown" trigger final validation. On early exit, no validate()
call is made; :attr:StreamChunkingResult.final_validations remains
empty. Requirements are cloned (copy(req)) before backend generation
begins, so the originals are never mutated and a raising __copy__
cannot leak an in-flight backend task.
The orchestrator emits typed :class:StreamEvent objects throughout
execution. Consume them via :meth:StreamChunkingResult.events in
parallel with or instead of :meth:StreamChunkingResult.astream.
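Reading both streams at once only requires one consumer per iterator. The sketch below uses two hypothetical async generators, fake_astream and fake_events, in place of result.astream() and result.events(), and drives them side by side with asyncio.gather.

```python
import asyncio


# Hypothetical stand-ins for result.astream() and result.events();
# in real code both would come from the same StreamChunkingResult.
async def fake_astream():
    for chunk in ["First sentence.", "Second sentence."]:
        await asyncio.sleep(0)
        yield chunk


async def fake_events():
    for name in ["ChunkEvent", "ChunkEvent", "CompletedEvent"]:
        await asyncio.sleep(0)
        yield name


async def consume_both():
    chunks, events = [], []

    async def read_chunks():
        async for chunk in fake_astream():
            chunks.append(chunk)

    async def read_events():
        async for event in fake_events():
            events.append(event)

    # Each iterator has exactly one consumer; gather runs the two readers concurrently.
    await asyncio.gather(read_chunks(), read_events())
    return chunks, events
```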
Requirements that need context beyond the current chunk should
accumulate it themselves across stream_validate calls (e.g.
self._seen = self._seen + chunk). They must not read mot.astream()
directly — this orchestrator is the single consumer of the MOT stream.
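The accumulation pattern interacts with the copy(req) cloning mentioned earlier. Assuming the clone is a shallow copy.copy(), rebinding self._seen to a new string leaves the original requirement untouched, whereas appending to a shared list in place would also mutate the original. MaxTotalLength is a hypothetical requirement, and its stream_validate returns a bare "pass"/"fail" string rather than a PartialValidationResult.

```python
import asyncio
import copy


class MaxTotalLength:
    """Hypothetical requirement that needs context beyond the current chunk."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self._seen = ""  # text observed so far, accumulated across calls

    async def stream_validate(self, chunk: str) -> str:
        # Rebind instead of mutating in place: after a shallow copy, the clone and
        # the original share attribute objects until one of them is reassigned.
        self._seen = self._seen + chunk
        return "fail" if len(self._seen) > self.limit else "pass"


async def demo():
    original = MaxTotalLength(limit=12)
    clone = copy.copy(original)  # mirrors the copy(req) described above
    verdicts = [await clone.stream_validate(c) for c in ["Hello. ", "World. "]]
    return original, clone, verdicts
```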
Args:
action: The component or content block to generate from.
backend: The backend used for generation and final validation.
ctx: The generation context.
requirements: Sequence of requirements to validate against each chunk during streaming. None disables streaming validation (chunks are still produced; validate() is not called at stream end).
chunking: Chunking strategy, either a :class:~mellea.stdlib.chunking.ChunkingStrategy instance or one of the string aliases "sentence" (default), "word", or "paragraph".
validation_backend: Optional alternate backend for both stream_validate and final validate calls. When None, backend is used for validation.
Returns:
- StreamChunkingResult: A result object providing :meth:~StreamChunkingResult.astream for incremental chunk consumption, :meth:~StreamChunkingResult.events for typed streaming events, and :meth:~StreamChunkingResult.acomplete for blocking until done.
Raises:
ValueError: If chunking is a string that does not match any known alias ("sentence", "word", "paragraph").
RuntimeError: If the backend returns an already-computed :class:~mellea.core.base.ModelOutputThunk instead of a streaming one. This indicates the backend is not honouring ModelOption.STREAM.
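The chunking argument accepts either a strategy instance or a string alias. A minimal sketch of that resolution step follows; the stand-in classes (ToyChunkingStrategy, SentenceChunks, WordChunks, ParagraphChunks) and the resolve_chunking helper are hypothetical, since the real alias table and class names are not documented here.

```python
class ToyChunkingStrategy:
    """Hypothetical base class standing in for ChunkingStrategy."""


class SentenceChunks(ToyChunkingStrategy): ...
class WordChunks(ToyChunkingStrategy): ...
class ParagraphChunks(ToyChunkingStrategy): ...


# Hypothetical alias table; "sentence" is the default, as documented above.
_ALIASES = {"sentence": SentenceChunks, "word": WordChunks, "paragraph": ParagraphChunks}


def resolve_chunking(chunking="sentence"):
    """Return a strategy instance from either an instance or a string alias."""
    if isinstance(chunking, ToyChunkingStrategy):
        return chunking  # already a strategy: use it as-is
    factory = _ALIASES.get(chunking) if isinstance(chunking, str) else None
    if factory is None:
        raise ValueError(
            f"unknown chunking alias {chunking!r}; expected one of {sorted(_ALIASES)}"
        )
    return factory()
```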
Classes
CLASS StreamEvent
Base class for all streaming events emitted by :func:stream_with_chunking.
The timestamp field is auto-populated at instantiation time; callers
do not set it. Because timestamp has init=False it is never part
of __init__, so subclasses may declare additional fields in any order
without conflict. Any field a subclass wants excluded from __init__ must
likewise be declared with field(..., init=False).
Attributes:
timestamp: Unix timestamp (seconds) at the moment the event was created.
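The init=False rule can be checked with the standard dataclasses module. ToyStreamEvent and ToyChunkEvent are hypothetical mirrors of StreamEvent and ChunkEvent, and using time.time as the default factory is an assumption consistent with a Unix timestamp in seconds.

```python
import time
from dataclasses import dataclass, field, fields


@dataclass
class ToyStreamEvent:
    # Auto-populated at instantiation; init=False keeps it out of __init__.
    timestamp: float = field(default_factory=time.time, init=False)


@dataclass
class ToyChunkEvent(ToyStreamEvent):
    # Required (no-default) fields may follow the inherited defaulted timestamp
    # because timestamp is not an __init__ parameter.
    text: str
    chunk_index: int
    attempt: int = 1
```

Without init=False on timestamp, defining ToyChunkEvent would fail with "non-default argument 'text' follows default argument".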
CLASS ChunkEvent
Emitted after each validated chunk is delivered to the consumer.
Fired after all active requirements’ stream_validate calls return
non-"fail" for this chunk and the chunk has been placed on the
consumer queue.
Args:
text: The chunk text that was validated and emitted.
chunk_index: Zero-based position of this chunk in the stream.
attempt: Sampling attempt number (always 1 in v1).
CLASS QuickCheckEvent
Emitted after each per-chunk streaming validation batch.
One event per chunk, covering all active requirements in parallel.
Not emitted when there are no requirements.
Args:
chunk_index: Zero-based position of the chunk that was validated.
attempt: Sampling attempt number (always 1 in v1).
passed: True if all active requirements returned non-"fail" for this chunk.
results: :class:~mellea.core.requirement.PartialValidationResult from each active requirement, in the same order as the active slice of requirements.
CLASS StreamingDoneEvent
Emitted after all chunks have been validated and delivered to the consumer.
Fired after the regular token stream and any trailing fragment released by
:meth:~mellea.stdlib.chunking.ChunkingStrategy.flush have both been
processed. Only emitted on natural completion — not on early exit (a
requirement returned "fail") or on exception.
Args:
attempt: Sampling attempt number (always 1 in v1).
full_text: Complete accumulated text at stream end.
CLASS FullValidationEvent
Emitted after the final :meth:~mellea.core.requirement.Requirement.validate calls complete.
Only emitted when at least one requirement did not fail during streaming
and the stream completed naturally. Not emitted on early exit.
Args:
attempt: Sampling attempt number (always 1 in v1).
passed: True if all final :class:~mellea.core.requirement.ValidationResult objects passed.
results: :class:~mellea.core.requirement.ValidationResult from each non-failed requirement, in requirement order.
CLASS RetryEvent
Reserved for future use.
Defined for API completeness — RetryEvent is not emitted by the
v1 orchestrator because v1 retry is caller-driven re-invocation of
:func:stream_with_chunking. When orchestrator-side retry is added,
this event will fire before each re-attempt.
Args:
attempt: Attempt number being started (1-based).
reason: Human-readable reason for the retry.
CLASS CompletedEvent
Emitted when the orchestrator exits, including early-exit cases.
Always the last event before :meth:StreamChunkingResult.events
terminates. success reflects :attr:StreamChunkingResult.completed.
Args:
success: True if the stream completed normally (no "fail" result and no unhandled exception); False otherwise.
full_text: Complete accumulated text. On early exit or exception, reflects whatever was accumulated before cancellation.
attempts_used: Number of orchestrator invocations (always 1 in v1).
CLASS ErrorEvent
Emitted when an unhandled exception occurs in the orchestrator.
Args:
exception_type: Python class name of the exception (e.g. "ValueError").
detail: String representation of the exception. If cancel_generation() also raised during cleanup, the cleanup error is appended.
CLASS StreamChunkingResult
Result of a :func:stream_with_chunking operation.
Provides async iteration over validated text chunks as they complete
(:meth:astream), typed :class:StreamEvent objects via :meth:events,
a blocking :meth:acomplete for awaiting the full result including final
validation, and :attr:as_thunk for wrapping the output as a
:class:~mellea.core.base.ModelOutputThunk.
Instances are created by :func:stream_with_chunking; do not instantiate
directly.
Args:
mot: The :class:~mellea.core.base.ModelOutputThunk from the backend generation call.
ctx: The generation context returned alongside the MOT.
Attributes:
completed: False if the stream exited early because a requirement returned "fail" during streaming; True otherwise.
full_text: The generated text available after streaming completes. On natural completion, the full accumulated text. On early exit (a requirement returned "fail"), only the validated and emitted portion, i.e. what consumers received via :meth:astream. Available after :meth:acomplete returns.
final_validations: :class:~mellea.core.requirement.ValidationResult objects from the final :meth:~mellea.core.requirement.Requirement.validate calls on all non-failed requirements. Available after :meth:acomplete returns.
streaming_failures: (Requirement, PartialValidationResult) pairs for every requirement that returned "fail" during streaming.
FUNC astream
Yields validated text chunks as they complete, ending early if a requirement returns a "fail" result.
Single-consumer. Chunks are delivered via an
:class:asyncio.Queue that this method drains; calling
astream() a second time on the same result blocks indefinitely
because the queue is empty and the terminating None sentinel
has already been consumed. If you need the chunks after
iteration, capture them into a list during the first pass or use
:attr:full_text after :meth:acomplete.
Raises:
Exception: Propagates any error from the background orchestration task.
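The single-consumer behaviour follows from how an asyncio.Queue with a None sentinel drains. In this hypothetical ToyChunkStream, the first pass captures the chunks into a list, as recommended above, and the second pass is given a short timeout purely so the demo terminates instead of hanging.

```python
import asyncio


class ToyChunkStream:
    """Hypothetical single-consumer stream: chunks, then a None sentinel."""

    def __init__(self, chunks):
        self._queue = asyncio.Queue()
        for chunk in chunks:
            self._queue.put_nowait(chunk)
        self._queue.put_nowait(None)  # terminating sentinel, consumed exactly once

    async def astream(self):
        while (chunk := await self._queue.get()) is not None:
            yield chunk


async def demo():
    stream = ToyChunkStream(["A.", "B."])
    captured = [chunk async for chunk in stream.astream()]  # keep the first pass

    async def drain_again():
        return [chunk async for chunk in stream.astream()]

    try:
        # The queue is now empty and the sentinel is gone, so this would wait forever.
        await asyncio.wait_for(drain_again(), timeout=0.05)
        second_pass = "returned"
    except asyncio.TimeoutError:
        second_pass = "blocked"
    return captured, second_pass
```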
FUNC events
Yields typed :class:StreamEvent objects as the orchestrator emits them. Each event is a
:class:StreamEvent subclass describing a
point in the orchestration lifecycle. Consumers can dispatch on type:
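.. code-block:: python

    # ChunkEvent, QuickCheckEvent and CompletedEvent are the event classes
    # documented on this page; `result` is a StreamChunkingResult.
    async def report(result):
        async for event in result.events():
            match event:
                case ChunkEvent():
                    print(f"chunk {event.chunk_index}: {event.text!r}")
                case QuickCheckEvent(passed=False):
                    print(f"chunk {event.chunk_index} failed validation")
                case CompletedEvent():
                    print(f"done, success={event.success}")
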
Typical event order (natural completion with requirements):
- :class:QuickCheckEvent / :class:ChunkEvent pairs, one per chunk (validation fires first; the chunk is released to the consumer only after passing). Includes any trailing fragment released by the chunking strategy’s flush() method.
- :class:StreamingDoneEvent, once all chunks (including any flushed fragment) have been delivered.
- :class:FullValidationEvent, once the final validate() calls have returned.
- :class:CompletedEvent, as the orchestrator exits.
On early exit: :class:QuickCheckEvent (passed=False) is the
last validation event, followed by :class:CompletedEvent. No
:class:StreamingDoneEvent or :class:FullValidationEvent is emitted.
On exception: :class:ErrorEvent followed by :class:CompletedEvent.
Single-consumer. Events are delivered via a queue that this method
drains; calling events() a second time raises :exc:RuntimeError.
Raises:
RuntimeError: If called more than once on the same result.
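One way to enforce the call-once rule is a flag checked in a plain method that returns an inner async generator. The text does not say whether the real check fires at call time or on first iteration; the hypothetical ToyEventSource below assumes call time.

```python
import asyncio


class ToyEventSource:
    """Hypothetical: events() may be called once; a second call raises immediately."""

    def __init__(self, events):
        self._events = list(events)
        self._events_called = False

    def events(self):
        # A plain method (not an async generator) so the check runs at call time;
        # an `async def` body containing `yield` would only run on first iteration.
        if self._events_called:
            raise RuntimeError("events() can only be consumed once")
        self._events_called = True
        return self._iterate()

    async def _iterate(self):
        for event in self._events:
            await asyncio.sleep(0)
            yield event
```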
FUNC acomplete
Awaits the full result, including final validation. After it returns, :attr:full_text, :attr:completed,
:attr:final_validations, and :attr:streaming_failures are all
populated. If :meth:astream has already been consumed to
exhaustion, this call is effectively a no-op.
Raises:
Exception: Propagates the orchestrator exception if :meth:astream has not yet consumed it (raise-once: only one of astream or acomplete raises, whichever drains the failure marker first).
asyncio.CancelledError: If the orchestration task was externally cancelled (e.g. by an :func:asyncio.wait_for timeout).
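The raise-once contract can be modelled with a queue that holds chunks, an exception object acting as the failure marker, and the None sentinel. ToyResult is hypothetical and simplified: the real acomplete() also waits for the background task and final validation, which this sketch leaves out.

```python
import asyncio


class ToyResult:
    """Hypothetical model of the raise-once contract between astream and acomplete."""

    def __init__(self, items):
        # Chunks, then possibly an exception (the failure marker), then None,
        # in the order a background task might enqueue them.
        self._queue = asyncio.Queue()
        for item in items:
            self._queue.put_nowait(item)

    async def astream(self):
        while (item := await self._queue.get()) is not None:
            if isinstance(item, BaseException):
                raise item  # this consumer drained the marker, so it raises
            yield item

    async def acomplete(self):
        # Drain whatever astream() left behind; raise only if the marker is still queued.
        while not self._queue.empty():
            item = self._queue.get_nowait()
            if isinstance(item, BaseException):
                raise item
```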
FUNC as_thunk
Wraps the streamed output in a :class:~mellea.core.base.ModelOutputThunk.
Returns a new thunk with value set to :attr:full_text and
generation metadata copied from the original MOT. Safe to call on
early-exit results; value reflects the validated and emitted
portion (same as :attr:full_text — see its docstring).
Returns:
- ModelOutputThunk[str]: A computed thunk containing the streamed output.
Raises:
RuntimeError: If called before :meth:acomplete has returned.