AI Filters, Routing & Guardrails
Atmosphere’s AI layer provides three categories of infrastructure that sit between LLM responses and browser clients: filters that process the text stream, routing that directs requests to the right model, and guardrails that enforce safety policies before and after LLM calls.
AI stream filters
AI filters extend the same BroadcastFilter mechanism used by Atmosphere’s core, specialized for the AI streaming wire protocol. The base class `AiStreamBroadcastFilter` (in `org.atmosphere.ai.filter`) handles RawMessage unwrapping, JSON parsing via AiStreamMessage, and re-wrapping automatically. Subclasses only implement `filterAiMessage()`.
AiStreamBroadcastFilter
This abstract class implements BroadcastFilterLifecycle. Its filter() method:

- Checks whether the message is a `RawMessage` wrapping a JSON string.
- Parses it into an `AiStreamMessage` (which has fields: `type`, `data`, `sessionId`, `seq`, etc.).
- Delegates to the abstract `filterAiMessage()` method.
- Passes non-AI messages through unchanged.
```java
public abstract class AiStreamBroadcastFilter implements BroadcastFilterLifecycle {
    protected abstract BroadcastAction filterAiMessage(
            String broadcasterId,
            AiStreamMessage msg,
            String originalJson,
            RawMessage rawMessage);
}
```
Subclasses return one of:

- `new BroadcastAction(rawMessage)` — pass through unchanged.
- `new BroadcastAction(new RawMessage(modified.toJson()))` — pass through modified.
- `new BroadcastAction(ACTION.ABORT, rawMessage)` — drop the message.
- `new BroadcastAction(ACTION.SKIP, rawMessage)` — stop the filter chain and deliver as-is.
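The unwrap-parse-delegate template can be shown with minimal stand-in types. None of the classes below are the real Atmosphere classes (the toy parser splits on `:` instead of parsing JSON); this sketch only illustrates the shape of the mechanism:

```java
import java.util.Optional;

public class FilterTemplateSketch {
    // Minimal stand-ins for RawMessage / AiStreamMessage (illustrative only).
    public record RawMessage(String json) {}
    public record AiStreamMessage(String type, String data) {}

    /** Template method: unwrap, parse, delegate; non-AI payloads pass through unchanged. */
    public abstract static class BaseFilter {
        public final Object filter(String broadcasterId, Object message) {
            if (!(message instanceof RawMessage raw)) {
                return message;                       // not a wrapped payload
            }
            Optional<AiStreamMessage> parsed = parse(raw.json());
            if (parsed.isEmpty()) {
                return message;                       // not an AI stream message
            }
            return filterAiMessage(broadcasterId, parsed.get(), raw);
        }

        protected abstract Object filterAiMessage(
                String broadcasterId, AiStreamMessage msg, RawMessage raw);

        // Toy parser: expects "type:data"; the real base class parses JSON.
        private static Optional<AiStreamMessage> parse(String json) {
            int i = json.indexOf(':');
            return i < 0 ? Optional.empty()
                         : Optional.of(new AiStreamMessage(json.substring(0, i), json.substring(i + 1)));
        }
    }
}
```

A subclass only sees already-parsed messages, which is why the docs say non-AI messages pass through without subclass involvement.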
PiiRedactionFilter
Detects and redacts personally identifiable information (PII) from AI-generated text streams. Because streamed text arrives a few words at a time, the filter buffers it per session until a sentence boundary (`.`, `!`, `?`, or newline) is detected. At that point it scans the buffered sentence, redacts matches, and emits the cleaned text.
Default patterns:
| Pattern | What it matches |
|---|---|
| `email` | Standard email addresses |
| `us-phone` | US phone numbers (10+ digits, various formats) |
| `ssn` | US Social Security Numbers (NNN-NN-NNNN) |
| `credit-card` | Credit card numbers (13-19 digits with optional separators) |
Usage:
```java
broadcaster.getBroadcasterConfig().addFilter(new PiiRedactionFilter());
```
With a custom replacement string:
```java
broadcaster.getBroadcasterConfig().addFilter(new PiiRedactionFilter("***"));
```
Adding custom patterns:
```java
var filter = new PiiRedactionFilter();
filter.addPattern("uk-nino", Pattern.compile("[A-Z]{2}\\d{6}[A-Z]"));
broadcaster.getBroadcasterConfig().addFilter(filter);
```
Removing a default pattern:
```java
filter.removePattern("credit-card");
```
On stream completion, any remaining buffered text is flushed with redaction applied. The filter uses a deferred broadcast on a virtual thread to emit the flushed text before the terminal complete message.
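The buffer-until-boundary behavior can be imitated outside Atmosphere with plain `java.util.regex`. The following self-contained sketch (the class name and methods are illustrative, not part of the API) buffers fragments until a sentence boundary arrives and then redacts email addresses:

```java
import java.util.Optional;
import java.util.regex.Pattern;

// Illustrative stand-in for PiiRedactionFilter's buffering logic (not the real class).
public class SentenceRedactor {
    private static final Pattern EMAIL =
            Pattern.compile("[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}");
    private static final Pattern BOUNDARY = Pattern.compile("[.!?\\n]");

    private final StringBuilder buffer = new StringBuilder();

    /** Buffers the fragment; returns redacted text once a sentence boundary arrives. */
    public Optional<String> accept(String fragment) {
        buffer.append(fragment);
        if (!BOUNDARY.matcher(fragment).find()) {
            return Optional.empty();              // still buffering
        }
        String redacted = EMAIL.matcher(buffer).replaceAll("[REDACTED]");
        buffer.setLength(0);
        return Optional.of(redacted);
    }

    /** On stream completion, flush whatever remains, redacted. */
    public String flush() {
        String redacted = EMAIL.matcher(buffer).replaceAll("[REDACTED]");
        buffer.setLength(0);
        return redacted;
    }
}
```

The trade-off is latency for accuracy: a pattern split across two fragments (`bob@exa` then `mple.com`) would slip past a per-fragment scan, but not past a per-sentence scan.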
ContentSafetyFilter
Scans AI-generated content for harmful patterns and blocks or replaces unsafe content mid-stream. Like PiiRedactionFilter, it buffers streamed text into sentence-sized chunks for context-aware scanning. A pluggable SafetyChecker interface allows custom safety logic.
Safety outcomes are modeled as a sealed interface:
```java
public sealed interface SafetyResult
        permits SafetyResult.Safe, SafetyResult.Unsafe, SafetyResult.Redacted {
    record Safe() implements SafetyResult {}
    record Unsafe(String reason) implements SafetyResult {}
    record Redacted(String cleanText) implements SafetyResult {}
}
```
- `Safe`: pass through unchanged.
- `Unsafe`: abort the entire stream and send an error message to the client.
- `Redacted`: replace the text with a cleaned version and continue streaming.
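A standalone analogue of this outcome type, with a blocking and a redacting keyword check, shows how the three variants divide the work. This is an illustrative sketch mirroring the shapes in the docs, not the shipped code:

```java
import java.util.Locale;
import java.util.Set;
import java.util.regex.Pattern;

public class SafetySketch {
    // Mirrors the SafetyResult shape from the docs (illustrative stand-in).
    public sealed interface SafetyResult permits Safe, Unsafe, Redacted {}
    public record Safe() implements SafetyResult {}
    public record Unsafe(String reason) implements SafetyResult {}
    public record Redacted(String cleanText) implements SafetyResult {}

    /** Blocking check: any banned keyword makes the whole chunk Unsafe. */
    public static SafetyResult keywordCheck(Set<String> banned, String text) {
        String lower = text.toLowerCase(Locale.ROOT);
        for (String word : banned) {
            if (lower.contains(word.toLowerCase(Locale.ROOT))) {
                return new Unsafe("banned keyword: " + word);
            }
        }
        return new Safe();
    }

    /** Redacting check: banned keywords are replaced instead of blocking the stream. */
    public static SafetyResult redactingCheck(Set<String> banned, String replacement, String text) {
        String clean = text;
        for (String word : banned) {
            clean = clean.replaceAll("(?i)" + Pattern.quote(word), replacement);
        }
        return clean.equals(text) ? new Safe() : new Redacted(clean);
    }
}
```

Returning `Redacted` only when something actually changed lets the filter skip re-serialization for the common `Safe` case.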
Built-in checker factories:
```java
// Block the stream entirely when a keyword is found
var checker = ContentSafetyFilter.keywordChecker(Set.of("harmful-term"));
broadcaster.getBroadcasterConfig().addFilter(new ContentSafetyFilter(checker));
```
```java
// Redact keywords instead of blocking
var checker = ContentSafetyFilter.redactingChecker(
        Set.of("sensitive-term"), "[FILTERED]");
broadcaster.getBroadcasterConfig().addFilter(new ContentSafetyFilter(checker));
```
For external moderation APIs, implement SafetyChecker directly:
```java
SafetyChecker apiChecker = text -> {
    var result = moderationApi.check(text);
    if (result.isBlocked()) {
        return new SafetyResult.Unsafe(result.reason());
    }
    return new SafetyResult.Safe();
};
```
CostMeteringFilter
Tracks streaming text counts per session and per broadcaster, and optionally enforces streaming text budgets by aborting streams that exceed their allocation. This filter does not modify streaming text content.
```java
var metering = new CostMeteringFilter();
metering.setBudget("user-123-broadcaster", 10000); // max 10K streaming texts
broadcaster.getBroadcasterConfig().addFilter(metering);
```
When a budget is exceeded, the filter marks the session as exceeded, drops subsequent streaming texts with `ACTION.ABORT`, and injects a single error message to notify the client.
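The enforcement logic described above (count per key, abort once over budget, notify the client exactly once) can be sketched independently. `BudgetCounter` is an illustrative stand-in, not the actual filter:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative stand-in for CostMeteringFilter's budget enforcement.
public class BudgetCounter {
    public enum Action { PASS, ABORT_WITH_ERROR, ABORT_SILENT }

    private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();
    private final Map<String, Long> budgets = new ConcurrentHashMap<>();
    private final Map<String, Boolean> notified = new ConcurrentHashMap<>();

    public void setBudget(String key, long max) { budgets.put(key, max); }

    /** Called once per streaming text; decides whether it may pass. */
    public Action record(String key, int streamingTexts) {
        long total = counts.computeIfAbsent(key, k -> new AtomicLong())
                           .addAndGet(streamingTexts);
        Long max = budgets.get(key);
        if (max == null || total <= max) {
            return Action.PASS;
        }
        // First overage emits an error message; later ones are dropped silently.
        return notified.putIfAbsent(key, Boolean.TRUE) == null
                ? Action.ABORT_WITH_ERROR
                : Action.ABORT_SILENT;
    }

    public long count(String key) {
        AtomicLong c = counts.get(key);
        return c == null ? 0 : c.get();
    }
}
```

The single-notification guard matters in practice: without it, a runaway stream would flood the client with one error per dropped streaming text.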
Querying usage:
```java
long sessionStreamingTexts = metering.getSessionStreamingTextCount("session-id");
long broadcasterStreamingTexts = metering.getBroadcasterStreamingTextCount("broadcaster-id");
metering.resetBroadcasterCount("broadcaster-id"); // rolling window reset
```
Wiring a StreamingTextBudgetManager for persistent budget tracking:
```java
metering.setBudgetManager(budgetManager, sessionId -> lookupUserId(sessionId));
```
When setBudgetManager is configured, the filter calls `budgetManager.recordUsage(ownerId, streamingTextCount)` on every stream completion.
Streaming text budget management
The StreamingTextBudgetManager (in `org.atmosphere.ai.budget`) manages per-user or per-organization streaming text budgets with graceful degradation.
```java
var budgetManager = new StreamingTextBudgetManager();
budgetManager.setBudget(new StreamingTextBudgetManager.Budget(
        "user-123", 100_000, "gemini-2.5-flash", 0.8));
```
The Budget record:
```java
public record Budget(
        String ownerId,
        long maxStreamingTexts,
        String fallbackModel,
        double degradationThreshold) {}
```
When usage approaches the degradationThreshold fraction (e.g., 80%), `recommendedModel()` returns the cheaper fallback model name. When the budget is fully exhausted, it throws BudgetExceededException:
```java
try {
    Optional<String> fallback = budgetManager.recommendedModel("user-123");
    // fallback.isPresent() means "switch to the cheaper model"
} catch (BudgetExceededException e) {
    // Budget fully exhausted: e.ownerId(), e.budget(), e.used()
}
```
Other operations:
```java
long remaining = budgetManager.remaining("user-123");
long used = budgetManager.currentUsage("user-123");
budgetManager.resetUsage("user-123"); // new billing period
budgetManager.removeBudget("user-123");
```
AiGuardrail
The AiGuardrail interface (in `org.atmosphere.ai`) provides pre-LLM and post-LLM inspection. Guardrails run in the interceptor chain:
```
Guardrails (pre) -> Rate Limit -> RAG -> [LLM call] -> Guardrails (post) -> Observability
```
```java
public interface AiGuardrail {
    default GuardrailResult inspectRequest(AiRequest request) {
        return GuardrailResult.pass();
    }

    default GuardrailResult inspectResponse(String accumulatedResponse) {
        return GuardrailResult.pass();
    }
}
```
GuardrailResult is a sealed interface with three variants:
```java
sealed interface GuardrailResult {
    record Pass() implements GuardrailResult {}
    record Modify(AiRequest modifiedRequest) implements GuardrailResult {}
    record Block(String reason) implements GuardrailResult {}

    static GuardrailResult pass() { return new Pass(); }
    static GuardrailResult modify(AiRequest req) { return new Modify(req); }
    static GuardrailResult block(String reason) { return new Block(reason); }
}
```
Example guardrail:
```java
public class PiiGuardrail implements AiGuardrail {
    @Override
    public GuardrailResult inspectRequest(AiRequest request) {
        if (containsPii(request.message())) {
            return GuardrailResult.block("PII detected in request");
        }
        return GuardrailResult.pass();
    }
}
```
Register guardrails on an endpoint:
```java
@AiEndpoint(path = "/chat", guardrails = {PiiGuardrail.class})
```
Model routing
The ModelRouter interface (in `org.atmosphere.ai`) mirrors Atmosphere’s transport failover pattern (WebSocket -> SSE -> long-polling), applied to the AI layer (GPT-4 -> Claude -> Gemini).
```java
public interface ModelRouter {
    Optional<AiSupport> route(
            AiRequest request,
            List<AiSupport> availableBackends,
            Set<AiCapability> requiredCapabilities);

    void reportFailure(AiSupport backend, Throwable error);
    void reportSuccess(AiSupport backend);
}
```
FallbackStrategy
The ModelRouter.FallbackStrategy enum defines four strategies:
| Strategy | Behavior |
|---|---|
| `NONE` | Use the primary model only |
| `FAILOVER` | On failure, try the next backend in priority order |
| `ROUND_ROBIN` | Distribute requests across backends |
| `CONTENT_BASED` | Route based on request characteristics (model hint, tool requirements) |
DefaultModelRouter
The default implementation uses a circuit breaker pattern for health tracking:

- Consecutive failures increment a failure counter.
- After `maxConsecutiveFailures` (default 3), the backend is marked unhealthy.
- After a cooldown period (default 1 minute), the backend becomes eligible again.
- A success resets the failure counter.
```java
var router = new DefaultModelRouter(FallbackStrategy.FAILOVER);
// or with custom thresholds:
var router = new DefaultModelRouter(FallbackStrategy.ROUND_ROBIN, 5, Duration.ofMinutes(2));
```
For CONTENT_BASED routing, the router checks `request.model()` for a model hint and `request.tools()` for tool requirements, preferring backends with `AiCapability.TOOL_CALLING`.
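The health-tracking rules above amount to a small circuit breaker. A self-contained sketch of that state machine (illustrative, not DefaultModelRouter itself):

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative circuit breaker following the health-tracking rules above.
public class BackendHealth {
    private final int maxConsecutiveFailures;
    private final Duration cooldown;
    private int failures = 0;
    private Instant unhealthySince = null;

    public BackendHealth(int maxConsecutiveFailures, Duration cooldown) {
        this.maxConsecutiveFailures = maxConsecutiveFailures;
        this.cooldown = cooldown;
    }

    public void reportFailure() {
        failures++;
        if (failures >= maxConsecutiveFailures && unhealthySince == null) {
            unhealthySince = Instant.now();   // trip the breaker
        }
    }

    public void reportSuccess() {
        failures = 0;                         // a success resets the counter
        unhealthySince = null;
    }

    /** Healthy, or unhealthy but past the cooldown (eligible for a retry). */
    public boolean isEligible(Instant now) {
        if (unhealthySince == null) return true;
        return Duration.between(unhealthySince, now).compareTo(cooldown) >= 0;
    }
}
```

Taking `now` as a parameter keeps the cooldown logic testable without sleeping through real wall-clock time.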
RoutingAiSupport
Wraps a ModelRouter and a list of backends into a single AiSupport instance. On failure, it attempts one retry with the next backend:
```java
var routing = new RoutingAiSupport(router, List.of(
        springAiSupport, langChain4jSupport, adkSupport));
```
Its `name()` returns a descriptive string like `routing(spring-ai,langchain4j,google-adk)`, and its `capabilities()` is the union of all backends’ capabilities.
RoutingLlmClient
For lower-level control, RoutingLlmClient (in `org.atmosphere.ai.routing`) routes at the LlmClient level with configurable rules:
```java
var router = RoutingLlmClient.builder(defaultClient, "gemini-2.5-flash")
        .route(RoutingRule.contentBased(
                prompt -> prompt.contains("code"), openaiClient, "gpt-4o"))
        .route(RoutingRule.contentBased(
                prompt -> prompt.contains("translate"), claudeClient, "claude-3-haiku"))
        .build();

router.streamChatCompletion(request, session);
```
Rules are evaluated in order. The first matching rule determines the target client and model. If no rule matches, the default client is used.
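First-match evaluation over content rules fits in a few lines. The `Rule` and `Target` types below are hypothetical stand-ins, not the real RoutingRule API:

```java
import java.util.List;
import java.util.function.Predicate;

public class RuleChain {
    public record Target(String client, String model) {}
    public record Rule(Predicate<String> matches, Target target) {}

    /** Returns the first matching rule's target, else the default. */
    public static Target route(List<Rule> rules, Target fallback, String prompt) {
        return rules.stream()
                .filter(r -> r.matches().test(prompt))
                .map(Rule::target)
                .findFirst()
                .orElse(fallback);
    }
}
```

Because evaluation is first-match, rule order is part of the routing policy: put the most specific predicates first.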
Cost-based and Latency-based Routing
Beyond content-based rules, RoutingLlmClient supports cost and latency constraints via ModelOption — a record that attaches cost, latency, and capability metadata to each model:
```java
var models = List.of(
        new RoutingRule.ModelOption(geminiClient, "gemini-2.5-flash", 0.001, 200, 80),
        new RoutingRule.ModelOption(openaiClient, "gpt-4o", 0.01, 500, 95),
        new RoutingRule.ModelOption(claudeClient, "claude-3-haiku", 0.002, 150, 70));

var router = RoutingLlmClient.builder(geminiClient, "gemini-2.5-flash")
        // Under budget: pick the most capable model that fits
        .route(RoutingRule.costBased(5.0, models))
        // Low latency: pick the most capable model under 300ms
        .route(RoutingRule.latencyBased(300, models))
        // Content fallback
        .route(RoutingRule.contentBased(
                prompt -> prompt.contains("code"), openaiClient, "gpt-4o"))
        .build();
```
Cost-based (CostBased): filters models where `costPerStreamingText * maxStreamingTexts <= maxCost`, then selects the highest-capability model. This lets you use GPT-4o for short prompts and fall back to cheaper models for long ones.
Latency-based (LatencyBased): filters models where averageLatencyMs <= maxLatencyMs, then selects the highest-capability model. Useful for real-time UIs that need sub-second time-to-first-token.
The ModelOption fields:
| Field | Description |
|---|---|
| `costPerStreamingText` | Cost per streaming text in arbitrary units |
| `averageLatencyMs` | Average response latency in milliseconds |
| `capability` | Capability score (higher = more capable); used for tie-breaking |
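The selection logic described for CostBased and LatencyBased (filter by the constraint, then take the highest capability) can be sketched as follows. The `Option` record mimics ModelOption but is illustrative, not the real type:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class ModelSelect {
    // Mimics ModelOption: (model, costPerStreamingText, averageLatencyMs, capability).
    public record Option(String model, double cost, long latencyMs, int capability) {}

    /** Keep models whose projected cost fits the budget; pick the most capable. */
    public static Optional<Option> costBased(double maxCost, long maxStreamingTexts, List<Option> options) {
        return options.stream()
                .filter(o -> o.cost() * maxStreamingTexts <= maxCost)
                .max(Comparator.comparingInt(Option::capability));
    }

    /** Keep models under the latency ceiling; pick the most capable. */
    public static Optional<Option> latencyBased(long maxLatencyMs, List<Option> options) {
        return options.stream()
                .filter(o -> o.latencyMs() <= maxLatencyMs)
                .max(Comparator.comparingInt(Option::capability));
    }
}
```

With the example numbers from the docs, a 1000-streaming-text request under a 5.0 budget excludes gpt-4o (projected cost 10) and selects gemini-2.5-flash as the most capable remaining option; a 100-streaming-text request keeps gpt-4o in the pool.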
Budget-aware Degradation
Combine routing with StreamingTextBudgetManager for automatic model degradation when a user or organization approaches their budget:
```java
var router = RoutingLlmClient.builder(defaultClient, "gpt-4o")
        .budgetManager(budgetManager, request -> extractOrgId(request))
        .route(RoutingRule.costBased(10.0, models))
        .build();
```
When an owner’s usage exceeds the degradation threshold, the router switches to the budget manager’s recommended model before evaluating rules. If the budget is fully exhausted, a BudgetExceededException is sent as an error to the client.
Fan-out streaming
Fan-out sends the same prompt to multiple models simultaneously, with each model streaming through its own child session. The FanOutStreamingSession (in `org.atmosphere.ai.fanout`) orchestrates this.
FanOutStrategy
A sealed interface with three variants:
```java
public sealed interface FanOutStrategy {
    record AllResponses() implements FanOutStrategy {}
    record FirstComplete() implements FanOutStrategy {}
    record FastestStreamingTexts(int streamingTextThreshold) implements FanOutStrategy {}
}
```
| Strategy | Behavior |
|---|---|
| `AllResponses` | All models stream to completion. The client receives interleaved text streams distinguishable by session ID. |
| `FirstComplete` | First model to finish wins. All other in-flight calls are cancelled. |
| `FastestStreamingTexts(n)` | Observe streaming text production speed for the first n streaming texts, then keep the fastest model and cancel the rest. |
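The FirstComplete race can be approximated with CompletableFuture: launch all calls, take the first to finish, cancel the rest. The `callModel` simulation below stands in for real LLM calls; this is a sketch of the strategy, not the FanOutStreamingSession implementation:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class FirstCompleteSketch {
    /** Race the futures; when one wins, cancel the others and return its value. */
    public static <T> T firstComplete(List<CompletableFuture<T>> calls) {
        Object winner = CompletableFuture
                .anyOf(calls.toArray(CompletableFuture[]::new))
                .join();
        calls.forEach(f -> f.cancel(true));   // cancelling the winner is a no-op
        @SuppressWarnings("unchecked")
        T result = (T) winner;
        return result;
    }

    // Simulated model call that completes with its id after the given delay.
    static CompletableFuture<String> callModel(String id, long millis) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return id;
        });
    }
}
```

AllResponses corresponds to joining every future instead of racing them, and FastestStreamingTexts(n) needs per-chunk progress tracking rather than a single completion signal, which is why the real implementation works at the streaming-session level.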
ModelEndpoint
Describes one model to fan out to:
```java
public record ModelEndpoint(String id, LlmClient client, String model) {}
```
FanOutResult
Available after fan-out completes:
```java
public record FanOutResult(
        String modelId,
        String fullResponse,
        long timeToFirstStreamingTextMs,
        long totalTimeMs,
        int streamingTextCount) {}
```
```java
var endpoints = List.of(
        new ModelEndpoint("gemini", geminiClient, "gemini-2.5-flash"),
        new ModelEndpoint("gpt4", openaiClient, "gpt-4o"));

try (var fanOut = new FanOutStreamingSession(session, endpoints,
        new FanOutStrategy.AllResponses(), resource)) {
    fanOut.fanOut(ChatCompletionRequest.of("ignored", userPrompt));

    // After completion, inspect results
    Map<String, FanOutResult> results = fanOut.getResults();
    var geminiResult = results.get("gemini");
    logger.info("Gemini TTFT: {}ms, total: {}ms, streaming texts: {}",
            geminiResult.timeToFirstStreamingTextMs(),
            geminiResult.totalTimeMs(),
            geminiResult.streamingTextCount());
}
```
Child sessions use IDs of the form `parentSessionId + "-" + endpointId`, so the client can distinguish which model produced each streaming text. The parent session receives metadata events: `fanout.models` (list of model IDs at start) and `fanout.complete` (boolean at end).
Putting it all together
A typical production setup combines filters, routing, and budget management:
```java
// 1. Set up budget management
var budgetManager = new StreamingTextBudgetManager();
budgetManager.setBudget(new StreamingTextBudgetManager.Budget(
        "org-acme", 500_000, "gemini-2.5-flash", 0.8));

// 2. Set up routing with failover
var router = new DefaultModelRouter(FallbackStrategy.FAILOVER);
var routing = new RoutingAiSupport(router, List.of(
        springAiSupport, langChain4jSupport));

// 3. Add filters to the broadcaster
var metering = new CostMeteringFilter();
metering.setBudgetManager(budgetManager, sid -> lookupOrgId(sid));

broadcaster.getBroadcasterConfig().addFilter(new PiiRedactionFilter());
broadcaster.getBroadcasterConfig().addFilter(
        new ContentSafetyFilter(ContentSafetyFilter.keywordChecker(blockedTerms)));
broadcaster.getBroadcasterConfig().addFilter(metering);
```
The filter chain processes every streaming text in order: PII redaction first, then content safety, then cost metering. If PII redaction buffers a streaming text (waiting for a sentence boundary), it is not visible to downstream filters until the sentence is complete.
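The ordering semantics (each stage sees the previous stage's output, and a buffering stage hides text from everything downstream) can be sketched with Optional-returning stages. This is illustrative only, not the Broadcaster filter chain:

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

public class FilterChainSketch {
    /** A stage returns the (possibly modified) text, or empty if it buffered/dropped it. */
    public interface Stage extends Function<String, Optional<String>> {}

    /** Runs stages in order; stops as soon as one buffers or drops the text. */
    public static Optional<String> run(List<Stage> stages, String text) {
        Optional<String> current = Optional.of(text);
        for (Stage stage : stages) {
            if (current.isEmpty()) break;          // upstream held the text back
            current = stage.apply(current.get());
        }
        return current;
    }
}
```

This is why filter registration order matters: placing cost metering last means buffered-and-dropped text is never billed, while placing it first would count text the safety filter later blocks.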
Testing AI Endpoints
The atmosphere-ai-test module provides a lightweight testing framework for AI endpoints without spinning up a full server.
Dependency
Section titled “Dependency”<dependency> <groupId>org.atmosphere</groupId> <artifactId>atmosphere-ai-test</artifactId> <version>LATEST</version> <scope>test</scope></dependency>AiTestClient
AiTestClient wraps an AiSupport implementation and captures the full streaming response for assertion:
```java
@Test
void toolsAreCalled() {
    var client = new AiTestClient(myAiSupport);
    var response = client.prompt("What's the weather in Tokyo?");

    AiAssertions.assertThat(response)
            .hasToolCall("get_weather")
            .withArgument("city", "Tokyo")
            .hasResult()
            .and()
            .containsText("Tokyo")
            .completedWithin(Duration.ofSeconds(10))
            .hasNoErrors();
}
```
AiResponse
The captured AiResponse record exposes:
| Field | Type | Description |
|---|---|---|
| `text()` | String | Full accumulated text response |
| `events()` | `List<AiEvent>` | All structured events emitted during streaming |
| `metadata()` | `Map<String, Object>` | Metadata key-value pairs |
| `errors()` | `List<String>` | Error messages, if any |
| `elapsed()` | Duration | Wall-clock response time |
| `completed()` | boolean | Whether the stream completed normally |
Filter events by type:
```java
List<AiEvent.ToolStart> toolCalls = response.eventsOfType(AiEvent.ToolStart.class);
```
AiAssertions
Fluent assertion API that chains naturally:
```java
AiAssertions.assertThat(response)
        .containsText("weather")
        .containsEventType(AiEvent.ToolStart.class)
        .hasMetadata("routing.model")
        .isComplete()
        .hasNoErrors();
```
Tool call assertions support argument inspection:
```java
AiAssertions.assertThat(response)
        .hasToolCall("search_docs")
        .withArgument("query", "atmosphere framework")
        .hasResult()
        .and()
        .completedWithin(Duration.ofSeconds(5));
```
Samples
- `samples/spring-boot-ai-tools/` — demonstrates the CostMeteringInterceptor that tracks streaming text usage and sends routing metadata to the client.
- `samples/spring-boot-spring-ai-routing/` — demonstrates multi-model routing with RoutingAiSupport and DefaultModelRouter using Spring AI backends.