hatClient;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
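@Service
public class InferenceStreamService {

    private final ChatClient aiClient;

    public InferenceStreamService(ChatClient.Builder clientBuilder) {
        // The builder is auto-configured by Spring AI for the provider on the classpath
        this.aiClient = clientBuilder.build();
    }

    public Flux<String> generateConversationStream(String userPrompt) {
        return aiClient.prompt()
                .user(userPrompt)
                .stream()   // streaming call instead of .call()
                .content()  // Flux<String> of incremental text fragments
                // Side-effect hooks: they observe cancellation and errors without altering the stream
                .doOnCancel(() -> System.out.println("Stream terminated by client"))
                .doOnError(err -> System.err.println("Inference pipeline error: " + err.getMessage()));
    }
}
```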
**Architecture Rationale:**
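- `ChatClient` is Spring AI's fluent API for LLM interactions. It delegates to a provider-specific `ChatModel`, which hides each provider's request format and streaming protocol.
- `.stream().content()` returns a `Flux<String>` where each emission is an incremental text fragment: a single token or a small group of tokens, depending on the provider.
- `doOnCancel` and `doOnError` are side-effect hooks. They add logging without altering the reactive chain. The actual cleanup comes from cancellation itself: WebFlux cancels the subscription when a client disconnects, and that cancellation propagates upstream to the provider request. The hooks only make those events observable.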
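Cancellation is what actually frees resources here. The example below sketches that contract with the JDK's `java.util.concurrent.Flow`, which follows the same Reactive Streams `request`/`cancel` protocol as Reactor. It makes no assumptions about Spring AI's internals.

- `TokenPublisher` is a hypothetical stand-in for a provider stream. It counts every token it generates.
- `DisconnectingClient` is also hypothetical. It reads a few tokens and then cancels, much as WebFlux cancels the subscription when a browser disconnects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Sketch: cancelling a subscription stops upstream generation. */
public class CancellationDemo {

    /** Hypothetical stand-in for a provider stream; counts every token it actually generates. */
    static final class TokenPublisher implements Flow.Publisher<String> {
        private final int totalTokens;
        final AtomicInteger generated = new AtomicInteger();

        TokenPublisher(int totalTokens) { this.totalTokens = totalTokens; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;
                private int next;
                private boolean emitting;
                private boolean done;

                @Override
                public void request(long n) {
                    if (n <= 0 || done) return;
                    demand = (Long.MAX_VALUE - demand < n) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return; // re-entrant call from onNext: the running loop continues
                    emitting = true;
                    while (!done) {
                        if (next == totalTokens) { done = true; subscriber.onComplete(); break; }
                        if (demand == 0) break; // no credit left: stop generating
                        demand--;
                        generated.incrementAndGet(); // stands in for a billed provider token
                        subscriber.onNext("t" + next++);
                    }
                    emitting = false;
                }

                @Override
                public void cancel() { done = true; } // e.g. triggered by a client disconnect
            });
        }
    }

    /** Subscriber that reads a few tokens and then "disconnects" by cancelling. */
    static final class DisconnectingClient implements Flow.Subscriber<String> {
        private final int readBeforeDisconnect;
        final List<String> received = new ArrayList<>();
        private Flow.Subscription subscription;

        DisconnectingClient(int readBeforeDisconnect) { this.readBeforeDisconnect = readBeforeDisconnect; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

        @Override public void onNext(String token) {
            received.add(token);
            if (received.size() >= readBeforeDisconnect) subscription.cancel();
            else subscription.request(1);
        }

        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    /** Returns how many tokens the upstream generated before the client went away. */
    public static int tokensGenerated(int totalTokens, int readBeforeDisconnect) {
        TokenPublisher publisher = new TokenPublisher(totalTokens);
        publisher.subscribe(new DisconnectingClient(readBeforeDisconnect));
        return publisher.generated.get();
    }

    public static void main(String[] args) {
        // 1,000 tokens are available, but the client leaves after 3
        System.out.println(tokensGenerated(1_000, 3));
    }
}
```

Generation stops at the cancel, so only the tokens that were actually requested get produced. Whether a real provider also stops billing at that point depends on how its client closes the underlying HTTP request.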
### Step 2: SSE Controller Endpoint
Expose the stream using the `text/event-stream` media type. Spring WebFlux automatically handles SSE framing when this content type is declared.
```java
package com.codcompass.ai.web;

import com.codcompass.ai.service.InferenceStreamService;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

@RestController
@RequestMapping("/api/v1/conversations")
public class StreamController {

    private final InferenceStreamService inferenceService;

    public StreamController(InferenceStreamService inferenceService) {
        this.inferenceService = inferenceService;
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> handleStreamingRequest(@RequestParam String query) {
        return inferenceService.generateConversationStream(query);
    }
}
```
**Architecture Rationale:**
- `MediaType.TEXT_EVENT_STREAM_VALUE` signals to the framework that this is a long-lived, unidirectional stream.
- Spring prefixes each emission with `data:` and terminates each event with a blank line, as the SSE specification requires.
- Returning `Flux<String>` keeps the pipeline non-blocking. No request thread is held while the response streams; the event loop writes each token as it arrives.
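Here is a minimal Java sketch of the event-stream format.

- `frame` writes one `data:` line per payload line, followed by a blank line.
- `parse` reads events back the way the SSE specification tells browsers to.

`SseFraming` is a hypothetical helper written for illustration. It is not Spring's writer, and it handles LF line endings only.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch of SSE framing and parsing (LF line endings only). */
public class SseFraming {

    /** Frames one payload as an unnamed event: one "data:" line per payload line, then a blank line. */
    public static String frame(String payload) {
        StringBuilder sb = new StringBuilder();
        for (String line : payload.split("\n", -1)) {
            sb.append("data:").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    /** Parses framed text back into event payloads, following the browser's parsing rules. */
    public static List<String> parse(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String line : stream.split("\n", -1)) {
            if (line.isEmpty()) {                 // a blank line dispatches the pending event
                if (hasData) events.add(data.toString());
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring(5);
                if (value.startsWith(" ")) value = value.substring(1); // spec: strip ONE leading space
                if (hasData) data.append('\n');   // multiple data lines join with a newline
                data.append(value);
                hasData = true;
            }                                      // event:, id:, and comment lines are ignored here
        }
        return events;
    }

    public static void main(String[] args) {
        String wire = frame("Hello") + frame(" world");
        System.out.print(wire);
        System.out.println(parse(wire)); // the second token's leading space does not survive
    }
}
```

The `main` method exposes one subtlety. The specification strips a single space after `data:`, so a token that begins with a space (such as ` world`) arrives in `event.data` without it.

LLM tokens often start with spaces, so words can run together when the Step 3 consumer joins chunks with `''`. Encoding each token, for example as a JSON string, sidesteps this.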
### Step 3: Frontend Consumer with Resilience
Browser-native `EventSource` handles SSE connections and retries dropped connections on its own. Production interfaces also need bounded retry policies, error boundaries, and token buffering to prevent UI thrashing.
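```javascript
class LlmStreamConsumer {
  constructor(endpoint, { flushThreshold = 4, maxRetries = 5, baseDelayMs = 1000 } = {}) {
    this.endpoint = endpoint;
    this.buffer = [];
    this.flushThreshold = flushThreshold;
    this.maxRetries = maxRetries;
    this.baseDelayMs = baseDelayMs;
    this.retries = 0;
    this.retryTimer = null;
    this.eventSource = null;
  }

  connect(query, onUpdate, onComplete, onError) {
    const url = `${this.endpoint}?query=${encodeURIComponent(query)}`;
    const source = new EventSource(url);
    this.eventSource = source;

    source.onopen = () => {
      this.retries = 0; // a successful connection resets the backoff
    };

    source.onmessage = (event) => {
      this.buffer.push(event.data);
      if (this.buffer.length >= this.flushThreshold) this.flush(onUpdate);
    };

    source.onerror = () => {
      const permanent = source.readyState === EventSource.CLOSED;
      // Close explicitly so the browser's built-in retry never runs alongside ours
      source.close();
      if (permanent || this.retries >= this.maxRetries) {
        this.flush(onUpdate);
        onError(new Error('Connection permanently closed'));
        return;
      }
      // Exponential backoff: 1s, 2s, 4s, ...
      const delay = this.baseDelayMs * 2 ** this.retries;
      this.retries += 1;
      // Reconnecting re-sends the query; without server-side checkpointing,
      // generation restarts from the beginning.
      this.retryTimer = setTimeout(() => this.connect(query, onUpdate, onComplete, onError), delay);
    };

    // Fires only if the server sends a named `complete` event;
    // plain Flux<String> items arrive as unnamed `message` events.
    source.addEventListener('complete', () => {
      this.flush(onUpdate);
      source.close();
      onComplete();
    });
  }

  flush(onUpdate) {
    if (this.buffer.length > 0) {
      onUpdate(this.buffer.join(''));
      this.buffer = [];
    }
  }

  disconnect() {
    clearTimeout(this.retryTimer);
    if (this.eventSource) this.eventSource.close();
  }
}

// Usage
const streamer = new LlmStreamConsumer('/api/v1/conversations/stream');
streamer.connect(
  'Explain reactive backpressure in Spring WebFlux',
  (chunk) => { document.getElementById('output').textContent += chunk; },
  () => console.log('Generation finished'),
  (err) => console.error('Stream failed:', err)
);
```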
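**Architecture Rationale:**
- Token buffering (`flushThreshold`) reduces DOM update frequency. Appending every token individually triggers frequent reflows and CPU spikes in the browser.
- `onerror` distinguishes transient network drops (`readyState` is `CONNECTING`) from permanent failures (`readyState` is `CLOSED`). Only transient drops trigger reconnection.
- The custom `complete` event handler flushes the final partial buffer before closing the connection. The server must actually emit this named event, for example as a `ServerSentEvent` built with `.event("complete")`. A plain `Flux<String>` produces only unnamed `message` events. Without a completion signal, the browser treats the normal end of the stream as a dropped connection and reconnects.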
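The same count-based flushing is useful on the server too, since Pitfall 5 suggests buffering on either side. This Java sketch reproduces the `flushThreshold` behavior of the consumer above:

- `push` returns a joined batch every N tokens.
- `finish` plays the role of the `complete` handler by releasing the partial tail.

`TokenBuffer` is a hypothetical class, not part of any library.

```java
import java.util.ArrayList;
import java.util.List;

/** Count-based token buffer mirroring the flushThreshold logic of the JavaScript consumer. */
public class TokenBuffer {
    private final int flushThreshold;
    private final List<String> pending = new ArrayList<>();

    public TokenBuffer(int flushThreshold) {
        if (flushThreshold < 1) throw new IllegalArgumentException("flushThreshold must be >= 1");
        this.flushThreshold = flushThreshold;
    }

    /** Adds a token; returns the joined batch once the threshold is reached, otherwise null. */
    public String push(String token) {
        pending.add(token);
        return pending.size() >= flushThreshold ? drain() : null;
    }

    /** Flushes the final partial batch (the role of the `complete` handler); null if nothing is left. */
    public String finish() {
        return pending.isEmpty() ? null : drain();
    }

    private String drain() {
        String batch = String.join("", pending);
        pending.clear();
        return batch;
    }

    public static void main(String[] args) {
        TokenBuffer buffer = new TokenBuffer(4);
        List<String> updates = new ArrayList<>();
        for (String token : List.of("Spr", "ing", " Web", "Flux", " stre", "ams")) {
            String batch = buffer.push(token);
            if (batch != null) updates.add(batch);
        }
        String tail = buffer.finish();
        if (tail != null) updates.add(tail);
        System.out.println(updates);
    }
}
```

In a Reactor pipeline, `Flux.buffer(n)` followed by `map(batch -> String.join("", batch))` expresses the same idea. `Flux.buffer(n)` also emits the final partial batch when the stream completes.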
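## Pitfall Guide
### 1. Ignoring Reactive Backpressure
**Explanation:** LLMs can generate tokens faster than the network or frontend can consume them. If an operator in the chain buffers without a bound, tokens accumulate in memory behind a slow consumer, which can end in an `OutOfMemoryError`.
**Fix:** Apply `.limitRate(32)` or a bounded `.onBackpressureBuffer(maxSize)` to the reactive chain. The bounded buffer signals an overflow error instead of growing without limit. `limitRate` applies credit-based flow control: the pipeline requests tokens in batches the downstream can handle rather than requesting an unbounded amount.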
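Reactor's `limitRate` builds on the Reactive Streams `request(n)` protocol, which the JDK also ships as `java.util.concurrent.Flow`. The stdlib sketch below shows the credit idea in isolation:

- `BatchingSubscriber` (hypothetical) grants demand in fixed batches and requests more only after a batch has been consumed.
- `SubmissionPublisher` with a small buffer makes `submit` block instead of queueing without limit.

This is a simplification. `limitRate` replenishes early (by default once roughly 75% of a batch has been delivered) rather than waiting for the whole batch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Sketch of credit-based flow control with JDK Flow, similar in spirit to Reactor's limitRate(n). */
public class CreditFlowDemo {

    /** Requests tokens in fixed-size batches and asks for more only after a batch is consumed. */
    static final class BatchingSubscriber implements Flow.Subscriber<String> {
        private final int batchSize;
        final List<String> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;
        private int remainingCredit;

        BatchingSubscriber(int batchSize) { this.batchSize = batchSize; }

        @Override public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            remainingCredit = batchSize;
            s.request(batchSize);             // initial credit
        }

        @Override public void onNext(String token) {
            received.add(token);
            if (--remainingCredit == 0) {     // batch consumed: grant the next one
                remainingCredit = batchSize;
                subscription.request(batchSize);
            }
        }

        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Publishes tokens through a small bounded buffer; submit() blocks while that buffer is full. */
    public static List<String> run(List<String> tokens, int batchSize, int bufferCapacity) {
        BatchingSubscriber subscriber = new BatchingSubscriber(batchSize);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(executor, bufferCapacity)) {
            publisher.subscribe(subscriber);
            for (String token : tokens) {
                publisher.submit(token);
            }
        } // close(): remaining buffered items are delivered, then onComplete
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(String.join("", run(List.of("Re", "active", " back", "pressure"), 2, 2)));
    }
}
```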
### 2. Mixing Blocking Calls with Reactive Streams
**Explanation:** Introducing synchronous database calls, file I/O, or `Thread.sleep()` inside a `Flux` pipeline blocks the event loop threads. A small, fixed set of event loop threads serves every connection, so one blocked thread stalls many concurrent SSE streams.
**Fix:** Replace blocking operations with reactive equivalents (`ReactiveCrudRepository`, `WebClient`, `Sinks`). If blocking is unavoidable, wrap the call in `Mono.fromCallable(...)` and offload it to a bounded elastic scheduler with `.subscribeOn(Schedulers.boundedElastic())`.
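The Java sketch below illustrates the same principle using only the JDK:

- The blocking call runs on a pool that caps both threads and queued tasks.
- The caller gets a `CompletableFuture` back immediately instead of waiting.

`BlockingOffload`, `boundedBlockingPool`, and `loadHistory` are hypothetical names. The sleep stands in for JDBC or file I/O.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Stdlib analogy for subscribeOn(Schedulers.boundedElastic()): blocking work leaves the caller's thread. */
public class BlockingOffload {

    /** Caps both threads and queued tasks; rejects work instead of growing without limit. */
    public static ExecutorService boundedBlockingPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(threads, threads, 60, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                runnable -> {
                    Thread t = new Thread(runnable, "blocking-io");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.AbortPolicy());
    }

    /** Hypothetical blocking lookup, e.g. a JDBC query for conversation history. */
    static String loadHistory(String conversationId) {
        try {
            Thread.sleep(50); // simulated I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "history-" + conversationId + "@" + Thread.currentThread().getName();
    }

    /** Returns immediately; the blocking work runs on the bounded pool. */
    public static CompletableFuture<String> loadHistoryAsync(String conversationId, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> loadHistory(conversationId), pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = boundedBlockingPool(2, 100);
        try {
            CompletableFuture<String> future = loadHistoryAsync("42", pool);
            System.out.println("caller free on " + Thread.currentThread().getName());
            System.out.println(future.join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Rejecting work when the queue is full (`AbortPolicy`) mirrors the intent of a bounded scheduler: overload surfaces as an error rather than as unbounded memory growth.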
### 3. Breaking the SSE Wire Format
**Explanation:** Manually constructing SSE responses without following the event-stream format breaks browser parsing. That format writes each event as one or more `data:` lines terminated by a blank line. Some developers return `ResponseEntity<String>` with custom headers, which bypasses Spring's SSE encoder.
**Fix:** Declare `produces = MediaType.TEXT_EVENT_STREAM_VALUE` and return `Flux<String>`, or `Flux<ServerSentEvent<T>>` when events need names or IDs. Let Spring's `HttpMessageWriter` handle protocol framing. In Spring MVC with `SseEmitter`, build each event explicitly with `SseEmitter.event().data(chunk).name("message")`.
### 4. Silent Connection Drops Without Reconnection
**Explanation:** Network flakiness, proxy timeouts, or load balancer idle limits frequently terminate SSE connections. Without client-side reconnection logic, users experience abrupt silence with no recovery path.
**Fix:** Implement exponential backoff reconnection in the frontend. Track connection state, preserve conversation context, and resume streaming from the last known token index if the backend supports checkpointing.
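A minimal sketch of both halves of this fix, under stated assumptions:

- `backoffMs` computes a capped exponential delay.
- `jitteredBackoffMs` spreads clients out with "full jitter".
- `tokensAfter` shows server-side resume. It assumes the backend tags each token with an SSE `id:` equal to its index and keeps the generated tokens in a checkpoint.

That tagging scheme and all names here are hypothetical. Per the HTML specification, a browser's built-in `EventSource` reconnection sends the last received `id` in the `Last-Event-ID` header. A client that opens a new `EventSource` on its own does not send that header, so it would have to pass the index some other way, such as a query parameter.

```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helpers: capped exponential backoff and resume-by-event-ID. */
public class ReconnectPolicy {

    /** Capped exponential backoff: baseMs * 2^attempt, never above maxMs. */
    public static long backoffMs(int attempt, long baseMs, long maxMs) {
        long factor = 1L << Math.min(Math.max(attempt, 0), 30); // clamp the shift to avoid overflow
        return Math.min(maxMs, baseMs * factor);
    }

    /** Full jitter: a random delay in [0, backoff] so clients don't reconnect in lockstep. */
    public static long jitteredBackoffMs(int attempt, long baseMs, long maxMs) {
        return ThreadLocalRandom.current().nextLong(backoffMs(attempt, baseMs, maxMs) + 1);
    }

    /**
     * Server-side resume: if each token was sent with an SSE "id:" equal to its index,
     * the reconnecting client's Last-Event-ID identifies where to continue.
     */
    public static List<String> tokensAfter(List<String> checkpoint, String lastEventId) {
        if (lastEventId == null || lastEventId.isBlank()) return checkpoint; // fresh connection
        int next;
        try {
            next = Integer.parseInt(lastEventId.trim()) + 1;
        } catch (NumberFormatException e) {
            return checkpoint; // unknown ID format: replay everything
        }
        if (next < 0) return checkpoint;
        if (next >= checkpoint.size()) return List.of();
        return checkpoint.subList(next, checkpoint.size());
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + ": " + backoffMs(attempt, 1000, 10_000) + " ms");
        }
        System.out.println(tokensAfter(List.of("Re", "active", " back", "pressure"), "1"));
    }
}
```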
### 5. Over-Streaming Raw Tokens to the UI
**Explanation:** Writing every model token directly to the DOM causes excessive reflows, especially with markdown rendering or syntax highlighting. This degrades client-side performance and increases battery consumption on mobile devices.
**Fix:** Buffer tokens server-side or client-side (3 to 5 tokens per flush). Use `requestAnimationFrame` for DOM updates to align with the browser's paint cycle. Defer markdown parsing until a sentence boundary or newline is detected.
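Here is sentence- or newline-based flushing, sketched in Java:

- `push` accumulates tokens and returns only the text up to the last newline or sentence end (`.`, `!`, or `?` followed by a space).
- `finish` releases the remainder when the stream completes.

`BoundaryChunker` is a hypothetical helper. `maxPendingChars` is a safety valve so that a long run without punctuation still flushes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: release text only at a newline or sentence end so the renderer never sees half a construct. */
public class BoundaryChunker {
    private final StringBuilder pending = new StringBuilder();
    private final int maxPendingChars; // forces a flush when no boundary appears for a while

    public BoundaryChunker(int maxPendingChars) { this.maxPendingChars = maxPendingChars; }

    /** Adds a token; returns text that is safe to render now (possibly empty). */
    public String push(String token) {
        pending.append(token);
        int cut = lastBoundary();
        if (cut < 0 && pending.length() >= maxPendingChars) cut = pending.length();
        if (cut < 0) return "";
        String ready = pending.substring(0, cut);
        pending.delete(0, cut);
        return ready;
    }

    /** Releases whatever is left once the stream completes. */
    public String finish() {
        String rest = pending.toString();
        pending.setLength(0);
        return rest;
    }

    // Index just past the last newline or the last ". ", "! ", "? " in the pending text; -1 if none
    private int lastBoundary() {
        for (int i = pending.length() - 1; i >= 0; i--) {
            char c = pending.charAt(i);
            if (c == '\n') return i + 1;
            if ((c == '.' || c == '!' || c == '?') && i + 1 < pending.length() && pending.charAt(i + 1) == ' ') {
                return i + 2;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        BoundaryChunker chunker = new BoundaryChunker(200);
        List<String> rendered = new ArrayList<>();
        for (String token : List.of("Back", "pressure", " matters", ". It", " limits", " demand", ".")) {
            String ready = chunker.push(token);
            if (!ready.isEmpty()) rendered.add(ready);
        }
        rendered.add(chunker.finish());
        System.out.println(rendered);
    }
}
```

On the server, Reactor's `bufferUntil(predicate)` can group tokens on a similar condition. In the browser, the released text would be handed to the markdown renderer inside a `requestAnimationFrame` callback.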
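### 6. CORS Misconfiguration for Streaming Endpoints
**Explanation:** Browsers apply CORS to cross-origin `EventSource` requests. A missing `Access-Control-Allow-Origin` header makes the browser reject the stream, and the page sees only a generic `error` event. This is easy to miss when the frontend and backend run on different ports or domains. `Access-Control-Expose-Headers` matters only for `fetch`-based consumers that read response headers; `EventSource` does not expose headers at all.
**Fix:** Configure global CORS in Spring Boot: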
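```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.config.CorsRegistry;
import org.springframework.web.reactive.config.WebFluxConfigurer;

@Configuration
public class CorsConfig {

    // WebFlux variant; WebMvcConfigurer only applies to servlet (Spring MVC) applications
    @Bean
    public WebFluxConfigurer corsConfigurer() {
        return new WebFluxConfigurer() {
            @Override
            public void addCorsMappings(CorsRegistry registry) {
                registry.addMapping("/api/v1/**")
                        .allowedOrigins("https://app.example.com")
                        .allowedMethods("GET")
                        // Only readable by fetch()-based clients; EventSource cannot read headers
                        .exposedHeaders("X-Stream-Status", "X-Request-Id");
            }
        };
    }
}
```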
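### 7. Memory Leaks from Unclosed Subscriptions
**Explanation:** When a client disconnects, a `Flux` that was subscribed manually, rather than returned to WebFlux, can stay active. This leaks memory and keeps LLM provider connections open, incurring unnecessary API costs.
**Fix:** Return the `Flux` from the controller so that WebFlux cancels it on disconnect and the cancellation propagates upstream to the provider call. Attach `.doOnCancel()` and `.doFinally()` for observability; unlike `.doOnTerminate()`, `.doFinally()` also runs on cancellation. If you manage subscriptions manually outside the controller layer, keep the `Disposable` returned by `subscribe()` and call `dispose()` when the client goes away.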
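When subscriptions are managed manually, keep one handle per request and cancel it from the disconnect path. The sketch below uses `Flow.Subscription` in the role that Reactor's `Disposable` plays.

- `SubscriptionRegistry` and its methods are hypothetical.
- The `completed` method removes only the exact subscription it is given, so a late completion cannot evict a newer stream registered under the same request ID.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;

/** Hypothetical per-request registry for subscriptions created outside the controller. */
public class SubscriptionRegistry {
    private final Map<String, Flow.Subscription> active = new ConcurrentHashMap<>();

    /** Tracks a new upstream subscription; cancels any older one for the same request. */
    public void register(String requestId, Flow.Subscription subscription) {
        Flow.Subscription previous = active.put(requestId, subscription);
        if (previous != null && previous != subscription) previous.cancel();
    }

    /** Call from the disconnect hook: cancels upstream work and forgets the subscription. */
    public boolean disconnect(String requestId) {
        Flow.Subscription subscription = active.remove(requestId);
        if (subscription == null) return false;
        subscription.cancel();
        return true;
    }

    /** Call on normal completion; removes only this exact subscription. */
    public void completed(String requestId, Flow.Subscription subscription) {
        active.remove(requestId, subscription);
    }

    public int activeCount() { return active.size(); }

    public static void main(String[] args) {
        SubscriptionRegistry registry = new SubscriptionRegistry();
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { System.out.println("upstream cancelled"); }
        };
        registry.register("req-1", upstream);
        registry.disconnect("req-1");
        System.out.println("active: " + registry.activeCount());
    }
}
```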
## Production Bundle
### Action Checklist
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Unidirectional chat, low infrastructure overhead | SSE + Spring AI Flux | HTTP-native, no WebSocket server required, easy load balancer support | Low (standard HTTP routing) |
| Bidirectional real-time collaboration, typing indicators, presence | WebSockets | Full-duplex communication, lower latency for client-to-server events | Medium (requires WebSocket server & session management) |
| Legacy browser support, strict proxy environments | Long Polling | Fallback when SSE/WebSocket is blocked by corporate firewalls | High (frequent HTTP overhead, higher latency) |
| High-throughput batch inference with progress tracking | Server-Sent Events + Checkpointing | Decouples generation from delivery, enables resume-on-failure | Medium (requires state storage for checkpoints) |
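### Configuration Template
```yaml
# application.yml
spring:
  ai:
    # Configure the provider(s) whose starter is on the classpath
    openai:
      api-key: ${OPENAI_API_KEY}
      chat:
        options:
          model: gpt-4o-mini
          temperature: 0.7
          # No "stream" option is needed: streaming is chosen per call with .stream()
    ollama:
      base-url: http://localhost:11434
      chat:
        options:
          model: llama3.2
          temperature: 0.6
  codec:
    # Maximum bytes a codec may buffer when aggregating a body in memory
    max-in-memory-size: 10MB
# Reactor Netty's I/O thread count is a JVM system property (reactor.netty.ioWorkerCount),
# not an application.yml key; server.servlet.* settings do not apply to WebFlux.
management:
  endpoints:
    web:
      exposure:
        include: health,metrics,prometheus
  metrics:
    tags:
      application: ai-streaming-gateway
```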
### Quick Start Guide
- **Initialize Project:** Generate a Spring Boot 3.3+ project with `spring-boot-starter-webflux`, import the Spring AI BOM, and add the starter for your preferred LLM provider (OpenAI, Ollama, Anthropic).
- **Configure Streaming:** Set the provider's credentials or base URL and default model under `spring.ai.[provider]` in `application.yml`. No streaming flag is required: calling `.stream()` instead of `.call()` selects streaming per request, provided the provider supports incremental token emission.
- **Deploy Endpoint:** Create a `@RestController` returning `Flux<String>` with `MediaType.TEXT_EVENT_STREAM_VALUE`. Inject `ChatClient.Builder`, build a `ChatClient`, and call `.stream().content()`.
- **Connect Frontend:** Instantiate `EventSource` pointing to the streaming endpoint. Attach `onmessage` for progressive rendering and `onerror` for reconnection logic.
- **Validate & Monitor:** Use `curl -N` or browser DevTools to verify SSE framing. Instrument time-to-first-token (TTFT) metrics and adjust token buffering thresholds based on client performance profiles.
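For scripted checks beyond `curl -N`, the JDK's `java.net.http.HttpClient` (Java 11+) can stream a response line by line. This sketch measures TTFT as the time from sending the request to the first `data:` line. That interval includes connection setup and any server-side queuing.

`TtftProbe` is hypothetical. The default URL assumes the Step 2 endpoint is running locally on port 8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Iterator;
import java.util.function.LongSupplier;
import java.util.stream.Stream;

/** Sketch of a TTFT probe: time from sending the request to the first SSE data line. */
public class TtftProbe {

    public record Result(long ttftMillis, String firstToken) { }

    /** Scans SSE lines until the first data: line; returns null if the stream ends first. */
    public static Result measure(Stream<String> sseLines, long startNanos, LongSupplier nanoClock) {
        Iterator<String> it = sseLines.iterator();
        while (it.hasNext()) {
            String line = it.next();
            if (line.startsWith("data:")) {
                long elapsed = nanoClock.getAsLong() - startNanos;
                String value = line.substring(5);
                if (value.startsWith(" ")) value = value.substring(1); // same rule EventSource applies
                return new Result(elapsed / 1_000_000, value);
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        // Assumed local endpoint; pass another URL as the first argument
        String url = args.length > 0 ? args[0]
                : "http://localhost:8080/api/v1/conversations/stream?query=hello";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Accept", "text/event-stream")
                .build();
        long start = System.nanoTime();
        // ofLines() exposes the body lazily, so lines are read as the server flushes them
        HttpResponse<Stream<String>> response = client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            Result result = measure(lines, start, System::nanoTime);
            System.out.println(result == null ? "no data received"
                    : "TTFT " + result.ttftMillis() + " ms, first token: " + result.firstToken());
        }
    }
}
```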