> ## Documentation Index
> Fetch the complete documentation index at: https://docs.mellea.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# mellea.stdlib.streaming

> Streaming generation with per-chunk validation.

export const SidebarFix = () => <script dangerouslySetInnerHTML={{
  __html: `
        (function () {
          const INTERVAL_MS = 500;

          const upgradeSidebar = () => {
            const links = document.querySelectorAll('a[href^="#"]');

            links.forEach((link) => {
              if (link.dataset.badged === "true") return;

              const rawText = (link.textContent || "").trim();

              // ========== FUNC ==========
              if (rawText.startsWith("FUNC ")) {
                const label = rawText.replace(/^FUNC\\s+/, "").trim();

                while (link.firstChild) link.removeChild(link.firstChild);

                // 👉 Make the whole link a single flex row & prevent wrapping
                link.style.display = "flex";
                link.style.alignItems = "center";
                link.style.whiteSpace = "nowrap";
                link.style.columnGap = "0.5rem";

                const badge = document.createElement("span");
                badge.style.marginRight = "0.5rem";
                badge.style.display = "inline-flex";
                badge.style.alignItems = "center";
                badge.style.borderRadius = "9999px";
                badge.style.padding = "0rem 0.6rem";
                badge.style.fontSize = "0.5rem";
                badge.style.fontWeight = "700";
                badge.style.letterSpacing = "0.05em";
                badge.style.backgroundColor = "rgba(48, 100, 227, 0.20)";
                badge.style.color = "#1D4ED8";

                badge.textContent = "FUNC";

                link.appendChild(badge);
                link.appendChild(document.createTextNode(label));
                link.dataset.badged = "true";
                return;
              }

              // ========== CLASS ==========
              if (rawText.startsWith("CLASS ")) {
                const label = rawText.replace(/^CLASS\\s+/, "").trim();

                while (link.firstChild) link.removeChild(link.firstChild);

                // 👉 Same flex / nowrap treatment for class links
                link.style.display = "flex";
                link.style.alignItems = "center";
                link.style.whiteSpace = "nowrap";
                link.style.columnGap = "0.5rem";

                const badge = document.createElement("span");
                badge.style.marginRight = "0.5rem";
                badge.style.display = "inline-flex";
                badge.style.alignItems = "center";
                badge.style.borderRadius = "9999px";
                badge.style.padding = "0rem 0.6rem";
                badge.style.fontSize = "0.5rem";
                badge.style.fontWeight = "700";
                badge.style.letterSpacing = "0.05em";
                badge.style.backgroundColor = "rgba(74, 222, 128, 0.20)";
                badge.style.color = "#15803D";

                badge.textContent = "CLASS";

                link.appendChild(badge);
                link.appendChild(document.createTextNode(label));
                link.dataset.badged = "true";
                return;
              }
            });
          };

          upgradeSidebar();
          setInterval(upgradeSidebar, INTERVAL_MS);
        })();
      `
}} />;

<SidebarFix />

Streaming generation with per-chunk validation.

Provides :func:`stream_with_chunking`, the core orchestration primitive that
consumes a streaming :class:`~mellea.core.base.ModelOutputThunk`, applies a
:class:`~mellea.stdlib.chunking.ChunkingStrategy` to produce semantic chunks,
and runs :meth:`~mellea.core.requirement.Requirement.stream_validate` on each
chunk in parallel.  Higher-level streaming APIs build on this function.

The orchestrator emits typed :class:`StreamEvent` objects that consumers can
observe via :meth:`StreamChunkingResult.events`.  Raw validated chunks remain
available via :meth:`StreamChunkingResult.astream`.

