## [](#what-is-an-atomic-transaction)What Is an Atomic Transaction?
Atomic Transactions with Rollback Semantics in Reactive Signal Systems

### Current Situation Analysis
Traditional reactive batching mechanisms (`batch` / `transaction`) only coalesce effect reruns to reduce computational overhead. They lack rollback semantics: if an operation fails mid-execution, especially across `await` boundaries, intermediate state mutations leak to downstream subscribers. This results in:
- Partial State Commitment: Effects run with inconsistent snapshots, causing UI flicker or invalid application states.
- Manual Rollback Boilerplate: Developers must manually track pre-operation values and restore them on error, which is error-prone and breaks composability.
- Async Boundary Fragility: Standard batching doesn't natively handle Promise resolution/rejection, making it impossible to guarantee atomicity across asynchronous workflows.
- Nested Isolation Failure: Without a dedicated write-log stack, inner transaction failures either corrupt outer state or require complex manual cleanup.
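To make the boilerplate concrete, here is a minimal sketch of the manual pre-value tracking a developer must write today. The `state` object and function names are illustrative only, not part of any library API:

```typescript
// A plain mutable state standing in for a couple of signals
type State = { count: number; label: string };

const state: State = { count: 0, label: "idle" };

async function updateWithManualRollback(fail: boolean): Promise<void> {
  // The developer must snapshot every field that might change...
  const prevCount = state.count;
  const prevLabel = state.label;
  try {
    state.count = 1;
    await Promise.resolve(); // async boundary: other code can observe count = 1 here
    state.label = "busy";
    if (fail) throw new Error("boom");
  } catch (e) {
    // ...and restore each one by hand on failure
    state.count = prevCount;
    state.label = prevLabel;
    throw e;
  }
}
```

Every new field added to the operation silently widens the snapshot the developer must remember to take, which is exactly the fragility atomic transactions eliminate.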
### Key Findings
| Approach | Flush Frequency | Rollback Capability | Async Boundary Safety | Intermediate State Leakage | Implementation Complexity |
|---|---|---|---|---|---|
| Regular batch/transaction | 1 on exit | ❌ None | ⚠️ Manual handling required | 🔴 High (leaks on failure) | 🟢 Low |
| Manual Pre-Value Tracking | 1 on exit | ✅ Yes (manual) | ⚠️ Error-prone across await | 🟡 Medium (depends on dev) | 🔴 High |
| Atomic Transaction | 1 on success | ✅ Automatic | 🟢 Native Promise support | 🟢 Zero (strict isolation) | 🟡 Medium |
- Atomic transactions guarantee single-flush commitment on success and complete state restoration on failure.
- The write-log stack architecture enables safe nested transactions without cross-contamination.
- Lazy recomputation of `computed` nodes post-rollback eliminates unnecessary synchronous work while maintaining consistency.
### Core Solution
The implementation extends the scheduler with a depth-tracked write log, a muted scheduling flag, and explicit commit/rollback pathways. The `signal.set()` method hooks into the atomic context to record pre-write values, but only after the equality check confirms the value actually changed.
### Extending `scheduler.ts`
```typescript
import { markStale } from "./computed.js";
import type { Node } from "./graph.js";

export interface Schedulable { run(): void; disposed?: boolean }

// Internal node shape used by signal/computed
export type InternalNode<T = unknown> = { value: T };

// Write log for atomic transactions
type WriteLog = Map<Node & InternalNode<unknown>, unknown>;

const queue = new Set<Schedulable>();
let scheduled = false;

// > 0 means we are inside batch/transaction mode (delay microtask flushing)
let batchDepth = 0;

// Atomic transaction depth and log stack
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];

// Mute scheduling during rollback to prevent scheduleJob from creating new work
let muted = 0;

export function scheduleJob(job: Schedulable) {
  // `muted` is raised during rollback so restores cannot enqueue new work
  if (muted > 0 || job.disposed) return;
  queue.add(job);
  if (!scheduled && batchDepth === 0) {
    scheduled = true;
    queueMicrotask(flushJobs);
  }
}

export function batch<T>(fn: () => T): T {
  batchDepth++;
  try {
    return fn();
  } finally {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  }
}

// Promise detection
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
  return v != null && typeof v.then === "function";
}

export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
  batchDepth++;
  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).finally(() => {
        batchDepth--;
        if (batchDepth === 0) flushJobs();
      });
    }
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    return out as T;
  } catch (e) {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    throw e;
  }
}

// Atomic transaction (with rollback)
export function inAtomic() {
  return atomicDepth > 0;
}

// Record the "first write in this level"; called by signal.set() when a write is confirmed
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
  const log = atomicLogs[atomicLogs.length - 1];
  if (!log) return; // safety guard: no active atomic layer
  if (!log.has(node)) log.set(node, prevValue);
}

function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
  if ("value" in node) (node as { value: T }).value = v;
}

function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [node, prev] of child) {
    if (!parent.has(node)) parent.set(node, prev);
  }
}

export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
  // Enter atomic layer: suppress flushing (shared batchDepth), start write logging
  batchDepth++;
  atomicDepth++;
  atomicLogs.push(new Map<Node & InternalNode<unknown>, unknown>());

  const exitCommit = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Inner success -> merge first-seen old values into parent
    if (atomicDepth > 0) {
      mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
    }
    // Only flush when the outermost layer exits
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };

  const exitRollback = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Silent rollback: avoid scheduling while restoring values
    muted++;
    try {
      for (const [node, prev] of log) {
        writeNodeValue(node, prev);
        if (node.kind === "signal") {
          for (const sub of node.subs) {
            // Mark computed dependents stale; effects need no scheduling here:
            // `muted` blocks it, and we do not flush afterward
            if (sub.kind === "computed") markStale(sub);
          }
        }
      }
      queue.clear(); // discard jobs created during this level
      scheduled = false;
    } finally {
      muted--;
    }
    // No flush on failure; just exit batch/atomic depth
    batchDepth--;
  };

  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).then(
        (v) => { exitCommit(); return v; },
        (err) => { exitRollback(); throw err; },
      );
    }
    // Synchronous success
    exitCommit();
    return out as T;
  } catch (e) {
    // Synchronous failure -> rollback
    exitRollback();
    throw e;
  }
}

export function flushSync() {
  if (!scheduled && queue.size === 0) return;
  flushJobs();
}

function flushJobs() {
  scheduled = false;
  let guard = 0;
  while (queue.size) {
    const list = Array.from(queue);
    queue.clear();
    for (const job of list) job.run();
    if (++guard > 10000) throw new Error("Infinite update loop");
  }
}
```
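The write-log mechanics can also be exercised in isolation. The following standalone sketch (plain value boxes instead of real signals; all names hypothetical) reproduces the three core behaviors: the equality gate, first-write baseline recording, and nested-log merging with per-level rollback:

```typescript
type Box<T> = { value: T };

// Stack of write logs, one per active atomic level
const logs: Array<Map<Box<unknown>, unknown>> = [];

function write<T>(box: Box<T>, next: T): void {
  if (Object.is(box.value, next)) return; // equality gate, as in signal.set()
  const log = logs[logs.length - 1];
  // First-write semantics: only the first write per box records the baseline
  if (log && !log.has(box)) log.set(box, box.value);
  box.value = next;
}

function atomicSketch<T>(fn: () => T): T {
  logs.push(new Map());
  try {
    const out = fn();
    const log = logs.pop()!;
    const parent = logs[logs.length - 1];
    // Nested success: merge first-seen baselines into the parent log
    if (parent) {
      for (const [box, prev] of log) if (!parent.has(box)) parent.set(box, prev);
    }
    return out;
  } catch (e) {
    // Failure: restore every baseline recorded at this level only
    for (const [box, prev] of logs.pop()!) box.value = prev;
    throw e;
  }
}
```

An inner failure restores only the boxes written at the inner level, matching the nested-isolation behavior of the full scheduler; scheduling and flushing are deliberately omitted here.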
### Adjustments in `signal.ts`
```typescript
import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";

type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;

export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
  const node: Node & InternalNode<T> & { kind: "signal"; equals: Comparator<T> } = {
    kind: "signal",
    deps: new Set(),
    subs: new Set(),
    value: initial,
    equals,
  };

  const get = () => {
    track(node);
    return node.value;
  };

  const set = (next: T | ((prev: T) => T)) => {
    const prev = node.value;
    const nxtVal = typeof next === "function" ? (next as (p: T) => T)(prev) : next;
    if (node.equals(prev, nxtVal)) return;

    // Atomic hook: record the previous value only when the write is confirmed,
    // and only the first time this level touches the node
    if (inAtomic()) recordAtomicWrite(node, prev);

    // Perform the actual write
    node.value = nxtVal;

    // No downstream subscribers -> exit early to avoid unnecessary work
    if (node.subs.size === 0) return;

    // Has downstream subscribers -> follow the original propagation logic
    for (const sub of node.subs) {
      if (sub.kind === "effect") {
        Effects.get(sub)?.schedule();
      } else if (sub.kind === "computed") {
        markStale(sub);
      }
    }
  };

  const subscribe = (observer: Node) => {
    if (observer.kind === "signal") {
      throw new Error("A signal cannot subscribe to another node");
    }
    link(observer, node);
    return () => unlink(observer, node);
  };

  return { get, set, subscribe, peek: () => node.value };
}
```
### Consistency Guarantees
When an atomic transaction fails:
- All affected `computed` nodes are marked as stale.
- On the next read after rollback, they lazily recompute based on the latest signal values.
- The UI never sees the invalid snapshot, and no flush happens during rollback.
### Usage Scenarios
#### Inner transaction fails, outer transaction continues
(only the inner level rolls back)
```typescript
const a = signal(0);
const b = signal(0);

await atomic(async () => { // outer
  a.set(1); // OK
  try {
    await atomic(async () => { // inner
      b.set(1);
      throw new Error("boom"); // inner fails -> rollback b to 0
    });
  } catch {}
  // At this point: a = 1, b = 0
}); // outer succeeds -> one flush
```
#### Outer transaction fails
(everything rolls back)
```typescript
const a = signal(0);
const b = signal(0);

try {
  await atomic(async () => {
    a.set(1);
    await Promise.resolve();
    b.set(2);
    throw new Error("oops"); // entire transaction fails -> rollback both a and b
  });
} catch {}

// a = 0, b = 0; and there was no flush in this transaction
```
## Pitfall Guide
1. **Ignoring First-Write Semantics**: Recording every `set()` call instead of only the first write per node overwrites the original pre-transaction value. This breaks rollback accuracy, as the system will restore to an intermediate state rather than the true baseline.
2. **Scheduling During Rollback**: Failing to increment the `muted` counter while restoring values allows `scheduleJob` to queue new work. This triggers infinite loops or executes effects against partially restored state.
3. **Async Boundary Mismanagement**: Not wrapping Promise resolution/rejection in explicit `exitCommit`/`exitRollback` handlers breaks atomicity across `await`. The transaction depth will desynchronize, causing premature flushes or leaked state.
4. **Nested Log Overwriting**: Merging child transaction logs without checking `parent.has(node)` overwrites the outer transaction's recorded baseline. This violates isolation guarantees and causes outer rollbacks to restore incorrect values.
5. **Bypassing Equality Checks**: Calling `recordAtomicWrite` before verifying `Object.is(prev, next)` creates unnecessary log entries. This increases memory overhead and forces redundant stale-marking during rollback.
6. **Forgetting to Clear the Job Queue**: On rollback, pending microtasks from the failed level must be purged (`queue.clear()`). Leaving them scheduled causes effects to run with restored values, producing stale UI updates or inconsistent derived state.
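Pitfall 1 is easy to reproduce in a few lines. In this hypothetical sketch the log records the pre-value on *every* write instead of only the first, so the attempted rollback lands on an intermediate value rather than the true baseline:

```typescript
// Deliberately broken write log for a single box
const badLog = new Map<object, number>();
const box = { value: 0 };

function badWrite(next: number): void {
  badLog.set(box, box.value); // BUG: overwrites the recorded baseline on every write
  box.value = next;
}

badWrite(1); // log: box -> 0
badWrite(2); // log: box -> 1 (baseline 0 is lost)
box.value = badLog.get(box)!; // attempted rollback
// box.value is now 1, an intermediate value, not the true baseline 0
```

Guarding the `set` with `if (!badLog.has(box))` is the one-line difference between this and correct first-write semantics.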
## Deliverables
- **Blueprint**: `scheduler.ts` atomic transaction engine with depth-tracked write logs, muted scheduling, and Promise-aware commit/rollback pathways.
- **Integration Template**: `signal.ts` modification hook demonstrating safe `recordAtomicWrite` placement post-equality check.
- **Rollback Verification Checklist**:
- [ ] Verify `atomicDepth` and `batchDepth` decrement symmetrically in all code paths (sync/async/success/failure).
- [ ] Confirm `muted` flag prevents `scheduleJob` from queuing during `exitRollback`.
- [ ] Validate nested transaction log merging preserves outermost baseline values.
- [ ] Test async failure paths to ensure `queue.clear()` executes before depth decrement.
- [ ] Assert `computed` nodes remain lazy post-rollback and only recompute on explicit read.
- **Configuration Template**: Standardized `atomic()` wrapper for optimistic UI updates, form submissions, and cross-signal state mutations requiring strict consistency guarantees.
