Streaming Classification

The ClassificationStream processes text incrementally with concurrent classification, making it ideal for streaming LLM responses. Text chunks are classified in parallel using a thread pool, and optional inline redaction can be applied as results arrive.

Basic Streaming

from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionDefault

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Any iterable of strings β€” a generator, list, file, API stream, etc.
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
)

for result in stream:
    print(result.text, end="")

Streaming with Redaction

Attach a ClassificationRedactor for inline redaction as chunks are classified:

from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionProfile
from ai_guard.redact import ClassificationRedactor, RedactPolicy, RedactAction, RedactKind

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Define redaction policy
policy = RedactPolicy(
    actions=[RedactAction(kind=RedactKind.REDACT, classifier="US_PHONE_NUMBER")],
    default=RedactKind.NONE,
    redactor="*",
)
redactor = ClassificationRedactor(policy)

# Stream from any iterable source
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionProfile(
        uuid="7dbf380f-0af8-4276-acb0-85413db2dbff",
        version=1,
    ),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
    chunk_size=50,
    max_workers=4,
)

for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        print("Blocked!")
        break
    print(result.text, end="")
# Output:
# First line here
# Phone: ************
# Third line

Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| input | Iterable[str] | Yes | Any iterable of text chunks (generator, list, file, API stream) |
| classifier_description | ClassifierDescription | Yes | Which classifiers to use. See Classifier Descriptions |
| client | AIGuardClient | Yes | An initialized AI Guard client |
| context | dict | Yes | Must include "actor" set to "user" or "agent" |
| redactor | ClassificationRedactor | No | Optional redactor for inline redaction |
| chunk_size | int | No | Maximum characters per chunk (default: 100) |
| max_workers | int | No | Thread pool size for concurrent classification (default: 4) |

Stream Results

Each iteration yields a ClassificationStreamResult with:

| Field | Type | Description |
| --- | --- | --- |
| text | str | The output text (redacted if a redactor is attached, original otherwise) |
| response | ClassificationResponse | The raw classification response for this chunk |
| redaction | RedactionResult | The redaction result (if a redactor is attached) |

Handling Blocked Content

When a BLOCK action is triggered during streaming, the result is still yielded so you can handle it inline:

for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        # Handle blocked content β€” stop streaming, show warning, etc.
        print("[Content blocked due to sensitive data]")
        break
    print(result.text, end="")

Input Sources

The input parameter accepts any Python iterable of strings. This makes it compatible with virtually any text source:

LLM Streaming Response (e.g., AWS Bedrock)

def bedrock_stream():
    # Your LLM streaming code here
    for event in bedrock_response["body"]:
        chunk = event["chunk"]["bytes"].decode()
        yield chunk

stream = ClassificationStream(
    input=bedrock_stream(),
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)

File Input

with open("input.txt") as f:
    stream = ClassificationStream(
        input=f,
        classifier_description=ClassifierDescriptionDefault(),
        client=client,
        context={"actor": "agent"},
    )
    for result in stream:
        print(result.text, end="")

List of Strings

chunks = ["Hello, ", "my email is ", "[email protected]", " and I live in NYC"]

stream = ClassificationStream(
    input=chunks,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)

Performance Tuning

Chunk Size

The chunk_size parameter controls how many characters are batched before sending a classification request. Smaller chunks provide lower latency; larger chunks reduce the number of API calls.

| Use Case | Recommended chunk_size |
| --- | --- |
| Real-time chat UI | 50–100 |
| Batch processing | 500–1000 |
| Default | 100 |
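To make the batching behavior concrete, here is a minimal sketch of how incoming chunks might be coalesced into pieces of at most chunk_size characters before each classification request. This is illustrative only; ClassificationStream performs this batching internally and its exact strategy may differ:

```python
def rebatch(chunks, chunk_size=100):
    """Coalesce an iterable of text chunks into pieces of at most
    chunk_size characters, flushing any remainder at the end."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    if buffer:
        yield buffer  # final partial batch
```

With a small chunk_size, the classifier is called more often but each result arrives sooner; with a large chunk_size, fewer API calls are made at the cost of latency.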

Max Workers

The max_workers parameter sets the thread pool size for concurrent classification requests. Increase this for higher throughput, but be mindful of your AI Guard service capacity.

| Use Case | Recommended max_workers |
| --- | --- |
| Single user session | 2–4 |
| High-throughput pipeline | 8–16 |
| Default | 4 |
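The underlying pattern is a thread pool that classifies chunks in parallel while still yielding results in input order. A minimal sketch of that idea, with a stand-in classify callable in place of the per-chunk API call:

```python
from concurrent.futures import ThreadPoolExecutor

def classify_concurrently(chunks, classify, max_workers=4):
    """Run classify over chunks using a thread pool of max_workers.

    Executor.map returns results in submission order even though the
    underlying calls run concurrently, which is what keeps streamed
    output in the same order as the input.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        yield from pool.map(classify, chunks)
```

Because classification is I/O-bound (network calls), threads overlap request latency; raising max_workers beyond what the AI Guard service can absorb only queues requests server-side.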

What's Next?