## Functions

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#3064E3]/20 text-[#1D4ED8]">FUNC</span> `stream_with_chunking` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L705" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
stream_with_chunking(action: Component[Any] | CBlock, backend: Backend, ctx: Context) -> StreamChunkingResult
```

Generate a streaming response with per-chunk validation.

Starts a backend generation with streaming enabled, consumes the
:class:`~mellea.core.base.ModelOutputThunk`'s async stream in a single
background task, splits the accumulated text using *chunking*, and runs
:meth:`~mellea.core.requirement.Requirement.stream_validate` on each new
chunk in parallel across all requirements.

For each new complete chunk produced by the chunking strategy,
`stream_validate` is called once per active requirement (in parallel
via :func:`asyncio.gather`), receiving that single chunk.  Multiple
chunks produced from one `astream()` iteration are validated
sequentially in order, so early exit on a `"fail"` result prevents
later chunks in the same batch from being validated or emitted to the
consumer.

If any requirement returns `"fail"`, the generation is cancelled
immediately (via
:meth:`~mellea.core.base.ModelOutputThunk.cancel_generation`) and
:attr:`StreamChunkingResult.completed` is set to `False`.  The
failing chunk is not emitted to the consumer; use
:attr:`StreamChunkingResult.streaming_failures` to inspect what failed.

When the stream ends naturally, any trailing fragment withheld by the
chunking strategy (see :meth:`~mellea.stdlib.chunking.ChunkingStrategy.flush`)
is released as a final chunk and run through `stream_validate` on the
same terms as the regular chunks.  On early exit, the trailing fragment
is discarded because the generation was cancelled mid-token.

After the stream ends naturally, `validate()` is called on every
requirement that did not return `"fail"` — both `"pass"` and
`"unknown"` trigger final validation.  On early exit, no `validate()`
call is made; :attr:`StreamChunkingResult.final_validations` remains
empty.  Requirements are cloned (`copy(req)`) before backend generation
begins, so the originals are never mutated and a raising `__copy__`
cannot leak an in-flight backend task.

The orchestrator emits typed :class:`StreamEvent` objects throughout
execution.  Consume them via :meth:`StreamChunkingResult.events` in
parallel with or instead of :meth:`StreamChunkingResult.astream`.

Requirements that need context beyond the current chunk should
accumulate it themselves across `stream_validate` calls (e.g.
`self._seen = self._seen + chunk`).  They must not read `mot.astream()`
directly — this orchestrator is the single consumer of the MOT stream.

**Args:**

* `action`: The component or content block to generate from.
* `backend`: The backend used for generation and final validation.
* `ctx`: The generation context.
* `requirements`: Sequence of requirements to validate against each chunk
  during streaming.  `None` disables streaming validation (chunks
  are still produced; `validate()` is not called at stream end).
* `chunking`: Chunking strategy — either a :class:`~mellea.stdlib.chunking.ChunkingStrategy`
  instance or one of the string aliases `"sentence"` (default),
  `"word"`, or `"paragraph"`.
* `validation_backend`: Optional alternate backend for both
  `stream_validate` and final `validate` calls.  When `None`,
  *backend* is used for validation.

**Returns:**

* A result object providing :meth:`~StreamChunkingResult.astream`
  for incremental chunk consumption, :meth:`~StreamChunkingResult.events` for
  typed streaming events, and
  :meth:`~StreamChunkingResult.acomplete` for blocking until done.

**Raises:**

* `ValueError`: If *chunking* is a string that does not match any known
  alias (`"sentence"`, `"word"`, `"paragraph"`).
* `RuntimeError`: If the backend returns an already-computed
  :class:`~mellea.core.base.ModelOutputThunk` instead of a streaming
  one.  This indicates the backend is not honouring
  `ModelOption.STREAM`.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

## Classes

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `StreamEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L48" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Base class for all streaming events emitted by :func:`stream_with_chunking`.

The `timestamp` field is auto-populated at instantiation time; callers
do not set it.  Because `timestamp` has `init=False` it is never part
of `__init__`, so subclasses may declare additional fields in any order
without conflict.  Any new `init=False` fields on subclasses must also
use `field(..., init=False)`.

**Attributes:**

* `timestamp`: Unix timestamp (seconds) at the moment the event was created.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `ChunkEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L65" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted after each validated chunk is delivered to the consumer.

Fired after all active requirements' `stream_validate` calls return
non-`"fail"` for this chunk and the chunk has been placed on the
consumer queue.

**Args:**

* `text`: The chunk text that was validated and emitted.
* `chunk_index`: Zero-based position of this chunk in the stream.
* `attempt`: Sampling attempt number (always `1` in v1).

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `QuickCheckEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L84" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted after each per-chunk streaming validation batch.

One event per chunk, covering all active requirements in parallel.
Not emitted when there are no `requirements`.

**Args:**

