- stream_with_chunking(): the streaming validation entry point
- The typed event vocabulary (ChunkEvent, QuickCheckEvent, …) from result.events()
- Early-exit cancellation and reading streaming_failures
- Choosing between "word", "sentence", and "paragraph" chunking
- Subclassing ChunkingStrategy to define a custom split boundary
- result.astream() for consumers that only need the validated chunks
pip install mellea, Ollama running locally with granite4.1:3b downloaded.
Step 1: Your first streaming validation call
stream_with_chunking() returns a StreamChunkingResult immediately. The
orchestrator runs in the background, splitting accumulated text into chunks and
calling stream_validate() on each one. Consume events with result.events(),
then call result.acomplete() to wait for the orchestrator to finish and raise
any exception it stored.
Sample output
Note: LLM output is non-deterministic. Your result will vary in wording.

Three things to notice:

- stream_with_chunking() is called with await but returns immediately: the orchestrator runs as a background task.
- result.events() is an async iterator that yields one event per semantic unit. The loop ends when the CompletedEvent is delivered.
- result.acomplete() must be called after the event loop drains to propagate any orchestrator exception and to ensure the background task has fully settled.
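The shape of this pattern (a background task feeding an event queue, an async iterator that ends on a completion event, and a final await that surfaces stored exceptions) can be modelled with plain asyncio. The sketch below is a toy model only: ToyResult and the two event dataclasses are hypothetical stand-ins written for this illustration, not Mellea's classes, and the real orchestrator may be built quite differently.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ChunkEvent:  # toy stand-in, not Mellea's ChunkEvent
    text: str


@dataclass
class CompletedEvent:  # toy stand-in, not Mellea's CompletedEvent
    pass


class ToyResult:
    """Toy model of a streaming result backed by a background task."""

    def __init__(self, chunks):
        self._queue = asyncio.Queue()
        # The producer starts right away; the caller gets this object back
        # before any chunk has been produced.
        self._task = asyncio.create_task(self._run(chunks))

    async def _run(self, chunks):
        try:
            for chunk in chunks:
                await asyncio.sleep(0)  # yield control, as a backend would
                await self._queue.put(ChunkEvent(chunk))
        finally:
            # Always signal the end so the consumer loop can terminate.
            await self._queue.put(CompletedEvent())

    async def events(self):
        while True:
            event = await self._queue.get()
            yield event
            if isinstance(event, CompletedEvent):
                return

    async def acomplete(self):
        # Awaiting the task re-raises any exception raised inside _run().
        await self._task


async def demo():
    result = ToyResult(["First sentence.", "Second sentence."])
    seen = []
    async for event in result.events():
        seen.append(type(event).__name__)
    await result.acomplete()
    return seen


events_seen = asyncio.run(demo())
print(events_seen)
```

In this model, skipping acomplete() would leave any exception raised in the background task unobserved, which is why the tutorial calls it after the event loop drains.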
Step 2: Early exit on failure
When stream_validate() returns "fail", the orchestrator cancels the backend
immediately and stops the stream. No further chunks are delivered, and the
failure is recorded in result.streaming_failures.
Lower the sentence limit so the model is likely to exceed it:
Sample output
Note: Whether the stream is cancelled depends on whether the model exceeds the limit. If the model happens to comply, streaming_failures will be empty and result.completed will be True.
result.full_text always contains the text accumulated up to the point where
generation stopped — useful for debugging what the model produced before the
requirement failed.
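The early-exit behaviour described in this step can be imitated without a model. The following is a minimal sketch under stated assumptions: fake_backend, MAX_SENTENCES, the "pass" return value, and the tuple stored in failures are all hypothetical and chosen for illustration. Only the "fail" value and the cancel-on-failure behaviour come from the text above.

```python
import asyncio

MAX_SENTENCES = 2  # hypothetical limit, kept low on purpose


async def fake_backend(out: asyncio.Queue):
    """Stand-in for a model backend that emits one sentence at a time."""
    for i in range(1, 6):
        await out.put(f"Sentence {i}.")
        await asyncio.sleep(0)
    await out.put(None)  # natural end of stream


def stream_validate(chunks_so_far):
    # "pass" is an assumed counterpart to the documented "fail" value.
    return "fail" if len(chunks_so_far) > MAX_SENTENCES else "pass"


async def run():
    queue = asyncio.Queue()
    backend = asyncio.create_task(fake_backend(queue))
    delivered, failures, text_parts = [], [], []
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        text_parts.append(chunk)  # accumulated text includes the failing chunk
        if stream_validate(delivered + [chunk]) == "fail":
            failures.append(("max_sentences", chunk))
            backend.cancel()  # stop generation immediately
            break
        delivered.append(chunk)
    try:
        await backend
    except asyncio.CancelledError:
        pass  # expected when the backend was cancelled mid-stream
    return delivered, failures, " ".join(text_parts)


delivered, failures, full_text = asyncio.run(run())
print(delivered, failures, full_text)
```

Here the third sentence trips the limit, so two chunks are delivered, one failure is recorded, and the accumulated text shows what had been produced when generation stopped.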
Step 3: Choosing a chunking strategy
The built-in strategies cover a coarse-to-fine spectrum:

| Alias | Splits on | Good for |
|---|---|---|
| "word" | Whitespace | Token-local checks: forbidden words, numeric limits |
| "sentence" | ., !, ? followed by whitespace | Grammar, coherence, per-sentence content rules |
| "paragraph" | Two or more consecutive newlines | Topic coherence, citation presence, heading structure |
Sample output
Note: LLM output is non-deterministic. Your result will vary in wording.

The same two-sentence response that produced 2 ChunkEvent items with sentence
chunking now produces 38. The validator fires on every word: maximum reaction
speed at the cost of per-chunk context.
If a forbidden word appears, the stream stops at that word and no further
ChunkEvent items are emitted. To see early exit in action, change _FORBIDDEN
to include a common English word like "and" or "the".
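To get a feel for how the three boundaries in the table change the chunk count, the same text can be split with plain Python. This is only an approximation using regular expressions that mirror the "Splits on" column. It does not call Mellea's chunkers, which may handle edge cases differently.

```python
import re

TEXT = "Streaming is fast. It validates early!\n\nA second paragraph follows."

# "word": split on any run of whitespace.
words = TEXT.split()

# "sentence": split on whitespace that follows ., ! or ?.
sentences = re.split(r"(?<=[.!?])\s+", TEXT.strip())

# "paragraph": split on two or more consecutive newlines.
paragraphs = re.split(r"\n{2,}", TEXT.strip())

print(len(words), len(sentences), len(paragraphs))
```

The finer the boundary, the more often a validator would be invoked on the same response, and the less text each invocation sees.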
Step 4: Raw chunk access with astream()
If you only need the validated chunks and do not want event metadata, use
result.astream() instead of result.events(). It yields the text of each
validated chunk as a plain string — useful for streaming output directly to a
UI buffer or building the response incrementally without a match dispatch:
Sample output
Note: LLM output is non-deterministic. Your result will vary in wording.
astream() and events() are independent — both are available on the same
result object and can even be consumed concurrently with asyncio.gather. Each
is single-consumer: calling either iterator a second time raises
RuntimeError. If you need chunks after the fact, capture them to a list
during iteration or read result.full_text after acomplete().
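The single-consumer rule and the capture-to-a-list workaround can be sketched with a small stand-in class. SingleUseStream is hypothetical and written only for this illustration. In particular, whether the real RuntimeError is raised at call time or on first iteration is not stated here, so the sketch simply picks call time.

```python
import asyncio


class SingleUseStream:
    """Toy stand-in: an async chunk stream that can be claimed only once."""

    def __init__(self, chunks):
        self._chunks = list(chunks)
        self._claimed = False

    def astream(self):
        if self._claimed:
            raise RuntimeError("astream() already consumed")
        self._claimed = True
        return self._iterate()

    async def _iterate(self):
        for chunk in self._chunks:
            await asyncio.sleep(0)  # simulate waiting for the next chunk
            yield chunk


async def demo():
    stream = SingleUseStream(["One. ", "Two."])
    # Capture chunks during the only permitted pass.
    captured = [chunk async for chunk in stream.astream()]
    try:
        stream.astream()
        second_call_error = None
    except RuntimeError as exc:
        second_call_error = str(exc)
    return captured, second_call_error


captured, second_call_error = asyncio.run(demo())
print(captured, second_call_error)
```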
Step 5: A custom chunking strategy
The built-in strategies cover the most common boundaries. For structured output
(numbered lists, code blocks, CSV rows), you can subclass ChunkingStrategy
and define your own split boundary.
Two methods to implement:
- split(accumulated_text): called on every new token delta. Return all complete chunks found so far; withhold any trailing fragment. Must be stateless: it receives the full accumulated text each time, not a delta.
- flush(accumulated_text): called once at natural end of stream. Release the withheld trailing fragment, or return [] to discard it.

The example for this step uses a LineChunker that splits on single newlines,
which is natural for numbered list output where each line is one item.
Sample output
Note: LLM output is non-deterministic. Your result will vary in wording.
validate() on NumberedLineReq always returns True because all format
checking happens during streaming. If any line fails, the stream is cancelled
before reaching validate(). Lines that do reach it have already passed
stream_validate(). This pattern — enforce in stream_validate, pass in
validate — is common for requirements whose invariant is a property of
individual chunks rather than the full output.
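The "enforce in stream_validate, pass in validate" pattern can be shown with a duck-typed class. This is a sketch under assumptions: it does not inherit from Mellea's requirement base class, and the method signatures, the "pass" return value, and the numbered-line regex are all hypothetical. Only the method names and the "fail" value appear in the text above.

```python
import re


class NumberedLineReq:
    """Illustrative requirement: every line must look like '1. item'."""

    _PATTERN = re.compile(r"^\d+\.\s+\S")

    def stream_validate(self, chunk: str) -> str:
        # All format checking happens here, one chunk (line) at a time.
        return "pass" if self._PATTERN.match(chunk) else "fail"

    def validate(self, full_text: str) -> bool:
        # Any text that reaches this point has already passed per-chunk checks.
        return True


req = NumberedLineReq()
results = [req.stream_validate(line) for line in ["1. Apples", "2. Bananas", "Cherries"]]
print(results)
```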
Pass a ChunkingStrategy instance (not a string alias) to use a custom
chunker. The built-in chunkers (WordChunker, SentenceChunker,
ParagraphChunker) are also available as instances if you need to pass one
explicitly or subclass to override flush().
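As a standalone illustration of the split()/flush() contract from this step, a line-based chunker might look like the sketch below. It deliberately does not import or subclass ChunkingStrategy, since the import path and exact base-class signatures are not shown here. Treat the class as a hypothetical shape to adapt, not the library's implementation.

```python
from __future__ import annotations


class LineChunker:
    """Sketch of a chunker that treats each newline-terminated line as a chunk."""

    def split(self, accumulated_text: str) -> list[str]:
        # Stateless: recompute from the full accumulated text every time.
        if "\n" not in accumulated_text:
            return []
        # Everything before the last newline is complete; the tail is withheld.
        complete, _tail = accumulated_text.rsplit("\n", 1)
        return [line for line in complete.split("\n") if line.strip()]

    def flush(self, accumulated_text: str) -> list[str]:
        # At end of stream, release the withheld tail if it is non-empty.
        tail = accumulated_text.rsplit("\n", 1)[-1]
        return [tail] if tail.strip() else []


chunker = LineChunker()
chunks_mid = chunker.split("1. Apples\n2. Bananas\n3. Che")
final_text = "1. Apples\n2. Bananas\n3. Cherries"
chunks_end = chunker.split(final_text)
tail = chunker.flush(final_text)
print(chunks_mid, chunks_end, tail)
```

The partial third line is withheld by split() in both calls and only released by flush(), which is what keeps a validator from seeing half-written items.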
See also: docs/examples/streaming/custom_chunking.py for an annotated version of this pattern with a more detailed split()/flush() contract walkthrough.
What you built
| Concept | What it gives you |
|---|---|
| stream_with_chunking() + requirements= | Per-chunk validation with automatic early exit |
| result.events() | Typed event stream: observe every chunk, validation result, and lifecycle signal |
| QuickCheckEvent(passed=False) | Detect the moment a requirement fails, mid-stream |
| result.streaming_failures | List of (requirement, PartialValidationResult) pairs for failed checks |
| "word" / "sentence" / "paragraph" | Built-in chunking strategies trading reaction speed for context |
| ChunkingStrategy subclass | Custom split boundaries for structured output (lists, code, CSV) |
| result.astream() | Raw validated chunks without event metadata |
See also: How-to: Streaming with per-chunk validation | Concepts: The Requirements System — Streaming validation | Examples: streaming/