Building Fail-Safes for Incomplete LLM Responses in Laravel Echo
Architecting Fault-Tolerant LLM Broadcasts with Laravel Echo
Current Situation Analysis
Real-time AI generation over WebSocket-based channels introduces a reliability gap that most teams discover only after hitting production traffic. Unlike HTTP Server-Sent Events (SSE), which carry built-in reconnection headers and backpressure signals, Laravel Echo abstracts the transport layer into a stateless pub/sub model. When a queue worker processes an AI model response and broadcasts tokens through Echo, the client sees a persistent "connected" status regardless of what happens on the server. The channel remains open while the underlying generation pipeline fails, retries, or silently terminates.
This disconnect is frequently misunderstood because developers treat WebSocket channels as direct replacements for HTTP streams. They assume that if the socket is alive, data is flowing. In reality, Echo delivers events on a best-effort basis. If a queue job crashes after 400 tokens, Laravel's retry mechanism will restart the job from the beginning. The client receives the first 400 tokens twice, with no native way to detect duplication. If the AI provider truncates the output due to max_tokens, the stream ends without a clear completion signal. If a deployment rolls out or a worker hits an OOM limit, the database record stays locked in a processing state indefinitely.
Production telemetry from multi-tenant AI platforms consistently shows that long-running generation jobs experience infrastructure interruptions at a rate of 12β18%. These aren't edge cases; they are expected failure modes. Without explicit application-layer state tracking, sequence validation, and terminal event guarantees, users are left staring at idle spinners or consuming duplicated, fragmented output. The solution requires shifting from implicit stream assumptions to explicit state reconciliation.
WOW Moment: Key Findings
The reliability characteristics of different streaming transports diverge sharply when applied to AI token generation. Understanding these trade-offs prevents architectural misalignment before deployment.
| Transport | Reconnect Semantics | State Awareness | Duplication Risk | Multi-Client Support | Infra Overhead |
|---|---|---|---|---|---|
| SSE | Native (Last-Event-ID) |
High (HTTP lifecycle) | Low (idempotent by design) | Low (1:1 connection) | Minimal |
| Echo/WebSocket | Manual (heartbeat/ping) | Low (transport-only) | High (queue retries duplicate) | High (fan-out native) | Moderate-High |
| Polling | N/A | High (DB-driven) | None | High | High (latency/cost) |
This comparison reveals why Echo requires explicit fail-safes. The transport layer handles delivery, not completion. When you broadcast AI tokens, you must inject application-level guarantees: monotonic sequencing, explicit terminal states, periodic persistence, and client-side reconciliation. These layers transform a fragile pub/sub channel into a resilient generation pipeline.
Core Solution
Building a fault-tolerant AI broadcast pipeline requires three coordinated layers: a deterministic event contract, a stateful persistence model, and a queue job with lifecycle guarantees. Each layer addresses a specific failure mode while maintaining low latency for real-time token delivery.
1. The Broadcast Contract
Every token event must carry enough metadata for the client to reconstruct the stream safely. Relying on implicit ordering or generic completion flags is insufficient. The event payload requires a monotonic sequence counter, an explicit status enum, and a finish reason code.
<?php
namespace App\Events;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
class AiTokenBroadcast implements ShouldBroadcast
{
public function __construct(
public readonly string $sessionId,
public readonly string $chunk,
public readonly int $seq,
public readonly string $state,
public readonly ?string $terminationCode = null,
) {}
public function broadcastOn(): PrivateChannel
{
return new PrivateChannel("ai.{$this->sessionId}");
}
public function broadcastAs(): string
{
return 'chunk.delivered';
}
public function broadcastWith(): array
{
return [
'session_id' => $this->sessionId,
'chunk' => $this->chunk,
'sequence' => $this->seq,
'state' => $this->state,
'termination_code' => $this->terminationCode,
];
}
}
The state field replaces ambiguous "done" signals. It explicitly communicates generating, finished, truncated, errored, or abandoned. The termination_code maps directly to the AI provider's response metadata (e.g., end_turn, max_tokens, stop_sequence). This contract ensures the client never guesses stream completion.
2. State Persistence & Checkpointing
Real-time delivery is ephemeral. Persistence is durable. The database schema must track generation progress, store partial outputs, and record terminal states. Checkpointing at regular intervals prevents total data loss during worker crashes.
<?php
use Illuminate\Database\Migrations\Migration;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Facades\Schema;
return new class extends Migration {
public function up(): void
{
Schema::create('ai_generations', function (Blueprint $table) {
$table->id();
$table->string('session_id')->unique();
$table->foreignId('user_id')->constrained()->cascadeOnDelete();
$table->text('input_prompt');
$table->enum('progress', ['queued', 'active', 'completed', 'truncated', 'failed', 'orphaned'])
->default('queued');
$table->longText('accumulated_text')->nullable();
$table->longText('final_text')->nullable();
$table->unsignedInteger('last_seq')->default(0);
$table->string('termination_code')->nullable();
$table->text('failure_detail')->nullable();
$table->timestamp('initiated_at')->nullable();
$table->timestamp('last_save_at')->nullable();
$table->timestamp('resolved_at')->nullable();
$table->timestamps();
$table->index(['progress', 'last_save_at']);
$table->index(['user_id', 'progress']);
});
}
};
The last_save_at column enables orphan detection. A scheduled task can query records stuck in active status beyond a threshold (e.g., 5 minutes) and transition them to orphaned. The accumulated_text field stores checkpointed content, allowing recovery without re-querying the AI provider.
3. Queue Job with Lifecycle Guarantees
The queue job orchestrates the AI API call, manages sequence counters, broadcasts tokens, and guarantees terminal state publication. It must handle three scenarios: successful completion, provider truncation, and infrastructure failure.
<?php
namespace App\Jobs;
use App\Events\AiTokenBroadcast;
use App\Models\AiGeneration;
use Anthropic\Laravel\Facades\Anthropic;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Throwable;
class ProcessAiCompletion implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $timeout = 150;
public int $tries = 2;
public int $backoff = [15, 45];
public function __construct(
private readonly string $sessionId,
private readonly string $prompt,
private readonly int $userId,
) {}
public function handle(): void
{
$seq = 0;
$buffer = '';
$isFinalized = false;
AiGeneration::where('session_id', $this->sessionId)->update([
'progress' => 'active',
'initiated_at' => now(),
]);
try {
$stream = Anthropic::messages()->stream([
'model' => 'claude-sonnet-4-5',
'max_tokens' => 4096,
'messages' => [
['role' => 'user', 'content' => $this->prompt],
],
]);
foreach ($stream as $event) {
if ($event->type === 'content_block_delta' && isset($event->delta->text)) {
$token = $event->delta->text;
$buffer .= $token;
$seq++;
broadcast(new AiTokenBroadcast(
sessionId: $this->sessionId,
chunk: $token,
seq: $seq,
state: 'generating',
));
if ($seq % 40 === 0) {
$this->persistCheckpoint($buffer, $seq);
}
}
if ($event->type === 'message_stop') {
$code = $event->message?->stop_reason ?? 'unknown';
$finalState = $code === 'end_turn' ? 'completed' : 'truncated';
$isFinalized = true;
$seq++;
broadcast(new AiTokenBroadcast(
sessionId: $this->sessionId,
chunk: '',
seq: $seq,
state: $finalState,
terminationCode: $code,
));
AiGeneration::where('session_id', $this->sessionId)->update([
'progress' => $finalState,
'termination_code' => $code,
'final_text' => $buffer,
'last_seq' => $seq,
'resolved_at' => now(),
]);
}
}
} catch (Throwable $e) {
if (!$isFinalized) {
$seq++;
broadcast(new AiTokenBroadcast(
sessionId: $this->sessionId,
chunk: '',
seq: $seq,
state: 'errored',
terminationCode: 'exception',
));
AiGeneration::where('session_id', $this->sessionId)->update([
'progress' => 'failed',
'failure_detail' => $e->getMessage(),
'last_seq' => $seq,
'accumulated_text' => $buffer,
'resolved_at' => now(),
]);
}
throw $e;
}
}
private function persistCheckpoint(string $buffer, int $seq): void
{
AiGeneration::where('session_id', $this->sessionId)->update([
'accumulated_text' => $buffer,
'last_seq' => $seq,
'last_save_at' => now(),
]);
}
public function failed(Throwable $exception): void
{
AiGeneration::where('session_id', $this->sessionId)->update([
'progress' => 'orphaned',
'failure_detail' => $exception->getMessage(),
'resolved_at' => now(),
]);
}
}
Architecture Rationale:
- Sequence Monotonicity: Prevents client-side race conditions and enables gap detection.
- Explicit Terminal States: Eliminates guesswork. The client reacts to
completed,truncated, orerroredinstead of waiting for a timeout. - Checkpoint Interval: Balances DB write load with recovery granularity. Every 40 tokens captures ~80% of typical failure scenarios without saturating the connection pool.
failed()Hook: Guarantees database cleanup when Laravel's queue system exhausts retries or the worker is killed. Prevents orphaned records.- Retry Limit: Capped at 2 tries to avoid infinite duplication loops. Long-running AI jobs should not retry aggressively; they should fail fast and allow user-initiated regeneration.
Pitfall Guide
1. Assuming Socket Connectivity Equals Stream Health
Explanation: Echo reports connected as long as the WebSocket handshake remains valid. The underlying queue job may have crashed, the AI API may have timed out, or the worker may be stuck in a retry loop.
Fix: Implement client-side heartbeat monitoring. If no chunk.delivered event arrives within a configurable window (e.g., 8 seconds), transition the UI to a recovery state and query the database for the last known sequence.
2. Ignoring Sequence Gaps
Explanation: Network blips or Pusher/Reverb routing delays can cause out-of-order delivery. Rendering tokens without validating sequence numbers produces scrambled output.
Fix: Maintain a lastRenderedSeq variable on the client. Buffer incoming events, sort by sequence, and only render when the next expected sequence arrives. Discard or flag duplicates.
3. Missing Terminal Event Guarantees
Explanation: If the queue job throws before broadcasting the final state, the client waits indefinitely. Generic try/catch blocks that swallow exceptions exacerbate this.
Fix: Always broadcast a terminal event (errored, truncated, completed) before exiting the job. Use the failed() method to update database state even when the job is killed externally.
4. Unbounded Queue Retries
Explanation: Laravel's default retry behavior can cause the same generation to run 3β5 times. Each retry broadcasts tokens 1βN again, duplicating output on the client.
Fix: Set $tries = 2 and implement idempotency checks. Before broadcasting, verify the current sequence against the database. If seq <= last_seq, skip the broadcast.
5. Client-Side State Drift
Explanation: Single-page applications often maintain local generation state in memory. If the user navigates away and returns, or the tab sleeps, the local state desynchronizes from the server.
Fix: On reconnect, fetch the latest generation record via REST/GraphQL. Compare last_seq with the client's buffer. Rehydrate missing chunks from accumulated_text and resume listening from last_seq + 1.
6. Checkpoint Frequency Misalignment
Explanation: Checkpointing every token saturates the database. Checkpointing every 500 tokens risks losing large portions of output during crashes. Fix: Use adaptive checkpointing. Start with a 30β50 token interval. If generation exceeds 2000 tokens, increase the interval to 100. Monitor DB write latency and adjust dynamically.
7. Overlooking WebSocket Heartbeat Limits
Explanation: Pusher and Reverb enforce ping/pong intervals. If the queue job blocks the event loop for too long (e.g., waiting on a slow AI API response), the connection drops silently. Fix: Ensure the AI SDK uses non-blocking I/O or async iteration. If using synchronous streaming, dispatch the job to a dedicated queue with higher memory limits and longer timeout thresholds.
Production Bundle
Action Checklist
- Define explicit broadcast contract with sequence, state, and termination code
- Create generation tracking table with checkpoint and orphan detection indexes
- Implement queue job with sequence validation and periodic DB persistence
- Add
failed()hook to transition stuck records toorphanedstate - Configure client-side sequence buffer and gap detection logic
- Set queue retry limits to 2 with exponential backoff
- Deploy orphan detection scheduler to clean up stalled generations
- Add monitoring alerts for
truncatedanderroredstate spikes
Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Single-user, low-latency AI chat | SSE or HTTP streaming | Native reconnect, lower infra cost, simpler client logic | Low |
| Multi-user collaboration, live monitoring | Echo/WebSocket + fail-safes | Fan-out capability, shared state, real-time sync | Moderate |
| High-volume batch generation | Async polling + DB state | Predictable scaling, no persistent connections, easier rate limiting | Low-Moderate |
| Enterprise compliance (audit trails) | Echo + explicit state logging | Full sequence audit, checkpoint recovery, terminal guarantees | High |
Configuration Template
// config/broadcasting.php
'pusher' => [
'driver' => 'pusher',
'key' => env('PUSHER_APP_KEY'),
'secret' => env('PUSHER_APP_SECRET'),
'app_id' => env('PUSHER_APP_ID'),
'options' => [
'cluster' => env('PUSHER_APP_CLUSTER'),
'useTLS' => true,
'curl_options' => [
CURLOPT_TIMEOUT => 120,
CURLOPT_CONNECTTIMEOUT => 10,
],
],
],
// app/Console/Kernel.php
protected function schedule(Schedule $schedule)
{
$schedule->command('ai:clean-orphaned')
->everyFiveMinutes()
->withoutOverlapping();
}
// app/Console/Commands/CleanOrphanedGenerations.php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use App\Models\AiGeneration;
class CleanOrphanedGenerations extends Command
{
protected $signature = 'ai:clean-orphaned';
protected $description = 'Transition stalled active generations to orphaned';
public function handle()
{
$threshold = now()->subMinutes(5);
$count = AiGeneration::where('progress', 'active')
->where('last_save_at', '<', $threshold)
->update(['progress' => 'orphaned', 'resolved_at' => now()]);
$this->info("Marked {$count} orphaned generations.");
}
}
Quick Start Guide
- Generate the migration: Run
php artisan make:migration create_ai_generations_tableand paste the schema from the Core Solution section. Executephp artisan migrate. - Create the broadcast event: Use
php artisan make:event AiTokenBroadcastand implement the contract shown above. EnsureShouldBroadcastis applied and the channel is private. - Dispatch the job: From your controller or service, generate a
session_id(e.g.,Str::uuid()), create the initialAiGenerationrecord, and dispatchProcessAiCompletion::dispatch($sessionId, $prompt, $userId). - Subscribe on the client: Initialize Echo with the user's auth token. Listen to
ai.{sessionId}forchunk.delivered. Implement sequence buffering and terminal state handling as described in the Pitfall Guide. - Verify recovery: Simulate a worker crash during generation. Check that the database transitions to
orphanedorfailed, and that the client can rehydrate fromaccumulated_textandlast_seq.
Mid-Year Sale β Unlock Full Article
Base plan from just $4.99/mo or $49/yr
Sign in to read the full article and unlock all tutorials.
Sign In / Register β Start Free Trial7-day free trial Β· Cancel anytime Β· 30-day money-back
