# Streaming Classification

`ClassificationStream` classifies text incrementally, which makes it well suited to streaming LLM responses. Text chunks are classified in parallel by a thread pool, and an optional redactor can be applied inline as results arrive.

## Basic Streaming

```python
from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionDefault

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Any iterable of strings — a generator, list, file, API stream, etc.
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
)

for result in stream:
    print(result.text, end="")
```

## Streaming with Redaction

Attach a `ClassificationRedactor` for inline redaction as chunks are classified:

```python
from ai_guard import AIGuardClient, ClassificationStream
from ai_guard.api import AIPlatform, ClassifierDescriptionProfile
from ai_guard.redact import ClassificationRedactor, RedactPolicy, RedactAction, RedactKind

client = AIGuardClient(
    "https://ai-guard.example.com:4443",
    token="your-api-key",
    agent_id="my-agent",
    platform=AIPlatform.AMAZON_BEDROCK,
)

# Define redaction policy
policy = RedactPolicy(
    actions=[RedactAction(kind=RedactKind.REDACT, classifier="US_PHONE_NUMBER")],
    default=RedactKind.NONE,
    redactor="*",
)
redactor = ClassificationRedactor(policy)

# Stream from any iterable source
source = ["First line here\n", "Phone: 321-507-0525\n", "Third line\n"]

stream = ClassificationStream(
    input=source,
    classifier_description=ClassifierDescriptionProfile(
        uuid="7dbf380f-0af8-4276-acb0-85413db2dbff",
        version=1,
    ),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
    chunk_size=50,
    max_workers=4,
)

for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        print("Blocked!")
        break
    print(result.text, end="")
# Output:
# First line here
# Phone: ************
# Third line
```

## Parameters

| Parameter                | Type                     | Required | Description                                                                                   |
| ------------------------ | ------------------------ | -------- | --------------------------------------------------------------------------------------------- |
| `input`                  | `Iterable[str]`          | Yes      | Any iterable of text chunks (generator, list, file, API stream)                               |
| `classifier_description` | `ClassifierDescription`  | Yes      | Which classifiers to use. See [Classifier Descriptions](https://developer.onetrust.com/onetrust/docs/ai-guard-classifier-descriptions) |
| `client`                 | `AIGuardClient`          | Yes      | An initialized AI Guard client                                                                |
| `context`                | `dict`                   | Yes      | Must include `"actor"` set to `"user"` or `"agent"`                                           |
| `redactor`               | `ClassificationRedactor` | No       | Optional redactor for inline redaction                                                        |
| `chunk_size`             | `int`                    | No       | Maximum characters per chunk (default: `100`)                                                 |
| `max_workers`            | `int`                    | No       | Thread pool size for concurrent classification (default: `4`)                                 |

## Stream Results

Each iteration yields a `ClassificationStreamResult` with:

| Field       | Type                     | Description                                                              |
| ----------- | ------------------------ | ------------------------------------------------------------------------ |
| `text`      | `str`                    | The output text (redacted if a redactor is attached, original otherwise) |
| `response`  | `ClassificationResponse` | The raw classification response for this chunk                           |
| `redaction` | `RedactionResult`        | The redaction result (if a redactor is attached)                         |

## Handling Blocked Content

When a `BLOCK` action is triggered during streaming, the result is still yielded so you can handle it inline:

```python
for result in stream:
    if result.redaction and any(a.kind == RedactKind.BLOCK for a in result.redaction.actions):
        # Handle blocked content — stop streaming, show warning, etc.
        print("[Content blocked due to sensitive data]")
        break
    print(result.text, end="")
```
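
The same check can be wrapped in a small helper that buffers output and reports whether the stream was cut short. The sketch below is illustrative only: `FakeAction`, `FakeRedaction`, and `FakeResult` are hypothetical stand-ins that mimic just the `text`, `redaction`, and `actions[].kind` attributes used above, and the string `"BLOCK"` stands in for `RedactKind.BLOCK` so the example runs without the SDK.

```python
from dataclasses import dataclass, field
from typing import Iterable, List, Optional, Tuple

BLOCK = "BLOCK"  # stand-in for RedactKind.BLOCK (assumption for this sketch)


@dataclass
class FakeAction:
    kind: str


@dataclass
class FakeRedaction:
    actions: List[FakeAction] = field(default_factory=list)


@dataclass
class FakeResult:
    text: str
    redaction: Optional[FakeRedaction] = None


def collect_until_blocked(results: Iterable[FakeResult]) -> Tuple[str, bool]:
    """Join result text until a BLOCK action appears.

    Returns (text_so_far, blocked). Text from the blocked result is dropped.
    """
    parts: List[str] = []
    for result in results:
        redaction = result.redaction
        if redaction and any(a.kind == BLOCK for a in redaction.actions):
            return "".join(parts), True
        parts.append(result.text)
    return "".join(parts), False


demo = [
    FakeResult("Hello, "),
    FakeResult("world. ", FakeRedaction([FakeAction("REDACT")])),
    FakeResult("secret", FakeRedaction([FakeAction(BLOCK)])),
    FakeResult("never reached"),
]
text, blocked = collect_until_blocked(demo)
print(repr(text), blocked)
```

With the real SDK, the loop body would stay the same and `BLOCK` would be replaced by `RedactKind.BLOCK`.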

## Input Sources

The `input` parameter accepts any Python iterable of strings, so generators, file objects, and lists all work as sources:

### LLM Streaming Response (e.g., Amazon Bedrock)

```python
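def bedrock_stream(bedrock_response):
    # bedrock_response: the result of your Bedrock streaming call,
    # created elsewhere in your code.
    for event in bedrock_response["body"]:
        # The decoded payload is model-specific (often JSON); extract the
        # generated text for your model before yielding if needed.
        chunk = event["chunk"]["bytes"].decode()
        yield chunk

stream = ClassificationStream(
    input=bedrock_stream(bedrock_response),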
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)
```

### File Input

```python
with open("input.txt", encoding="utf-8") as f:
    stream = ClassificationStream(
        input=f,
        classifier_description=ClassifierDescriptionDefault(),
        client=client,
        context={"actor": "agent"},
    )
    for result in stream:
        print(result.text, end="")
```
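
Iterating over a file object yields one line per item. For files with very long lines or no newlines, a generator that reads fixed-size blocks is one alternative. In this sketch, `read_blocks` is a hypothetical helper (not part of the SDK), and `io.StringIO` stands in for an open text file.

```python
import io
from typing import Iterator, TextIO


def read_blocks(handle: TextIO, block_size: int = 1024) -> Iterator[str]:
    """Yield successive blocks of at most block_size characters."""
    while True:
        block = handle.read(block_size)
        if not block:  # an empty string signals end of file
            return
        yield block


fake_file = io.StringIO("a" * 2500)
blocks = list(read_blocks(fake_file, block_size=1024))
print([len(b) for b in blocks])
```

Because `input` accepts any iterable of strings, the generator could then be passed directly, for example `input=read_blocks(f)`.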

### List of Strings

```python
chunks = ["Hello, ", "my email is ", "user@example.com", " and I live in NYC"]

stream = ClassificationStream(
    input=chunks,
    classifier_description=ClassifierDescriptionDefault(),
    client=client,
    context={"actor": "agent"},
    redactor=redactor,
)
```

## Performance Tuning

### Chunk Size

The `chunk_size` parameter controls how many characters are batched before a classification request is sent. Smaller chunks reduce the delay before each piece of text is classified and yielded; larger chunks reduce the number of API calls.

| Use Case          | Recommended `chunk_size` |
| ----------------- | ------------------------ |
| Real-time chat UI | 50–100                   |
| Batch processing  | 500–1000                 |
| Default           | 100                      |
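
To make the trade-off concrete, a rough estimate of the number of classification requests is the text length divided by `chunk_size`, rounded up. The sketch assumes every chunk is filled to `chunk_size` before it is sent; the actual batching in `ClassificationStream` may split differently, so treat the result as a lower bound. `estimate_requests` is a hypothetical helper, not part of the SDK.

```python
import math


def estimate_requests(total_chars: int, chunk_size: int = 100) -> int:
    """Lower bound on request count if no chunk exceeds chunk_size characters."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return math.ceil(total_chars / chunk_size)


# Compare request counts for a 10,000-character response.
for size in (50, 100, 500, 1000):
    print(f"chunk_size={size}: at least {estimate_requests(10_000, size)} requests")
```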

### Max Workers

The `max_workers` parameter sets the thread pool size for concurrent classification requests. Increase it for higher throughput, but keep it within what your AI Guard service can handle.

| Use Case                 | Recommended `max_workers` |
| ------------------------ | ------------------------- |
| Single user session      | 2–4                       |
| High-throughput pipeline | 8–16                      |
| Default                  | 4                         |
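
The general pattern behind a worker pool can be illustrated with the standard library alone. This is not the SDK's implementation, only a sketch of order-preserving concurrent processing: `fake_classify` is a hypothetical stand-in for a network call, and `classify_concurrently` is an illustrative name.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List


def fake_classify(chunk: str) -> str:
    """Stand-in for a classification request; sleeps to simulate latency."""
    time.sleep(0.05)
    return chunk.upper()


def classify_concurrently(chunks: Iterable[str], max_workers: int = 4) -> List[str]:
    """Run fake_classify on a thread pool, returning results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the order of the inputs,
        # even when later calls finish first.
        return list(pool.map(fake_classify, chunks))


chunks = [f"chunk-{i} " for i in range(8)]
start = time.perf_counter()
results = classify_concurrently(chunks, max_workers=4)
elapsed = time.perf_counter() - start
print(results)
print(f"elapsed: {elapsed:.2f}s")
```

With eight simulated calls and four workers, the wall time is roughly that of two rounds of calls rather than eight sequential ones, which is the effect a larger `max_workers` aims for when the service has spare capacity.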

## What's Next?

* [Observability & Metrics](https://developer.onetrust.com/onetrust/docs/ai-guard-metrics) — Track classification and redaction events
* [Classifier Descriptions](https://developer.onetrust.com/onetrust/docs/ai-guard-classifier-descriptions) — Choose the right classifier configuration
* [Error Reference](https://developer.onetrust.com/onetrust/docs/ai-guard-error-reference) — Handle errors in streaming workflows