
## What Is an Atomic Transaction?

By Codcompass Team · 7 min read

Atomic Transactions with Rollback Semantics in Reactive Signal Systems

## Current Situation Analysis

Traditional reactive batching mechanisms (batch / transaction) only coalesce effect reruns to reduce computational overhead. They lack rollback semantics, meaning that if an operation fails mid-execution—especially across await boundaries—intermediate state mutations leak to downstream subscribers. This results in:

  • Partial State Commitment: Effects run with inconsistent snapshots, causing UI flicker or invalid application states.
  • Manual Rollback Boilerplate: Developers must manually track pre-operation values and restore them on error, which is error-prone and breaks composability.
  • Async Boundary Fragility: Standard batching doesn't natively handle Promise resolution/rejection, making it impossible to guarantee atomicity across asynchronous workflows.
  • Nested Isolation Failure: Without a dedicated write-log stack, inner transaction failures either corrupt outer state or require complex manual cleanup.
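The leak described above can be reproduced in a few lines. The following is a minimal sketch, not the article's scheduler: `cell`, `plainBatch`, `balance`, `ledger`, and `render` are hypothetical names, and subscribers stand in for effects. It shows that a batch which only coalesces reruns still flushes on exit after a failure, so the subscriber observes a half-applied update.

```typescript
type Job = () => void;

let depth = 0;
const pending = new Set<Job>();

// Hypothetical stand-in for a signal: a value plus subscribers.
function cell<T>(initial: T) {
  let value = initial;
  const subs = new Set<Job>();
  return {
    get: (): T => value,
    set(next: T): void {
      value = next;
      for (const job of subs) {
        // Inside a batch, reruns are deferred; outside, they run immediately.
        if (depth > 0) pending.add(job);
        else job();
      }
    },
    subscribe(job: Job): void {
      subs.add(job);
    },
  };
}

// Plain batching: coalesces reruns, but never restores values on failure.
function plainBatch<T>(fn: () => T): T {
  depth++;
  try {
    return fn();
  } finally {
    depth--;
    if (depth === 0) {
      const jobs = Array.from(pending);
      pending.clear();
      for (const job of jobs) job();
    }
  }
}

const balance = cell(100);
const ledger = cell<string[]>([]);
const snapshots: string[] = [];

const render: Job = () => {
  snapshots.push(`${balance.get()}/${ledger.get().length}`);
};
balance.subscribe(render);
ledger.subscribe(render);

let failed = false;
try {
  plainBatch(() => {
    balance.set(balance.get() - 80); // first write lands
    throw new Error("ledger write failed"); // second write never happens
  });
} catch {
  failed = true;
}
// balance stays at 20 with an empty ledger, and render ran once on that state.
```

In this sketch the subscriber runs exactly once, which is all that batching promises; nothing restores `balance` to 100.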

## WOW Moment: Key Findings

| Approach | Flush Frequency | Rollback Capability | Async Boundary Safety | Intermediate State Leakage | Implementation Complexity |
| --- | --- | --- | --- | --- | --- |
| Regular batch/transaction | 1 on exit | ❌ None | ⚠️ Manual handling required | 🔴 High (leaks on failure) | 🟢 Low |
| Manual Pre-Value Tracking | 1 on exit | ✅ Yes (manual) | ⚠️ Error-prone across await | 🟡 Medium (depends on dev) | 🔴 High |
| Atomic Transaction | 1 on success | ✅ Automatic | 🟢 Native Promise support | 🟢 Zero (strict isolation) | 🟡 Medium |

Key Findings:

  • Atomic transactions guarantee single-flush commitment on success and complete state restoration on failure.
  • The write-log stack architecture enables safe nested transactions without cross-contamination.
  • Lazy recomputation of computed nodes post-rollback eliminates unnecessary synchronous work while maintaining consistency.
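To make the write-log stack concrete, here is a synchronous sketch under simplifying assumptions: cells are plain boxed values, there is no scheduler, and `Cell`, `write`, `atomicSync`, and `observed` are hypothetical names rather than the article's API. Each level records only the first pre-write value per cell, a failing level restores from its own log, and a succeeding inner level hands its baselines to the parent.

```typescript
// Hypothetical minimal model: a cell is just a boxed value.
type Cell<T = unknown> = { value: T };
type Log = Map<Cell, unknown>;

const logs: Log[] = [];

// First write per level wins, so each log holds that level's baseline.
function write<T>(cell: Cell<T>, next: T): void {
  const log = logs[logs.length - 1];
  if (log && !log.has(cell)) log.set(cell, cell.value);
  cell.value = next;
}

function atomicSync<T>(fn: () => T): T {
  logs.push(new Map());
  let out: T;
  try {
    out = fn();
  } catch (e) {
    // Failure: restore every cell this level touched, then drop the log.
    const failedLog = logs.pop()!;
    for (const [cell, prev] of failedLog) cell.value = prev;
    throw e;
  }
  // Success: pass baselines to the parent unless it already holds older ones.
  const log = logs.pop()!;
  const parent = logs[logs.length - 1];
  if (parent) {
    for (const [cell, prev] of log) {
      if (!parent.has(cell)) parent.set(cell, prev);
    }
  }
  return out;
}

const a: Cell<number> = { value: 0 };
const b: Cell<number> = { value: 0 };
const observed: number[] = [];

try {
  atomicSync(() => {
    write(a, 1);
    atomicSync(() => {
      write(b, 5); // inner commit: b's baseline (0) moves to the outer log
    });
    try {
      atomicSync(() => {
        write(a, 99);
        throw new Error("inner"); // inner rollback restores a to 1, not 0
      });
    } catch {
      // swallow the inner failure; the outer level keeps going
    }
    observed.push(a.value);
    throw new Error("outer"); // outer rollback restores a and b to 0
  });
} catch {
  // outer failure handled by the caller
}
```

The failed inner level restores `a` to the value the outer level wrote, while the outer failure restores both cells to their original values, including `b`, which was only written by an inner level that had already committed.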

## Core Solution

The implementation extends the scheduler with a depth-tracked write log, a muted scheduling counter, and explicit commit/rollback pathways. The `signal.set()` method hooks into the atomic context and records the pre-write value only after the equality check confirms that the value actually changes.

### Extending `scheduler.ts`

```typescript
import { markStale } from "./computed.js";
import type { Node } from "./graph.js";

export interface Schedulable { run(): void; disposed?: boolean }

// Internal node shape used by signal/computed
export type InternalNode<T = unknown> = { value: T };

// Write log for atomic transactions: node -> value before this level's first write
type WriteLog = Map<(Node & InternalNode<unknown>), unknown>;

const queue = new Set<Schedulable>();
let scheduled = false;

// > 0 means we are inside batch/transaction mode (delay microtask flushing)
let batchDepth = 0;

// Atomic transaction depth and log stack
let atomicDepth = 0;
const atomicLogs: WriteLog[] = [];

// > 0 while a rollback is restoring values; scheduleJob ignores new work then
let muted = 0;

export function scheduleJob(job: Schedulable) {
  if (job.disposed || muted > 0) return;
  queue.add(job);
  if (!scheduled && batchDepth === 0) {
    scheduled = true;
    queueMicrotask(flushJobs);
  }
}

export function batch<T>(fn: () => T): T {
  batchDepth++;
  try {
    return fn();
  } finally {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  }
}

// Promise detection
function isPromiseLike<T = unknown>(v: any): v is PromiseLike<T> {
  return v != null && typeof v.then === "function";
}

export function transaction<T>(fn: () => T): T;
export function transaction<T>(fn: () => Promise<T>): Promise<T>;
export function transaction<T>(fn: () => T | Promise<T>): T | Promise<T> {
  batchDepth++;
  try {
    const out = fn();
    if (isPromiseLike<T>(out)) {
      return Promise.resolve(out).finally(() => {
        batchDepth--;
        if (batchDepth === 0) flushJobs();
      });
    }
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    return out as T;
  } catch (e) {
    batchDepth--;
    if (batchDepth === 0) flushJobs();
    throw e;
  }
}

// Atomic transaction (with rollback)
export function inAtomic() {
  return atomicDepth > 0;
}

// Record the "first write in this level"; called by signal.set() when a write is confirmed
export function recordAtomicWrite<T>(node: Node & InternalNode<T>, prevValue: T) {
  const log = atomicLogs[atomicLogs.length - 1];
  if (!log) return; // safety guard: no active atomic layer
  if (!log.has(node)) log.set(node, prevValue);
}

function writeNodeValue<T>(node: Node & InternalNode<T>, v: T) {
  if ("value" in node) (node as { value: T }).value = v;
}

function mergeChildIntoParent(child: WriteLog, parent: WriteLog) {
  for (const [node, prev] of child) {
    if (!parent.has(node)) parent.set(node, prev);
  }
}

export function atomic<T>(fn: () => T): T;
export function atomic<T>(fn: () => Promise<T>): Promise<T>;
export function atomic<T>(fn: () => T | Promise<T>): T | Promise<T> {
  // Enter atomic layer: suppress flushing (shared batchDepth), start write logging
  batchDepth++;
  atomicDepth++;
  atomicLogs.push(new Map<(Node & InternalNode<unknown>), unknown>());

  const exitCommit = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Inner success -> merge first-seen old values into parent
    if (atomicDepth > 0) {
      mergeChildIntoParent(log, atomicLogs[atomicLogs.length - 1]!);
    }
    // Only flush when the outermost layer exits
    batchDepth--;
    if (batchDepth === 0) flushJobs();
  };

  const exitRollback = () => {
    const log = atomicLogs.pop()!;
    atomicDepth--;
    // Silent rollback: avoid scheduling while restoring values
    muted++;
    try {
      for (const [node, prev] of log) {
        writeNodeValue(node, prev);
        if ((node as Node).kind === "signal") {
          for (const sub of (node as Node).subs) {
            // Effects are not rescheduled: muted blocks them and no flush follows
            if (sub.kind === "computed") markStale(sub);
          }
        }
      }
      // Drop queued jobs. Note: this empties the whole queue, including
      // jobs queued by an enclosing level before this one started.
      queue.clear();
      scheduled = false;
    } finally {
      muted--;
    }
    // No flush on failure; just exit batch/atomic depth
    batchDepth--;
  };

  let out: T | Promise<T>;
  try {
    out = fn();
  } catch (e) {
    // Synchronous failure -> rollback
    exitRollback();
    throw e;
  }
  if (isPromiseLike<T>(out)) {
    return Promise.resolve(out).then(
      (v) => { exitCommit(); return v; },
      (err) => { exitRollback(); throw err; }
    );
  }
  // Synchronous success. Committing outside the try block means an error
  // thrown by the flush cannot trigger a second exit through exitRollback.
  exitCommit();
  return out as T;
}

export function flushSync() {
  if (!scheduled && queue.size === 0) return;
  flushJobs();
}

function flushJobs() {
  scheduled = false;
  let guard = 0;
  while (queue.size) {
    const list = Array.from(queue);
    queue.clear();
    for (const job of list) job.run();
    if (++guard > 10000) throw new Error("Infinite update loop");
  }
}
```


### Adjustments in `signal.ts`

```typescript
import { markStale } from "./computed.js";
import { link, track, unlink, type Node } from "./graph.js";
import { SymbolRegistry as Effects } from "./registry.js";
import { inAtomic, recordAtomicWrite, type InternalNode } from "./scheduler.js";

type Comparator<T> = (a: T, b: T) => boolean;
const defaultEquals = Object.is;

export function signal<T>(initial: T, equals: Comparator<T> = defaultEquals) {
  const node: Node & InternalNode<T> & { kind: "signal"; equals: Comparator<T> } = {
    kind: "signal",
    deps: new Set(),
    subs: new Set(),
    value: initial,
    equals,
  };

  const get = () => {
    track(node);
    return node.value;
  };

  const set = (next: T | ((prev: T) => T)) => {
    const prev = node.value;
    const nxtVal = typeof next === "function" ? (next as (p: T) => T)(node.value) : next;
    if (node.equals(node.value, nxtVal)) return;

    // Atomic hook: record the previous value only when the write is confirmed,
    // and only the first time this level touches the node
    if (inAtomic()) recordAtomicWrite(node, prev);

    // Perform the actual write
    node.value = nxtVal;

    // No downstream subscribers -> exit early to avoid unnecessary work
    if (node.subs.size === 0) return;

    // Has downstream subscribers -> follow the original propagation logic
    for (const sub of node.subs) {
      if (sub.kind === "effect") {
        Effects.get(sub)?.schedule();
      } else if (sub.kind === "computed") {
        markStale(sub);
      }
    }
  };

  const subscribe = (observer: Node) => {
    if (observer.kind === "signal") {
      throw new Error("A signal cannot subscribe to another node");
    }
    link(observer, node);
    return () => unlink(observer, node);
  };

  return { get, set, subscribe, peek: () => node.value };
}
```


### Consistency Guarantees
When an atomic transaction fails:
- All affected `computed` nodes are marked as stale.
- On the next read after rollback, they lazily recompute based on the latest signal values.
- The UI never sees the invalid snapshot, and no flush happens during rollback.
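The lazy behaviour listed above can be illustrated with a stripped-down model. This is a sketch under assumptions, not the article's `computed` implementation: `lazyComputed`, `Source`, `price`, `qty`, and the hand-written `baseline` map are hypothetical, and stale-marking is done by hand instead of through subscriber links. It shows that rollback itself does no recomputation; the derived value is rebuilt only on the next read.

```typescript
type Source = { value: number };

// Hypothetical lazy computed: recomputes only when read while stale.
function lazyComputed(fn: () => number) {
  let stale = true;
  let cached = 0;
  let runs = 0;
  return {
    markStale(): void {
      stale = true;
    },
    get(): number {
      if (stale) {
        cached = fn();
        stale = false;
        runs++;
      }
      return cached;
    },
    runs: (): number => runs,
  };
}

const price: Source = { value: 10 };
const qty: Source = { value: 2 };
const total = lazyComputed(() => price.value * qty.value);

total.get(); // first read computes 10 * 2

// Simulated failed transaction: writes happen, each marks the computed stale.
const baseline = new Map<Source, number>([
  [price, price.value],
  [qty, qty.value],
]);
price.value = 15;
total.markStale();
qty.value = 0;
total.markStale();

// Simulated rollback: restore values and mark stale again, without recomputing.
for (const [src, prev] of baseline) {
  src.value = prev;
  total.markStale();
}
const runsAfterRollback = total.runs(); // rollback did not run the computation

const valueAfterRollback = total.get(); // first read after rollback recomputes
total.get(); // a second read reuses the cache
```

Because nothing read `total` while the transaction was open, the intermediate values of `price` and `qty` never reached the computation.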

### Usage Scenarios

#### Inner transaction fails, outer transaction continues
(only the inner level rolls back)  

```typescript
const a = signal(0);
const b = signal(0);

await atomic(async () => { // outer
  a.set(1); // OK
  try {
    await atomic(async () => { // inner
      b.set(1);
      throw new Error("boom"); // inner fails -> rollback b to 0
    });
  } catch {}
  // At this point: a = 1, b = 0
}); // outer succeeds -> one flush
```


#### Outer transaction fails
(everything rolls back)  

```typescript
const a = signal(0);
const b = signal(0);

try {
  await atomic(async () => {
    a.set(1);
    await Promise.resolve();
    b.set(2);
    throw new Error("oops"); // entire transaction fails -> rollback both a and b
  });
} catch {}

// a = 0, b = 0
// and there was no flush in this transaction
```


## Pitfall Guide
1. **Ignoring First-Write Semantics**: Recording every `set()` call instead of only the first write per node overwrites the original pre-transaction value. This breaks rollback accuracy, as the system will restore to an intermediate state rather than the true baseline.
2. **Scheduling During Rollback**: Failing to increment the `muted` counter while restoring values allows `scheduleJob` to queue new work. This triggers infinite loops or executes effects against partially restored state.
3. **Async Boundary Mismanagement**: Not wrapping Promise resolution/rejection in explicit `exitCommit`/`exitRollback` handlers breaks atomicity across `await`. The transaction depth will desynchronize, causing premature flushes or leaked state.
4. **Nested Log Overwriting**: Merging child transaction logs without checking `parent.has(node)` overwrites the outer transaction's recorded baseline. This violates isolation guarantees and causes outer rollbacks to restore incorrect values.
5. **Bypassing Equality Checks**: Calling `recordAtomicWrite` before the equality check (`node.equals(prev, next)`, which defaults to `Object.is`) creates log entries for writes that change nothing. This increases memory overhead and forces redundant stale-marking during rollback.
6. **Forgetting to Clear the Job Queue**: On rollback, pending jobs queued by the failed level must be purged (`queue.clear()`). Leaving them queued causes effects to run against the restored values, producing redundant UI updates for a change that never committed.
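Pitfalls 1 and 4 both come down to the same guard: never replace a baseline that is already recorded. The sketch below contrasts the unguarded and guarded variants on a toy model; all names (`recordEvery`, `recordFirst`, `mergeOverwrite`, `mergeGuarded`, and the two driver functions) are hypothetical and exist only for this comparison.

```typescript
type Cell = { value: number };
type Log = Map<Cell, number>;

// Pitfall 1: recording on every write replaces the baseline with an intermediate value.
function recordEvery(log: Log, cell: Cell): void {
  log.set(cell, cell.value);
}

// First-write semantics: only the value before the first write is kept.
function recordFirst(log: Log, cell: Cell): void {
  if (!log.has(cell)) log.set(cell, cell.value);
}

// Writes 1, 2, 3 to a cell that starts at 0, then rolls back from the log.
function rollbackAfterWrites(record: (log: Log, cell: Cell) => void): number {
  const cell: Cell = { value: 0 };
  const log: Log = new Map<Cell, number>();
  for (const next of [1, 2, 3]) {
    record(log, cell);
    cell.value = next;
  }
  for (const [c, prev] of log) c.value = prev;
  return cell.value;
}

// Pitfall 4: an unguarded merge lets the child's baseline replace the parent's.
function mergeOverwrite(child: Log, parent: Log): void {
  for (const [c, prev] of child) parent.set(c, prev);
}

function mergeGuarded(child: Log, parent: Log): void {
  for (const [c, prev] of child) {
    if (!parent.has(c)) parent.set(c, prev);
  }
}

// Outer level wrote 0 -> 1, inner level wrote 1 -> 2, then the inner level commits.
function outerBaselineAfterMerge(merge: (child: Log, parent: Log) => void): number | undefined {
  const x: Cell = { value: 2 };
  const parent: Log = new Map<Cell, number>([[x, 0]]);
  const child: Log = new Map<Cell, number>([[x, 1]]);
  merge(child, parent);
  return parent.get(x);
}

const restoredNaive = rollbackAfterWrites(recordEvery); // 2: an intermediate value
const restoredFirst = rollbackAfterWrites(recordFirst); // 0: the true baseline
const baselineOverwritten = outerBaselineAfterMerge(mergeOverwrite); // 1: wrong
const baselineGuarded = outerBaselineAfterMerge(mergeGuarded); // 0: correct
```

In both unguarded variants a later rollback would restore a value that only ever existed inside the transaction.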

## Deliverables
- **Blueprint**: `scheduler.ts` atomic transaction engine with depth-tracked write logs, muted scheduling, and Promise-aware commit/rollback pathways.
- **Integration Template**: `signal.ts` modification hook demonstrating safe `recordAtomicWrite` placement post-equality check.
- **Rollback Verification Checklist**:
  - [ ] Verify `atomicDepth` and `batchDepth` decrement symmetrically in all code paths (sync/async/success/failure).
  - [ ] Confirm the `muted` counter prevents `scheduleJob` from queuing during `exitRollback`.
  - [ ] Validate nested transaction log merging preserves outermost baseline values.
  - [ ] Test async failure paths to ensure `queue.clear()` executes before depth decrement.
  - [ ] Assert `computed` nodes remain lazy post-rollback and only recompute on explicit read.
- **Configuration Template**: Standardized `atomic()` wrapper for optimistic UI updates, form submissions, and cross-signal state mutations requiring strict consistency guarantees.
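As an illustration of the form-submission case, here is a hedged, single-level sketch of the pattern. It is not the article's `atomic()`: `atomicAsync`, `write`, `submitName`, and the two save stubs are hypothetical, there is no nesting or scheduler, and the module-level `activeLog` assumes that transactions never overlap in time.

```typescript
type Cell<T = unknown> = { value: T };

// Assumption: only one transaction is open at a time.
let activeLog: Map<Cell, unknown> | null = null;

// Records the pre-write value once per cell, then writes.
function write<T>(cell: Cell<T>, next: T): void {
  if (activeLog && !activeLog.has(cell)) activeLog.set(cell, cell.value);
  cell.value = next;
}

// Hypothetical single-level stand-in for atomic(): restore on rejection.
async function atomicAsync<T>(fn: () => Promise<T>): Promise<T> {
  const log = new Map<Cell, unknown>();
  activeLog = log;
  try {
    return await fn();
  } catch (e) {
    for (const [cell, prev] of log) cell.value = prev;
    throw e;
  } finally {
    activeLog = null;
  }
}

const displayName: Cell<string> = { value: "Ada" };
const saving: Cell<boolean> = { value: false };

async function submitName(
  next: string,
  save: (name: string) => Promise<void>,
): Promise<boolean> {
  try {
    await atomicAsync(async () => {
      write(displayName, next);
      write(saving, true);
      await save(next); // a rejection here rolls both cells back
      write(saving, false);
    });
    return true;
  } catch {
    return false;
  }
}

// Hypothetical backend stubs used to exercise both paths.
const rejectingSave = async (_name: string): Promise<void> => {
  throw new Error("503");
};
const acceptingSave = async (_name: string): Promise<void> => {};
```

Calling `submitName("Grace", rejectingSave)` resolves to `false` and leaves both cells at their original values, while `submitName("Grace", acceptingSave)` resolves to `true` with the new name committed. The caller never has to remember which cells were touched, which is the composability benefit the write log is meant to provide.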