Post-generation validation waits until the model has finished writing before checking the output. That is fine for short responses, but it wastes time and compute when a violation appears in the first sentence of a ten-paragraph reply. Streaming validation moves the check into the generation loop: each chunk is validated as soon as it arrives, and generation is cancelled the moment a requirement fails.

By the end you will have covered:
  • stream_with_chunking() — the streaming validation entry point
  • The typed event vocabulary (ChunkEvent, QuickCheckEvent, …) from result.events()
  • Early-exit cancellation and reading streaming_failures
  • Choosing between "word", "sentence", and "paragraph" chunking
  • Subclassing ChunkingStrategy to define a custom split boundary
  • result.astream() for consumers that only need the validated chunks
Prerequisites: Tutorial 02 (async and streaming), Tutorial 04 (requirements and validation), pip install mellea, Ollama running locally with granite4.1:3b downloaded.

Step 1: Your first streaming validation call

stream_with_chunking() returns a StreamChunkingResult immediately. The orchestrator runs in the background, splitting accumulated text into chunks and calling stream_validate() on each one. Consume events with result.events(), then call result.acomplete() to wait for the orchestrator to finish and raise any exception it stored.
# Requires: mellea
# Returns: None
import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
    ChunkEvent,
    CompletedEvent,
    FullValidationEvent,
    QuickCheckEvent,
    StreamingDoneEvent,
    stream_with_chunking,
)

_SENTENCE_END = re.compile(r"[.!?]+")


class MaxSentencesReq(Requirement):
    """Fails the stream if the model writes more sentences than *limit*."""

    def __init__(self, limit: int) -> None:
        super().__init__()
        self._limit = limit
        self._count = 0

    def format_for_llm(self) -> str:
        return f"The response must be at most {self._limit} sentences."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        self._count += len(_SENTENCE_END.findall(chunk))
        if self._count > self._limit:
            return PartialValidationResult(
                "fail", reason=f"Exceeded {self._limit}-sentence limit"
            )
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        return ValidationResult(result=self._count <= self._limit)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()

    result = await stream_with_chunking(
        Instruction("Write a two-sentence summary of how photosynthesis works."),
        m.backend,
        m.ctx,
        requirements=[MaxSentencesReq(limit=3)],
        chunking="sentence",
    )

    async for event in result.events():
        match event:
            case ChunkEvent():
                print(f"  chunk[{event.chunk_index}]: {event.text!r}")
            case QuickCheckEvent(passed=False):
                print(f"  FAIL at chunk {event.chunk_index}: {event.results[0].reason}")
            case StreamingDoneEvent():
                print(f"  stream done — {len(event.full_text)} chars")
            case FullValidationEvent():
                print(f"  final validation: {'pass' if event.passed else 'fail'}")
            case CompletedEvent():
                print(f"  completed — success={event.success}")
            case _:
                pass

    await result.acomplete()
    print(f"\nFull text: {result.full_text!r}")


asyncio.run(main())
Sample output
  chunk[0]: 'Photosynthesis is the process by which plants use sunlight, water, and carbon dioxide to produce glucose and oxygen.'
  chunk[1]: 'This reaction takes place in the chloroplasts and is essential to nearly all life on Earth.'
  stream done — 208 chars
  final validation: pass
  completed — success=True

Full text: 'Photosynthesis is the process by which plants use sunlight, water, and carbon dioxide to produce glucose and oxygen. This reaction takes place in the chloroplasts and is essential to nearly all life on Earth.'
Note: LLM output is non-deterministic. Your result will vary in wording.
Three things to notice:
  • stream_with_chunking() is called with await but returns immediately — the orchestrator runs as a background task.
  • result.events() is an async iterator that yields one event per semantic unit. The loop ends when the CompletedEvent is delivered.
  • result.acomplete() must be called after the event loop drains to propagate any orchestrator exception and to ensure the background task has fully settled.
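The accumulate, split, validate, cancel cycle described above can be imitated without a model. The following is only a conceptual sketch under stated assumptions: fake_token_stream and validate_while_streaming are hypothetical names, the sentence regex is an approximation, and none of this is Mellea's actual orchestrator.

```python
import asyncio
import re
from collections.abc import AsyncIterator

# Approximate "complete sentence": text ending in ., ! or ? followed by whitespace.
_SENTENCE = re.compile(r"[^.!?]+[.!?]+(?=\s)")


async def fake_token_stream(text: str, size: int = 7) -> AsyncIterator[str]:
    """Hypothetical stand-in for a backend: yields small text deltas."""
    for i in range(0, len(text), size):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield text[i : i + size]


async def validate_while_streaming(
    text: str, max_sentences: int
) -> tuple[list[str], bool]:
    """Check each completed sentence as it appears; stop at the first failure."""
    accumulated = ""
    emitted: list[str] = []
    async for delta in fake_token_stream(text):
        accumulated += delta
        chunks = [m.group().strip() for m in _SENTENCE.finditer(accumulated)]
        for chunk in chunks[len(emitted):]:  # only chunks not seen before
            emitted.append(chunk)
            if len(emitted) > max_sentences:
                return emitted, False  # early exit: stop reading the stream
    return emitted, True


emitted, ok = asyncio.run(validate_while_streaming("One. Two. Three. Four.", 2))
print(emitted, ok)
```

The sketch leaves out the end-of-stream flush, so a final sentence with no trailing whitespace is never emitted. It also runs the check inline rather than as a background task.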

Step 2: Early exit on failure

When stream_validate() returns "fail", the orchestrator cancels the backend immediately and stops the stream. No further chunks are delivered, and the failure is recorded in result.streaming_failures. Lower the sentence limit so the model is likely to exceed it:
# Requires: mellea
# Returns: None
import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import ChunkEvent, CompletedEvent, QuickCheckEvent, stream_with_chunking

_SENTENCE_END = re.compile(r"[.!?]+")


class MaxSentencesReq(Requirement):
    def __init__(self, limit: int) -> None:
        super().__init__()
        self._limit = limit
        self._count = 0

    def format_for_llm(self) -> str:
        return f"The response must be at most {self._limit} sentences."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        self._count += len(_SENTENCE_END.findall(chunk))
        if self._count > self._limit:
            return PartialValidationResult(
                "fail", reason=f"Exceeded {self._limit}-sentence limit"
            )
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        return ValidationResult(result=self._count <= self._limit)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()

    # Ask for five sentences but cap the requirement at two.
    # The stream should be cancelled after the third sentence arrives.
    result = await stream_with_chunking(
        Instruction("Write five sentences about the history of the internet."),
        m.backend,
        m.ctx,
        requirements=[MaxSentencesReq(limit=2)],
        chunking="sentence",
    )

    async for event in result.events():
        match event:
            case ChunkEvent():
                print(f"  chunk[{event.chunk_index}]: {event.text[:60]!r}...")
            case QuickCheckEvent(passed=False):
                print(f"  CANCELLED at chunk {event.chunk_index}")
            case CompletedEvent():
                print(f"  completed — success={event.success}")
            case _:
                pass

    await result.acomplete()

    if result.streaming_failures:
        req, pvr = result.streaming_failures[0]
        print(f"\nStreaming failure: {pvr.reason}")
        print(f"Text at cancellation:\n{result.full_text!r}")
    else:
        print(f"\nFull text: {result.full_text!r}")


asyncio.run(main())
Sample output
  chunk[0]: 'The internet began as ARPANET, a U.S. Defense Department pr'...
  chunk[1]: 'In the 1980s, the network expanded beyond government use and'...
  chunk[2]: 'Tim Berners-Lee invented the World Wide Web in 1989, transfo'...
  CANCELLED at chunk 2
  completed — success=False

Streaming failure: Exceeded 2-sentence limit
Text at cancellation:
'The internet began as ARPANET, a U.S. Defense Department project in the late 1960s. In the 1980s, the network expanded beyond government use and began connecting universities and research centres. Tim Berners-Lee invented the World Wide Web in 1989...'
Note: Whether the stream is cancelled depends on whether the model exceeds the limit. If the model happens to comply, streaming_failures will be empty and result.completed will be True.
result.full_text always contains the text accumulated up to the point where generation stopped — useful for debugging what the model produced before the requirement failed.

Step 3: Choosing a chunking strategy

The built-in strategies cover a coarse-to-fine spectrum:
| Alias | Splits on | Good for |
|---|---|---|
| "word" | Whitespace | Token-local checks: forbidden words, numeric limits |
| "sentence" | ., !, or ? followed by whitespace | Grammar, coherence, per-sentence content rules |
| "paragraph" | Two or more consecutive newlines | Topic coherence, citation presence, heading structure |
The trade-off is latency versus context. Word chunking fires after every word: maximum reaction speed, but each chunk carries only a single word. Paragraph chunking waits for blank lines: the validator gets full paragraph context, but detection comes later and may happen after the model has already produced a large amount of invalid content.

To see the granularity difference concretely, switch to word chunking and print every fifth word, so you can see how many more validation events fire than with the two sentence chunks in Step 1:
# Requires: mellea
# Returns: None
import asyncio

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import ChunkEvent, CompletedEvent, QuickCheckEvent, stream_with_chunking

_FORBIDDEN = {"deprecated", "legacy", "obsolete"}


class ForbiddenWordReq(Requirement):
    """Cancels the stream the moment any forbidden word appears."""

    def format_for_llm(self) -> str:
        return f"Do not use any of the following words: {', '.join(sorted(_FORBIDDEN))}."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        word = chunk.strip().lower().strip(".,!?;:\"'")
        if word in _FORBIDDEN:
            return PartialValidationResult("fail", reason=f"Forbidden word: {chunk.strip()!r}")
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        return ValidationResult(result=True)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()

    result = await stream_with_chunking(
        Instruction(
            "Describe three advantages of cloud-native development in two sentences."
        ),
        m.backend,
        m.ctx,
        requirements=[ForbiddenWordReq()],
        chunking="word",
    )

    word_count = 0
    async for event in result.events():
        match event:
            case ChunkEvent():
                word_count += 1
                # Print every fifth word to show how many events fire.
                if word_count % 5 == 1:
                    print(f"  word {word_count:>3}: {event.text!r}")
            case QuickCheckEvent(passed=False):
                print(f"  CANCELLED at word {event.chunk_index}: {event.results[0].reason}")
            case CompletedEvent():
                status = "CANCELLED" if not event.success else "ok"
                print(f"  {status} — {word_count} word events total")
            case _:
                pass

    await result.acomplete()

    if result.streaming_failures:
        print(f"Failure: {result.streaming_failures[0][1].reason}")
    else:
        print(f"Full text: {result.full_text!r}")


asyncio.run(main())
Sample output
  word   1: 'Cloud-native'
  word   6: 'resilient'
  word  11: 'and'
  word  16: 'allows'
  word  21: 'horizontally,'
  word  26: 'costs,'
  word  31: 'deployments,'
  word  36: 'services.'
  ok — 38 word events total
Full text: 'Cloud-native development enables scalable, resilient ...'
Note: LLM output is non-deterministic. Your result will vary in wording.
A two-sentence response that would produce 2 ChunkEvent items with sentence chunking produces 38 here. The validator fires on every word: maximum reaction speed at the cost of per-chunk context. If a forbidden word appears, the stream stops at that word and no further ChunkEvent items are emitted. To see early exit in action, add a common English word such as "and" or "the" to _FORBIDDEN.
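The difference in event counts can also be estimated offline by splitting a fixed text along each boundary. The splitters below are rough approximations written for this illustration; the library's own chunkers may treat edge cases differently, and the text is hand-written rather than model output.

```python
import re

# Illustrative two-sentence text (not model output).
text = (
    "Cloud-native development scales services horizontally. "
    "It also shortens release cycles and lowers costs."
)

# Rough approximations of the three built-in boundaries.
words = text.split()
sentences = [s for s in re.split(r"(?<=[.!?])\s+", text) if s]
paragraphs = [p for p in re.split(r"\n{2,}", text) if p.strip()]

for name, chunks in [
    ("word", words),
    ("sentence", sentences),
    ("paragraph", paragraphs),
]:
    print(f"{name:<9} -> {len(chunks)} chunk(s)")
```

Each chunk triggers one validator call, so a validator that is expensive per call (for example, one that calls a model) pays that cost 13 times here with word chunking and twice with sentence chunking.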

Step 4: Raw chunk access with astream()

If you only need the validated chunks and do not want event metadata, use result.astream() instead of result.events(). It yields the text of each validated chunk as a plain string — useful for streaming output directly to a UI buffer or building the response incrementally without a match dispatch:
# Requires: mellea
# Returns: None
import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import stream_with_chunking

_SENTENCE_END = re.compile(r"[.!?]+")


class MaxSentencesReq(Requirement):
    def __init__(self, limit: int) -> None:
        super().__init__()
        self._limit = limit
        self._count = 0

    def format_for_llm(self) -> str:
        return f"The response must be at most {self._limit} sentences."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        self._count += len(_SENTENCE_END.findall(chunk))
        if self._count > self._limit:
            return PartialValidationResult("fail", reason=f"Exceeded {self._limit}-sentence limit")
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        return ValidationResult(result=self._count <= self._limit)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()

    result = await stream_with_chunking(
        Instruction("Write a two-sentence summary of the water cycle."),
        m.backend,
        m.ctx,
        requirements=[MaxSentencesReq(limit=3)],
        chunking="sentence",
    )

    # astream() yields only validated chunk text — no event wrapper.
    async for chunk in result.astream():
        print(chunk, end=" ", flush=True)
    print()

    await result.acomplete()
    print(f"completed={result.completed}")


asyncio.run(main())
Sample output
Water evaporates from oceans and lakes, rises into the atmosphere, and
condenses into clouds.  Precipitation falls back to Earth as rain or
snow, replenishing rivers, lakes, and groundwater.
completed=True
Note: LLM output is non-deterministic. Your result will vary in wording.
astream() and events() are independent — both are available on the same result object and can even be consumed concurrently with asyncio.gather. Each is single-consumer: calling either iterator a second time raises RuntimeError. If you need chunks after the fact, capture them to a list during iteration or read result.full_text after acomplete().
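Because each iterator can be consumed only once, capturing chunks has to happen during that single pass. A minimal sketch of the capture pattern follows, using fake_chunks as a hypothetical stand-in for result.astream() so that it runs without a model.

```python
import asyncio
from collections.abc import AsyncIterator


async def fake_chunks() -> AsyncIterator[str]:
    """Hypothetical stand-in for result.astream(): yields chunk text."""
    for chunk in ["Water evaporates.", "Clouds form.", "Rain falls."]:
        await asyncio.sleep(0)  # hand control back to the event loop
        yield chunk


async def collect(stream: AsyncIterator[str]) -> list[str]:
    """Drain a single-consumer stream once, keeping every chunk for later."""
    captured: list[str] = []
    async for chunk in stream:
        captured.append(chunk)  # a UI update could also go here
    return captured


captured = asyncio.run(collect(fake_chunks()))
print(" ".join(captured))
```

With a real result object, the same collect() helper would presumably take result.astream() as its argument, followed by await result.acomplete().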

Step 5: A custom chunking strategy

The built-in strategies cover the most common boundaries. For structured output — numbered lists, code blocks, CSV rows — you can subclass ChunkingStrategy and define your own split boundary. Two methods to implement:
  • split(accumulated_text) — called on every new token delta. Return all complete chunks found so far; withhold any trailing fragment. Must be stateless: it receives the full accumulated text each time, not a delta.
  • flush(accumulated_text) — called once at natural end of stream. Release the withheld trailing fragment, or return [] to discard it.
Here is a LineChunker that splits on single newlines — natural for numbered list output where each line is one item:
# Requires: mellea
# Returns: None
import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.chunking import ChunkingStrategy
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import ChunkEvent, CompletedEvent, QuickCheckEvent, stream_with_chunking

_NUMBERED_LINE = re.compile(r"^\s*\d+[\.\)]\s")


class LineChunker(ChunkingStrategy):
    """Emits one complete line per chunk, splitting on single newlines."""

    def split(self, accumulated_text: str) -> list[str]:
        if "\n" not in accumulated_text:
            return []
        last_nl = accumulated_text.rfind("\n")
        return [line for line in accumulated_text[:last_nl].split("\n") if line.strip()]

    def flush(self, accumulated_text: str) -> list[str]:
        if not accumulated_text:
            return []
        last_nl = accumulated_text.rfind("\n")
        trailing = (
            accumulated_text if last_nl == -1 else accumulated_text[last_nl + 1 :]
        ).strip()
        return [trailing] if trailing else []


class NumberedLineReq(Requirement):
    """Cancels the stream if any line does not begin with a number."""

    def format_for_llm(self) -> str:
        return "Every line must begin with a number followed by a period (e.g. '1. ')."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        if not _NUMBERED_LINE.match(chunk):
            return PartialValidationResult(
                "fail", reason=f"Line does not start with a number: {chunk.strip()!r}"
            )
        return PartialValidationResult("unknown")

    async def validate(
        self, backend: Backend, ctx: Context, *, format=None, model_options=None
    ) -> ValidationResult:
        # All format checking happens during streaming. Lines that reach validate()
        # are guaranteed to have passed stream_validate() already.
        return ValidationResult(result=True)


async def main() -> None:
    from mellea.stdlib.session import start_session

    m = start_session()

    result = await stream_with_chunking(
        Instruction(
            "List five world capitals, one per line, numbered 1 through 5. "
            "Use the format: '1. City'. Output only the numbered list, nothing else."
        ),
        m.backend,
        m.ctx,
        requirements=[NumberedLineReq()],
        chunking=LineChunker(),
    )

    async for event in result.events():
        match event:
            case ChunkEvent():
                print(f"  line[{event.chunk_index}]: {event.text.strip()!r}")
            case QuickCheckEvent(passed=False):
                print(f"  FAIL: {event.results[0].reason}")
            case CompletedEvent():
                print(f"  completed — success={event.success}")
            case _:
                pass

    await result.acomplete()


asyncio.run(main())
Sample output
  line[0]: '1. London'
  line[1]: '2. Paris'
  line[2]: '3. Tokyo'
  line[3]: '4. Ottawa'
  line[4]: '5. Canberra'
  completed — success=True
Note: LLM output is non-deterministic. Your result will vary in wording.
validate() on NumberedLineReq always returns True because all format checking happens during streaming. If any line fails, the stream is cancelled before reaching validate(); lines that do reach it have already passed stream_validate(). This pattern (enforce in stream_validate(), pass in validate()) is common for requirements whose invariant is a property of individual chunks rather than of the full output.

Pass a ChunkingStrategy instance (not a string alias) to use a custom chunker. The built-in chunkers (WordChunker, SentenceChunker, ParagraphChunker) are also available if you need to pass an instance explicitly or subclass one to override flush().
See also: docs/examples/streaming/custom_chunking.py for an annotated version of this pattern with a more detailed split()/flush() contract walkthrough.
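The split()/flush() contract can be exercised without a model by feeding simulated deltas to the same logic. In the sketch below, split and flush copy the bodies of the LineChunker methods as plain functions, while drive is a hypothetical driver showing one plausible way to emit each chunk exactly once; it is not Mellea's orchestrator.

```python
def split(accumulated_text: str) -> list[str]:
    """Same logic as LineChunker.split(): all complete, non-blank lines so far."""
    if "\n" not in accumulated_text:
        return []
    last_nl = accumulated_text.rfind("\n")
    return [ln for ln in accumulated_text[:last_nl].split("\n") if ln.strip()]


def flush(accumulated_text: str) -> list[str]:
    """Same logic as LineChunker.flush(): release the trailing fragment, if any."""
    if not accumulated_text:
        return []
    last_nl = accumulated_text.rfind("\n")
    tail = (
        accumulated_text if last_nl == -1 else accumulated_text[last_nl + 1 :]
    ).strip()
    return [tail] if tail else []


def drive(deltas: list[str]) -> list[str]:
    """Hypothetical driver: emit each chunk once although split() is stateless."""
    accumulated = ""
    emitted: list[str] = []
    for delta in deltas:
        accumulated += delta
        chunks = split(accumulated)
        emitted.extend(chunks[len(emitted):])  # skip chunks already emitted
    emitted.extend(flush(accumulated))  # natural end of stream
    return emitted


print(drive(["1. Lon", "don\n2. Pa", "ris\n", "3. Tokyo"]))
```

Because split() always sees the full accumulated text, the driver has to remember how many chunks it has already handed out. The last item has no trailing newline, so it only appears through flush().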

What you built

| Concept | What it gives you |
|---|---|
| stream_with_chunking() + requirements= | Per-chunk validation with automatic early exit |
| result.events() | Typed event stream: observe every chunk, validation result, and lifecycle signal |
| QuickCheckEvent(passed=False) | Detect the moment a requirement fails, mid-stream |
| result.streaming_failures | List of (requirement, PartialValidationResult) pairs for failed checks |
| "word" / "sentence" / "paragraph" | Built-in chunking strategies trading reaction speed for context |
| ChunkingStrategy subclass | Custom split boundaries for structured output (lists, code, CSV) |
| result.astream() | Raw validated chunks without event metadata |

See also: How-to: Streaming with per-chunk validation | Concepts: The Requirements System — Streaming validation | Examples: streaming/