Streaming Classification
The ClassificationStream processes text incrementally with concurrent classification, making it ideal for streaming LLM responses. Text chunks are classified in parallel using a thread pool, and optional inline redaction can be applied as results arrive.
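The pattern ClassificationStream implements can be sketched in plain Python: chunks are submitted to a thread pool as they arrive, and results are yielded back in input order. The sketch below is a simplified illustration of that pattern, not the SDK's actual implementation; classify_chunk is a hypothetical stand-in for a real classification request.

```python
from concurrent.futures import ThreadPoolExecutor

def classify_chunk(text: str) -> dict:
    # Hypothetical stand-in for one classification API call.
    return {"text": text, "has_digits": any(c.isdigit() for c in text)}

def classify_stream(chunks, max_workers=4):
    # Submit every chunk to the pool, then yield results in input order,
    # so output text stays in the same sequence as the input stream.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(classify_chunk, c) for c in chunks]
        for future in futures:
            yield future.result()

results = list(classify_stream(["First line\n", "Phone: 321-507-0525\n"]))
# results[1]["has_digits"] is True
```

A real implementation would bound the number of in-flight futures so an unbounded stream does not queue up indefinitely; this sketch submits everything up front for brevity.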
Basic Streaming
```python
from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionDefault

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Any iterable of strings: a generator, list, file, API stream, etc.
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
)

for result in stream:
    print(result.text, end="")
```

Streaming with Redaction
Attach a ClassificationRedactor for inline redaction as chunks are classified:
```python
from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionProfile
from ai_guard.redact import ClassificationRedactor, RedactPolicy, RedactAction, RedactKind

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Define the redaction policy
policy = RedactPolicy(
    actions=[RedactAction(kind=RedactKind.REDACT, classifier="US_PHONE_NUMBER")],
    default=RedactKind.NONE,
    redactor="*",
)
redactor = ClassificationRedactor(policy)

# Stream from any iterable source
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionProfile(
        uuid="7dbf380f-0af8-4276-acb0-85413db2dbff",
        version=1,
    ),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
    chunk_size=50,
    max_workers=4,
)

for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        print("Blocked!")
        break
    print(result.text, end="")

# Output:
# First line here
# Phone: ************
# Third line
```

Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| input | Iterable[str] | Yes | Any iterable of text chunks (generator, list, file, API stream) |
| classifier_description | ClassifierDescription | Yes | Which classifiers to use. See Classifier Descriptions |
| client | AIGuardClient | Yes | An initialized AI Guard client |
| context | dict | Yes | Must include "actor" set to "user" or "agent" |
| redactor | ClassificationRedactor | No | Redactor for inline redaction as chunks are classified |
| chunk_size | int | No | Maximum characters per chunk (default: 100) |
| max_workers | int | No | Thread pool size for concurrent classification (default: 4) |
Stream Results
Each iteration yields a ClassificationStreamResult with:
| Field | Type | Description |
|---|---|---|
| text | str | The output text (redacted if a redactor is attached, otherwise the original) |
| response | ClassificationResponse | The raw classification response for this chunk |
| redaction | RedactionResult | The redaction result (present only when a redactor is attached) |
Handling Blocked Content
When a BLOCK action is triggered during streaming, the result is still yielded so you can handle it inline:
```python
for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        # Handle blocked content: stop streaming, show a warning, etc.
        print("[Content blocked due to sensitive data]")
        break
    print(result.text, end="")
```

Input Sources
The input parameter accepts any Python iterable of strings. This makes it compatible with virtually any text source:
LLM Streaming Response (e.g., AWS Bedrock)
```python
def bedrock_stream():
    # Your LLM streaming code here
    for event in bedrock_response["body"]:
        chunk = event["chunk"]["bytes"].decode()
        yield chunk

stream = ClassificationStream(
    input=bedrock_stream(),
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)
```

File Input
```python
with open("input.txt") as f:
    stream = ClassificationStream(
        input=f,
        classifier_description=ClassifierDescriptionDefault(),
        client=client,
        context={"actor": "agent"},
    )
    for result in stream:
        print(result.text, end="")
```

List of Strings
```python
chunks = ["Hello, ", "my email is ", "[email protected]", " and I live in NYC"]

stream = ClassificationStream(
    input=chunks,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)
```

Performance Tuning
Chunk Size
The chunk_size parameter controls how many characters are batched before sending a classification request. Smaller chunks provide lower latency; larger chunks reduce the number of API calls.
| Use Case | Recommended chunk_size |
|---|---|
| Real-time chat UI | 50–100 |
| Batch processing | 500–1000 |
| Default | 100 |
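The batching behavior that chunk_size controls can be approximated with a small re-chunking generator. This is a simplified sketch of the idea, not the SDK's actual implementation: incoming pieces accumulate in a buffer, and a chunk is emitted whenever the buffer reaches chunk_size characters.

```python
def rechunk(pieces, chunk_size=100):
    """Accumulate incoming strings and emit chunks of up to chunk_size chars."""
    buffer = ""
    for piece in pieces:
        buffer += piece
        # Emit full-size chunks as soon as the buffer is large enough.
        while len(buffer) >= chunk_size:
            yield buffer[:chunk_size]
            buffer = buffer[chunk_size:]
    if buffer:
        yield buffer  # flush whatever remains at end of stream

chunks = list(rechunk(["ab", "cdef", "ghij"], chunk_size=4))
# chunks == ["abcd", "efgh", "ij"]
```

Smaller chunks leave the buffer sooner, which is why a small chunk_size lowers time-to-first-result at the cost of more classification requests.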
Max Workers
The max_workers parameter sets the thread pool size for concurrent classification requests. Increase this for higher throughput, but be mindful of your AI Guard service capacity.
| Use Case | Recommended max_workers |
|---|---|
| Single user session | 2–4 |
| High-throughput pipeline | 8–16 |
| Default | 4 |
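The throughput effect of max_workers is easy to demonstrate with simulated request latency. This is a toy benchmark, not the SDK: fake_classify is a hypothetical stand-in that sleeps for about 50 ms per call, the way a real classification request would block on the network.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_classify(chunk: str) -> str:
    time.sleep(0.05)  # simulate one classification request's latency
    return chunk.upper()

chunks = [f"chunk-{i}" for i in range(8)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    # pool.map preserves input order, like ClassificationStream's output.
    results = list(pool.map(fake_classify, chunks))
concurrent_s = time.perf_counter() - start

# 8 chunks across 4 workers take roughly 2 rounds of 50 ms
# instead of 8 sequential ones.
print(f"{concurrent_s:.2f}s for {len(chunks)} chunks")
```

In practice, raise max_workers only until you approach your AI Guard service's rate limits; beyond that, extra workers just queue.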
What's Next?
- Observability & Metrics – Track classification and redaction events
- Classifier Descriptions – Choose the right classifier configuration
- Error Reference – Handle errors in streaming workflows