itionalIds(
streamStates: Record<StreamRole, StreamState>,
currentStep: number
): number[] {
const roles: StreamRole[] = ['user', 'reasoning', 'tool', 'system'];
const positions: number[] = [];
for (const role of roles) {
const stream = streamStates[role];
if (!stream.isComplete) {
// Assign position: (step * num_streams) + stream_index
positions.push(currentStep * roles.length + roles.indexOf(role));
}
}
return positions;
}
### Step 2: Construct Cross-Stream Causal Masks
Standard causal masks block future tokens within a single sequence. Multi-stream masks must additionally block same-timestep cross-attention while allowing observation of all streams at `t-1` and earlier. This preserves the directed acyclic graph required for stable training and inference.
```typescript
/**
 * Builds an additive attention mask (0 = attend, -Infinity = blocked) for a
 * packed multi-stream buffer.
 *
 * Layout: the buffer is stream-major in the fixed order
 * `user`, `reasoning`, `tool`, `system` — all tokens of `user` first, then all
 * tokens of `reasoning`, and so on. Row = query token, column = key token.
 *
 * Timing assumption: a token's index inside its own stream is the decoding
 * step at which it was produced (every stream starts at step 0 and gains one
 * token per step while active).
 *
 * Visibility rules:
 * - Within a stream: causal — a token sees itself and every earlier token.
 * - Across streams: a token at step `t` sees another stream's token at step
 *   `s` only when `s < t`. Tokens produced in the same step therefore never
 *   see each other.
 * - Cross-stream tokens at step `>= currentStep` are never visible.
 *
 * @param streamLengths Number of tokens currently held by each stream.
 * @param currentStep Step about to be decoded; caps cross-stream visibility.
 * @returns A `totalTokens x totalTokens` matrix.
 */
function generateCrossStreamMask(
  streamLengths: Record<StreamRole, number>,
  currentStep: number
): number[][] {
  const roles: StreamRole[] = ['user', 'reasoning', 'tool', 'system'];
  const totalTokens = roles.reduce((sum, r) => sum + streamLengths[r], 0);
  const mask: number[][] = [];

  // Rows are emitted in packed order, so mask[i] describes packed token i.
  let targetStart = 0;
  for (const targetRole of roles) {
    const targetLen = streamLengths[targetRole];
    for (let t = 0; t < targetLen; t++) {
      const row = new Array<number>(totalTokens).fill(Number.NEGATIVE_INFINITY);

      // Causal self-attention within the stream: steps 0..t inclusive.
      row.fill(0, targetStart, targetStart + t + 1);

      // Cross-stream: only steps strictly before this token's own step t
      // (and before currentStep). The previous version derived the source
      // step as floor(s / numStreams) and compared it to currentStep, which
      // unmasked same-step and future tokens of other streams.
      const crossLimit = Math.min(t, currentStep);
      let sourceStart = 0;
      for (const sourceRole of roles) {
        const sourceLen = streamLengths[sourceRole];
        if (sourceRole !== targetRole) {
          // fill() is a no-op when the end is <= the start (crossLimit <= 0).
          row.fill(0, sourceStart, sourceStart + Math.min(crossLimit, sourceLen));
        }
        sourceStart += sourceLen;
      }

      mask.push(row);
    }
    targetStart += targetLen;
  }
  return mask;
}
```

### Step 3: Parallel Decoding Loop
The inference engine advances all active streams simultaneously. Each forward pass consumes the packed token buffer, applies the cross-stream mask, and samples one token for every stream that is still active. The loop terminates when every stream has emitted an end-of-sequence token or when the predefined step limit is reached.
/**
 * Decodes several token streams (user, reasoning, tool, system) with one model,
 * sampling one new token for every still-active stream per step.
 *
 * Every stream is packed into a single buffer in the canonical order
 * `streamOrder`. This is the same role order generateCrossStreamMask uses, so
 * entry i of the packed input, entry i of the position ids, and row i of the
 * mask all describe the same token.
 */
class ConcurrentStreamEngine {
  /** Canonical packing order; must match the role order in generateCrossStreamMask. */
  private static readonly streamOrder: readonly StreamRole[] = ['user', 'reasoning', 'tool', 'system'];

  private streams: Record<StreamRole, StreamState>;
  private model: TransformerModel;

  /**
   * @param model Model whose `forward` is awaited once per decoding step.
   * @param tokenizer Decoder used to turn the final `reasoning` stream into
   *   text. Optional for backward compatibility; `run()` throws a descriptive
   *   Error if it is missing (previously this field was never declared).
   */
  constructor(
    model: TransformerModel,
    private readonly tokenizer?: { decode(token: number): string }
  ) {
    this.model = model;
    this.streams = {
      user: { role: 'user', tokens: [], isComplete: false },
      reasoning: { role: 'reasoning', tokens: [], isComplete: false },
      tool: { role: 'tool', tokens: [], isComplete: false },
      system: { role: 'system', tokens: [], isComplete: false },
    };
  }

  /**
   * Decodes until every stream has emitted EOS_TOKEN or `maxSteps` steps ran.
   *
   * @param maxSteps Hard cap on decoding steps.
   * @returns The decoded `reasoning` stream, without its terminating EOS token.
   * @throws Error if no tokenizer was given to the constructor (checked up front
   *   so no model compute is spent on an undecodable run).
   */
  async run(maxSteps: number = 128): Promise<string> {
    this.requireTokenizer();
    const order = ConcurrentStreamEngine.streamOrder;

    for (let step = 0; step < maxSteps; step++) {
      const activeRoles = order.filter(r => !this.streams[r].isComplete);
      if (activeRoles.length === 0) break;

      // Completed streams remain as context. Packing every stream keeps the
      // input the same length as the mask, which generateCrossStreamMask
      // always builds over all four roles from getStreamLengths().
      const packedInput = this.packStreams(order);
      const posIds = this.buildTokenPositionIds();
      const attnMask = generateCrossStreamMask(this.getStreamLengths(), step);

      const logits = await this.model.forward(packedInput, {
        positionIds: posIds,
        attentionMask: attnMask,
      });
      this.advanceStreams(this.samplePerStream(logits, activeRoles));

      if (this.allStreamsComplete()) break;
    }
    return this.decodeStream('reasoning');
  }

  /** Concatenates the tokens of `roles`, in the given order, into one buffer. */
  private packStreams(roles: readonly StreamRole[]): number[] {
    return roles.flatMap(r => this.streams[r].tokens);
  }

  /**
   * One position id per packed token, in packStreams(streamOrder) order:
   * `timestep * numStreams + streamIndex`, where a token's timestep is its
   * index within its own stream.
   */
  private buildTokenPositionIds(): number[] {
    const order = ConcurrentStreamEngine.streamOrder;
    return order.flatMap((role, streamIndex) =>
      this.streams[role].tokens.map((_token, timestep) => timestep * order.length + streamIndex)
    );
  }

  /** Current token count of every stream. */
  private getStreamLengths(): Record<StreamRole, number> {
    const { user, reasoning, tool, system } = this.streams;
    return {
      user: user.tokens.length,
      reasoning: reasoning.tokens.length,
      tool: tool.tokens.length,
      system: system.tokens.length,
    };
  }

  /**
   * Samples one token for each role in `roles`.
   * NOTE(review): assumes `logits[i]` holds the next-token logits for
   * `roles[i]`. Confirm this against TransformerModel.forward's output contract.
   *
   * @throws Error if the model returned fewer logit rows than active roles.
   */
  private samplePerStream(
    logits: number[][],
    roles: StreamRole[]
  ): Partial<Record<StreamRole, number>> {
    const sampled: Partial<Record<StreamRole, number>> = {};
    roles.forEach((role, idx) => {
      const row = logits[idx];
      if (row === undefined) {
        throw new Error(`ConcurrentStreamEngine: model returned no logits for stream '${role}'`);
      }
      sampled[role] = this.weightedSample(this.softmax(row));
    });
    return sampled;
  }

  /** Appends each sampled token; a stream completes when it emits EOS_TOKEN. */
  private advanceStreams(tokens: Partial<Record<StreamRole, number>>): void {
    for (const role of ConcurrentStreamEngine.streamOrder) {
      const token = tokens[role];
      if (token === undefined) continue; // stream was not active this step
      this.streams[role].tokens.push(token);
      if (token === EOS_TOKEN) {
        this.streams[role].isComplete = true;
      }
    }
  }

  private allStreamsComplete(): boolean {
    return Object.values(this.streams).every(s => s.isComplete);
  }

  /** Decodes a stream to text, dropping the EOS marker that terminated it. */
  private decodeStream(role: StreamRole): string {
    const tokenizer = this.requireTokenizer();
    return this.streams[role].tokens
      .filter(t => t !== EOS_TOKEN)
      .map(t => tokenizer.decode(t))
      .join('');
  }

  /** Returns the configured tokenizer or throws a descriptive Error. */
  private requireTokenizer(): { decode(token: number): string } {
    if (this.tokenizer === undefined) {
      throw new Error(
        'ConcurrentStreamEngine: a tokenizer is required to decode streams; pass one as the second constructor argument'
      );
    }
    return this.tokenizer;
  }

  /** Numerically stable softmax (subtracts the row maximum before exp). */
  private softmax(arr: number[]): number[] {
    // reduce() rather than Math.max(...arr): spreading a vocabulary-sized
    // array can exceed the engine's argument limit and throw RangeError.
    const max = arr.reduce((m, v) => (v > m ? v : m), Number.NEGATIVE_INFINITY);
    const exps = arr.map(v => Math.exp(v - max));
    const sum = exps.reduce((a, b) => a + b, 0);
    return exps.map(v => v / sum);
  }

  /** Draws an index from a categorical distribution via inverse-CDF sampling. */
  private weightedSample(probs: number[]): number {
    let r = Math.random();
    for (let i = 0; i < probs.length; i++) {
      r -= probs[i] ?? 0;
      if (r <= 0) return i;
    }
    // Floating-point rounding can leave a tiny positive remainder.
    return probs.length - 1;
  }
}
## Architecture Decisions & Rationale
- Interleaved Positional Encoding: Standard RoPE or absolute positions break down when multiple streams share one buffer and advance at different rates. Assigning each token the position `(step * num_streams) + stream_index` preserves relative temporal distance while letting the attention mechanism distinguish stream boundaries without explicit segment IDs.
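- Strict `< t` Cross-Attention: Allowing same-timestep cross-attention creates circular dependencies during training and breaks the causal DAG. The mask enforces that stream A at step t can only observe streams B, C, etc., at steps 0 to t-1. This keeps the dependency graph acyclic, so every stream is conditioned on the same history during training and inference, avoiding a train/inference mismatch that destabilizes gradients. (Inference is still stochastic whenever tokens are sampled.)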
- Independent EOS Handling: Streams rarely finish simultaneously. The decoding loop tracks completion per stream rather than globally. This prevents premature termination and allows long-running reasoning channels to continue while tool channels exit early.
- Memory-Bound Optimization: Since HBM bandwidth dominates decode latency, packing multiple streams into a single forward pass amortizes weight loading across parallel predictions. The architecture scales efficiently up to 4-6 streams before attention cost, which grows as $O(N^2)$ in the total packed length $N$, becomes the bottleneck.
## Pitfall Guide
1. Causal Mask Leakage
Explanation: Allowing same-timestep cross-attention causes information leakage between streams during training. The model learns to cheat by observing parallel predictions before they're finalized, leading to degenerate outputs at inference.
Fix: Enforce strict `< t` masking in the attention builder. Validate masks by checking that, for every pair of distinct streams A and B, the entry from A's token at step t to B's token at step t is `-inf`.
2. Positional Encoding Collision
Explanation: Using standard absolute positions causes streams to overwrite each other's temporal signals. The attention mechanism confuses which stream a token belongs to, breaking cross-stream conditioning.
Fix: Implement interleaved or stream-offset positional encoding. Ensure position IDs are monotonically increasing across the packed buffer and uniquely identify (stream, timestep).
3. KV Cache Fragmentation
Explanation: Standard KV caches assume a single sequential buffer. Multi-stream generation splits the cache across non-contiguous memory regions, causing cache misses and degraded throughput.
Fix: Shard the KV cache per stream or use a unified cache with stream-aware indexing. Pre-allocate cache slots based on expected stream lengths to avoid dynamic reallocation.
4. Training Data Misalignment
Explanation: Fine-tuning on concatenated single-stream data fails to teach the model cross-stream dependencies. The model defaults to sequential behavior despite architectural changes.
Fix: Construct parallel stream datasets during SFT. Format examples as aligned rows where each timestep contains tokens for all active streams. Use data collators that preserve stream boundaries and temporal alignment.
5. Over-Parallelization
Explanation: Activating too many streams simultaneously increases attention complexity quadratically. Beyond 4-5 streams, latency gains diminish and memory pressure spikes.
Fix: Cap active streams at 3-4. Route auxiliary channels (e.g., logging, metrics, secondary tools) through sequential tool calls rather than dedicated streams. Dynamically deactivate idle streams to free compute.
6. Interrupt Handling Race Conditions
Explanation: Injecting user tokens mid-generation without synchronizing stream states causes misalignment between input and output channels.
Fix: Implement a stream synchronization barrier before injecting external tokens. Pause output streams, append to the input stream, rebuild positional IDs, and resume decoding from the next timestep.
7. Security Boundary Violations
Explanation: Unrestricted cross-attention allows prompt injection payloads in the user stream to leak into reasoning or tool streams.
Fix: Apply attention isolation masks for security-critical streams. Restrict the reasoning and tool streams to attend only to sanitized system or verified_user channels. Audit attention weights during inference to detect cross-stream contamination.
## Production Bundle
### Action Checklist
### Decision Matrix
| Scenario | Recommended Approach | Why | Cost Impact |
|---|---|---|---|
| Simple chatbot with linear prompts | Sequential Chat | Overhead of stream management outweighs benefits | Baseline |
| Multi-tool agent with independent APIs | Multi-Stream (3-4 streams) | Concurrent tool execution reduces end-to-end latency by 60-70% | +5% HBM, -40% wall time |
| Long-context retrieval with CoT | Multi-Stream + Chunked Input | Parallel reasoning while ingesting chunks prevents TTFT scaling | +8% memory, -50% TTFT |
| High-security audit pipeline | Multi-Stream + Attention Isolation | Stream separation prevents prompt injection cross-contamination | +3% compute, +90% audit reliability |
| Speculative decoding workloads | Speculative Decoding | Better for single-stream latency reduction; multi-stream excels at concurrency | +15% memory, -30% TTFT |
### Configuration Template
/**
 * Deployment settings for multi-stream decoding.
 *
 * NOTE(review): no code visible here consumes this config, so the per-field
 * notes below are inferred from the surrounding guide. Confirm them against
 * the actual consumer.
 */
interface MultiStreamConfig {
  /** Streams that participate in decoding. */
  activeStreams: Array<StreamRole>;

  /** Step budget for a decoding run (presumably forwarded to `run(maxSteps)`). */
  maxSteps: number;

  /** Attention-mask settings. */
  attention: {
    /** Presumably toggles strict `< t` cross-stream causal masking. */
    crossStreamCausal: boolean;
    /** Presumably maps a target stream to the source streams it may attend to — TODO confirm. */
    isolationMask: Partial<Record<StreamRole, Array<StreamRole>>>;
  };

  /** Sampling settings. */
  decoding: {
    temperature: number;
    topP: number;
    /** Presumably: track end-of-sequence per stream rather than globally. */
    independentEOS: boolean;
  };

  /** KV-cache layout. */
  cache: {
    /** One cache per stream (`sharded`) or one shared, stream-indexed cache (`unified`). */
    strategy: 'sharded' | 'unified';
    preAllocate: boolean;
    /** Presumably the per-stream token capacity used when pre-allocating. */
    maxStreamLength: number;
  };
}
/**
 * Example production settings: three active streams, strict cross-stream
 * causality, restricted attention sources for `reasoning` and `tool`, and a
 * pre-allocated sharded KV cache.
 */
const productionConfig: MultiStreamConfig = {
  activeStreams: ['user', 'reasoning', 'tool'],
  maxSteps: 256,
  attention: {
    crossStreamCausal: true,
    // NOTE(review): 'system' is listed as a source here but is not in
    // activeStreams — confirm it is supplied as static context.
    isolationMask: {
      reasoning: ['user', 'system'],
      tool: ['reasoning', 'system'],
    },
  },
  decoding: { temperature: 0.7, topP: 0.9, independentEOS: true },
  cache: { strategy: 'sharded', preAllocate: true, maxStreamLength: 512 },
};
### Quick Start Guide
- Initialize the engine: Instantiate `ConcurrentStreamEngine` with a multi-stream fine-tuned checkpoint. Load the configuration template and verify that the stream roles match your agent's workflow.
- Prepare parallel input: Format your prompt as aligned stream rows. Ensure the `user` stream contains the initial query while the output streams are initialized as empty arrays.
- Run the decoding loop: Call `engine.run(maxSteps)`. Monitor stream completion flags and attach callbacks to the `tool` stream for real-time execution dispatch.
- Validate attention masks: Before production deployment, run a dry pass with synthetic sequences. Assert that cross-stream mask entries are `-inf` for same-timestep pairs and `0` for `< t` pairs; equivalently, post-softmax cross-stream attention weights must be exactly zero for same-timestep pairs.
- Deploy with cache sharding: Enable sharded KV cache allocation in your inference server. Pre-allocate memory based on `maxStreamLength` to prevent dynamic reallocation overhead during peak load.