* `chunk_index`: Zero-based position of the chunk that was validated.
* `attempt`: Sampling attempt number (always `1` in v1).
* `passed`: `True` if all active requirements returned non-`"fail"`
  for this chunk.
* `results`: :class:`~mellea.core.requirement.PartialValidationResult`
  from each active requirement, in the same order as the active
  slice of `requirements`.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `StreamingDoneEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L107" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted after all chunks have been validated and delivered to the consumer.

Fired after the regular token stream and any trailing fragment released by
:meth:`~mellea.stdlib.chunking.ChunkingStrategy.flush` have both been
processed.  Only emitted on natural completion — not on early exit (a
requirement returned `"fail"`) or on exception.

**Args:**

* `attempt`: Sampling attempt number (always `1` in v1).
* `full_text`: Complete accumulated text at stream end.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `FullValidationEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L125" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted after the final :meth:`~mellea.core.requirement.Requirement.validate` calls complete.

Only emitted when at least one requirement did not fail during streaming
and the stream completed naturally.  Not emitted on early exit.

**Args:**

* `attempt`: Sampling attempt number (always `1` in v1).
* `passed`: `True` if all final
  :class:`~mellea.core.requirement.ValidationResult` objects passed.
* `results`: :class:`~mellea.core.requirement.ValidationResult` from each
  non-failed requirement, in requirement order.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `RetryEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L145" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Reserved for future use.

Defined for API completeness — `RetryEvent` is not emitted by the
v1 orchestrator because v1 retry is caller-driven re-invocation of
:func:`stream_with_chunking`.  When orchestrator-side retry is added,
this event will fire before each re-attempt.

**Args:**

* `attempt`: Attempt number being started (1-based).
* `reason`: Human-readable reason for the retry.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `CompletedEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L163" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted when the orchestrator exits, including early-exit cases.

Always the last event before :meth:`StreamChunkingResult.events`
terminates.  `success` reflects :attr:`StreamChunkingResult.completed`.

**Args:**

* `success`: `True` if the stream completed normally (no `"fail"`
  result and no unhandled exception); `False` otherwise.
* `full_text`: Complete accumulated text.  On early exit or exception,
  reflects whatever was accumulated before cancellation.
* `attempts_used`: Number of orchestrator invocations (always `1` in v1).

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `ErrorEvent` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L183" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Emitted when an unhandled exception occurs in the orchestrator.

**Args:**

* `exception_type`: Python class name of the exception
  (e.g. `"ValueError"`).
* `detail`: String representation of the exception.  If
  `cancel_generation()` also raised during cleanup, the cleanup
  error is appended.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#4ADE8033]/20 text-[#15803D]">CLASS</span> `StreamChunkingResult` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L203" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

Result of a :func:`stream_with_chunking` operation.

Provides async iteration over validated text chunks as they complete
(:meth:`astream`), typed :class:`StreamEvent` objects via :meth:`events`,
a blocking :meth:`acomplete` for awaiting the full result including final
validation, and :attr:`as_thunk` for wrapping the output as a
:class:`~mellea.core.base.ModelOutputThunk`.

Instances are created by :func:`stream_with_chunking`; do not instantiate
directly.

**Args:**

* `mot`: The :class:`~mellea.core.base.ModelOutputThunk` from the backend
  generation call.
* `ctx`: The generation context returned alongside the MOT.

**Attributes:**

* `completed`: `False` if the stream exited early because a requirement
  returned `"fail"` during streaming; `True` otherwise.
* `full_text`: The generated text available after streaming completes.
  On natural completion, the full accumulated text.  On early exit
  (a requirement returned `"fail"`), only the validated and emitted
  portion — i.e. what consumers received via :meth:`astream`.
  Available after :meth:`acomplete` returns.
* `final_validations`: :class:`~mellea.core.requirement.ValidationResult`
  objects from the final :meth:`~mellea.core.requirement.Requirement.validate`
  calls on all non-failed requirements.  Available after
  :meth:`acomplete` returns.
* `streaming_failures`: `(Requirement, PartialValidationResult)` pairs
  for every requirement that returned `"fail"` during streaming.

<div className="h-8" />

**Methods:**

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

#### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#3064E3]/20 text-[#1D4ED8]">FUNC</span> `astream` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L265" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
astream(self) -> AsyncIterator[str]
```

Yield validated text chunks as they complete.

Each yielded string is a chunk that has passed per-chunk streaming
validation (or the stream had no requirements).  Iteration ends when
all chunks have been yielded, whether the stream completed normally or
was cancelled early on a `"fail"` result.

**Single-consumer.** Chunks are delivered via an
:class:`asyncio.Queue` that this method drains; calling
`astream()` a second time on the same result blocks indefinitely
because the queue is empty and the terminating `None` sentinel
has already been consumed.  If you need the chunks after
iteration, capture them into a list during the first pass or use
:attr:`full_text` after :meth:`acomplete`.

**Raises:**

* `Exception`: Propagates any error from the background orchestration
  task.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

#### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#3064E3]/20 text-[#1D4ED8]">FUNC</span> `events` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L301" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
events(self) -> AsyncIterator[StreamEvent]
```

Yield typed streaming events as they are emitted by the orchestrator.

Each yielded object is a :class:`StreamEvent` subclass describing a
point in the orchestration lifecycle.  Consumers can dispatch on type:

.. code-block:: python

async for event in result.events():
match event:
case ChunkEvent():
print(f"chunk \{event.chunk\_index}: \{event.text!r}")
case QuickCheckEvent(passed=False):
print(f"chunk \{event.chunk\_index} failed validation")
case CompletedEvent():
print(f"done — success=\{event.success}")

Typical event order (natural completion with requirements):

1. :class:`QuickCheckEvent` / :class:`ChunkEvent` pairs, one per chunk
   (validation fires first; the chunk is released to the consumer only
   after passing).  Includes any trailing fragment released by the
   chunking strategy's `flush()` method.
2. :class:`StreamingDoneEvent` — all chunks (including flush) delivered.
3. :class:`FullValidationEvent` — final `validate()` calls returned.
4. :class:`CompletedEvent` — orchestrator is exiting.

On early exit: :class:`QuickCheckEvent` (`passed=False`) is the
last validation event, followed by :class:`CompletedEvent`.  No
:class:`StreamingDoneEvent` or :class:`FullValidationEvent` is emitted.

On exception: :class:`ErrorEvent` followed by :class:`CompletedEvent`.

**Single-consumer.**  Events are delivered via a queue that this method
drains; calling `events()` a second time raises :exc:`RuntimeError`.

**Raises:**

* `RuntimeError`: If called more than once on the same result.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

#### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#3064E3]/20 text-[#1D4ED8]">FUNC</span> `acomplete` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L361" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
acomplete(self) -> None
```

Await full completion, including final validation.

After this method returns, :attr:`full_text`, :attr:`completed`,
:attr:`final_validations`, and :attr:`streaming_failures` are all
populated.  If :meth:`astream` has already been consumed to
exhaustion, this call is effectively a no-op.

**Raises:**

* `Exception`: Propagates the orchestrator exception if :meth:`astream`
  has not yet consumed it (raise-once — only one of `astream`
  or `acomplete` raises, whichever drains the failure marker
  first).
* `asyncio.CancelledError`: If the orchestration task was externally
  cancelled (e.g. via :func:`asyncio.wait_for` timeout).

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />

#### <span className="ml-2 inline-flex items-center rounded-full px-2 py-1 text-[0.7rem] font-bold tracking-wide bg-[#3064E3]/20 text-[#1D4ED8]">FUNC</span> `as_thunk` <sup><a href="https://github.com/generative-computing/mellea/blob/v0.6.0/mellea/stdlib/streaming.py#L401" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python theme={null}
as_thunk(self) -> ModelOutputThunk[str]
```

Wrap the output as a computed :class:`~mellea.core.base.ModelOutputThunk`.

Returns a new thunk with `value` set to :attr:`full_text` and
generation metadata copied from the original MOT.  Safe to call on
early-exit results; `value` reflects the validated and emitted
portion (same as :attr:`full_text` — see its docstring).

**Returns:**

* ModelOutputThunk\[str]: A computed thunk containing the streamed output.

**Raises:**

* `RuntimeError`: If called before :meth:`acomplete` has returned.

<div className="w-full h-px bg-gray-200 dark:bg-gray-700 my-4" />
