
Event sourcing

Event sourcing records every state change as an immutable domain event rather than overwriting the current state. On restart, the actor replays its event history to reconstruct the exact state it had when it stopped.

The design

EventSourcedBehavior wires together four components: a command handler that returns Effect values, an event handler that folds events onto state, an EventStore, and optional snapshot and retention policies.

src/Aggregates/OrderActor.php
<?php

declare(strict_types=1);

use Monadial\Nexus\Core\Actor\ActorContext;
use Monadial\Nexus\Persistence\EventSourced\Effect;
use Monadial\Nexus\Persistence\EventSourced\EventSourcedBehavior;
use Monadial\Nexus\Persistence\EventSourced\SnapshotStrategy;
use Monadial\Nexus\Persistence\EventSourced\RetentionPolicy;
use Monadial\Nexus\Persistence\PersistenceId;

$behavior = EventSourcedBehavior::create(
    PersistenceId::of('Order', $orderId),
    new OrderState(),
    // Command handler: decides which events (if any) a command produces.
    static fn (ActorContext $ctx, object $cmd, OrderState $state): Effect => match (true) {
        $cmd instanceof PlaceOrder => Effect::persist(new OrderPlaced($cmd->items)),
        $cmd instanceof CancelOrder => Effect::persist(new OrderCancelled()),
        $cmd instanceof GetOrder => Effect::none()->thenReply($cmd->replyTo, fn($s) => $s),
        default => Effect::unhandled(),
    },
    // Event handler: folds a persisted event onto the current state.
    static fn (OrderState $state, object $event): OrderState => match (true) {
        $event instanceof OrderPlaced => $state->withItems($event->items),
        $event instanceof OrderCancelled => $state->cancel(),
        default => $state,
    },
)
    ->withEventStore($eventStore)
    ->withSnapshotStore($snapshotStore)
    ->withSnapshotStrategy(SnapshotStrategy::everyN(50))
    ->withRetention(RetentionPolicy::snapshotAndEvents(3, deleteEventsTo: true))
    ->toBehavior();

Command handler and Effect

The command handler receives the current ActorContext, the incoming command, and the current projected state. It returns an Effect:

  • Effect::persist(...$events) — write one or more events to the store, then apply them via the event handler.
  • Effect::none() — acknowledge the command without persisting (useful for read queries).
  • Effect::reply($to, $message) — send a reply as the sole effect.
  • Effect::stash() — buffer the command for later; replay after $ctx->unstashAll().
  • Effect::stop() — stop the actor after the effect completes.
  • Effect::unhandled() — route to dead letters.

Chain side-effects after persistence using ->thenRun(fn($state) => ...) and ->thenReply($to, fn($state) => $msg). These closures execute only after events are durably written.

Event handler

The event handler is a pure function: (State, Event): State. It receives the current state and an event, and returns the new state. It must be free of side-effects, because the engine calls it both during live command processing and during recovery replay, and both paths must produce the same state.
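Because the handler is pure, applying the same events in the same order always yields the same state. The following is a minimal standalone sketch of that property, not library code: the OrderPlaced, OrderCancelled, and OrderState classes are hypothetical stand-ins whose fields are assumed for illustration.

```php
<?php

declare(strict_types=1);

// Hypothetical event and state types; the field names are assumptions.
final class OrderPlaced
{
    /** @param list<string> $items */
    public function __construct(public readonly array $items) {}
}

final class OrderCancelled {}

final class OrderState
{
    /** @param list<string> $items */
    public function __construct(
        public readonly array $items = [],
        public readonly bool $cancelled = false,
    ) {}

    // Each transition returns a new instance, so earlier states are never mutated.
    public function withItems(array $items): self
    {
        return new self($items, $this->cancelled);
    }

    public function cancel(): self
    {
        return new self($this->items, true);
    }
}

// The event handler: (State, Event): State, with no I/O and no hidden inputs.
$applyEvent = static fn (OrderState $state, object $event): OrderState => match (true) {
    $event instanceof OrderPlaced => $state->withItems($event->items),
    $event instanceof OrderCancelled => $state->cancel(),
    default => $state,
};

$history = [new OrderPlaced(['book', 'pen']), new OrderCancelled()];

// Live processing applies events one at a time as they are persisted.
$live = new OrderState();
foreach ($history as $event) {
    $live = $applyEvent($live, $event);
}

// Recovery folds the stored history in a single pass.
$recovered = array_reduce($history, $applyEvent, new OrderState());
```

Both $live and $recovered end up with the same items and the same cancelled flag. A handler that read the clock or called an external service would break that equality.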

PersistenceId

Every persistent actor needs a stable identity:

src/Aggregates/OrderActor.php
use Monadial\Nexus\Persistence\PersistenceId;

$id = PersistenceId::of('Order', 'order-42');
// serialises to "Order|order-42"

The entityType and entityId together form the event stream key. Choose IDs that are stable across restarts — typically a domain entity's UUID.
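To illustrate how a two-part identity maps to a single stream key, here is a small sketch of a value object that serialises to the "type|id" form shown above. StreamId and its fromString() method are hypothetical; this is not the library's PersistenceId class, and the validation rules are assumptions.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for illustration only.
final class StreamId
{
    private function __construct(
        public readonly string $entityType,
        public readonly string $entityId,
    ) {}

    public static function of(string $entityType, string $entityId): self
    {
        if ($entityType === '' || $entityId === '') {
            throw new InvalidArgumentException('entityType and entityId must be non-empty');
        }
        // Assumed rule: the type must not contain the separator, so parsing stays unambiguous.
        if (str_contains($entityType, '|')) {
            throw new InvalidArgumentException('entityType must not contain "|"');
        }

        return new self($entityType, $entityId);
    }

    public function toString(): string
    {
        return $this->entityType . '|' . $this->entityId;
    }

    public static function fromString(string $key): self
    {
        // Split on the first separator only; a missing id becomes '' and is rejected by of().
        [$type, $id] = explode('|', $key, 2) + [1 => ''];

        return self::of($type, $id);
    }
}

$id = StreamId::of('Order', 'order-42');
$key = $id->toString();
$parsed = StreamId::fromString($key);
```

The same inputs always produce the same key, which is what lets a restarted actor find its own event stream again.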

Recovery sequence

On actor startup, the PersistenceEngine loads the latest snapshot, replays events written after that snapshot, and delivers RecoveryCompleted before accepting user commands.

Figure 1: Recovery sequence — snapshot load → event replay → RecoveryCompleted → accepting commands.
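The sequence can be sketched as a plain function. This is an illustrative model under stated assumptions, not the PersistenceEngine's implementation: recover() is a hypothetical name, snapshots and journal entries are modelled as arrays, and the RecoveryCompleted signal is omitted.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical recovery routine.
 *
 * @param array{seqNr:int, state:mixed}|null  $snapshot latest snapshot, or null if none exists
 * @param list<array{seqNr:int, event:mixed}> $journal  stored events, ordered by seqNr
 * @return array{state:mixed, seqNr:int}
 */
function recover(?array $snapshot, array $journal, mixed $emptyState, callable $applyEvent): array
{
    // Start from the snapshot when there is one, otherwise from the empty state.
    $state = $snapshot !== null ? $snapshot['state'] : $emptyState;
    $seqNr = $snapshot !== null ? $snapshot['seqNr'] : 0;

    foreach ($journal as $entry) {
        if ($entry['seqNr'] <= $seqNr) {
            continue; // already reflected in the snapshot
        }
        $state = $applyEvent($state, $entry['event']);
        $seqNr = $entry['seqNr'];
    }

    return ['state' => $state, 'seqNr' => $seqNr];
}

// Toy domain: the state is a running total and each event is an amount to add.
$add = static fn (int $total, int $amount): int => $total + $amount;

$journal = [
    ['seqNr' => 1, 'event' => 10],
    ['seqNr' => 2, 'event' => 20],
    ['seqNr' => 3, 'event' => 30],
    ['seqNr' => 4, 'event' => 40],
    ['seqNr' => 5, 'event' => 50],
];

$fromZero = recover(null, $journal, 0, $add);
$fromSnapshot = recover(['seqNr' => 3, 'state' => 60], $journal, 0, $add);
```

Both calls arrive at the same state and sequence number. The snapshot path only has to apply events 4 and 5, which is why snapshots shorten recovery for long streams.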

Command-path flowchart

Once recovered, each incoming command travels through the command handler to the event store before side-effects fire.

Figure 2: Command-path flow from incoming command through the event store to side-effects.
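A simplified model of that ordering follows. It is a sketch, not the engine's code: Decision, processCommand(), and the array-based journal are hypothetical stand-ins for Effect, the engine, and the EventStore.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an effect: events to persist plus callbacks to run afterwards.
final class Decision
{
    /**
     * @param list<object>   $events
     * @param list<callable> $sideEffects
     */
    public function __construct(
        public readonly array $events = [],
        public readonly array $sideEffects = [],
    ) {}
}

final class PlaceOrder
{
    /** @param list<string> $items */
    public function __construct(public readonly array $items) {}
}

final class OrderPlaced
{
    /** @param list<string> $items */
    public function __construct(public readonly array $items) {}
}

function processCommand(object $command, array $state, array &$journal, callable $decide, callable $applyEvent): array
{
    $decision = $decide($command, $state);          // 1. command handler decides
    foreach ($decision->events as $event) {
        $journal[] = $event;                        // 2. append (stands in for the durable write)
    }
    foreach ($decision->events as $event) {
        $state = $applyEvent($state, $event);       // 3. event handler folds onto state
    }
    foreach ($decision->sideEffects as $sideEffect) {
        $sideEffect($state);                        // 4. side-effects see the updated state
    }

    return $state;
}

$journal = [];
$notifications = [];

$decide = function (object $command, array $state) use (&$notifications): Decision {
    if ($command instanceof PlaceOrder) {
        return new Decision(
            [new OrderPlaced($command->items)],
            [function (array $newState) use (&$notifications): void {
                $notifications[] = sprintf('confirmed %d items', count($newState));
            }],
        );
    }

    return new Decision(); // nothing persisted, nothing run
};

$applyEvent = static fn (array $state, object $event): array => match (true) {
    $event instanceof OrderPlaced => [...$state, ...$event->items],
    default => $state,
};

$state = processCommand(new PlaceOrder(['book', 'pen']), [], $journal, $decide, $applyEvent);
```

If the append in step 2 threw, steps 3 and 4 would never run, so no confirmation could be sent for an event that was not stored.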

Writer-conflict sequence

Each ActorSystem stamps a unique ULID writerId on every EventEnvelope. If two systems write to the same stream, the ReplayFilter detects the interleaved writes during recovery.

Figure 3: Two actor systems write to the same stream — ReplayFilter detects the writer conflict on recovery.
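One way to picture the detection is a single pass over the stored envelopes that flags any event written by a writer that had already been superseded. The function below is a sketch of that heuristic only: findWriterConflicts() is a hypothetical name, envelopes are modelled as arrays, and the actual ReplayFilter rules may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical detector for interleaved writers.
 *
 * @param list<array{seqNr:int, writerId:string}> $envelopes in stored order
 * @return list<int> sequence numbers written by a writer that had already been superseded
 */
function findWriterConflicts(array $envelopes): array
{
    $current = null;   // writer of the most recent accepted event
    $superseded = [];  // writers that a later writer has taken over from
    $conflicts = [];

    foreach ($envelopes as $envelope) {
        $writer = $envelope['writerId'];

        if ($writer === $current) {
            continue; // same writer keeps appending
        }
        if (isset($superseded[$writer])) {
            $conflicts[] = $envelope['seqNr']; // an old writer wrote after the handover
            continue;
        }
        if ($current !== null) {
            $superseded[$current] = true; // clean handover to a new writer
        }
        $current = $writer;
    }

    return $conflicts;
}

// Writer A is replaced by B, but A then appends one more event.
$interleaved = findWriterConflicts([
    ['seqNr' => 1, 'writerId' => 'A'],
    ['seqNr' => 2, 'writerId' => 'A'],
    ['seqNr' => 3, 'writerId' => 'B'],
    ['seqNr' => 4, 'writerId' => 'A'],
    ['seqNr' => 5, 'writerId' => 'B'],
]);

// A clean handover from A to B produces no conflicts.
$clean = findWriterConflicts([
    ['seqNr' => 1, 'writerId' => 'A'],
    ['seqNr' => 2, 'writerId' => 'B'],
]);
```

In the first stream only sequence number 4 is flagged. A restart that hands the stream to a new writerId is not a conflict by itself; the problem is the old writer continuing to append afterwards.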

Replay decision flow

Recovery chooses between full replay and snapshot-accelerated replay based on what is stored.

Figure 4: Snapshot-and-events vs replay-from-zero decision at actor startup.

EventStore

The EventStore interface exposes four operations:

  • persist(PersistenceId, EventEnvelope...) — append atomically; throws ConcurrentModificationException on sequence conflict.
  • load(PersistenceId, from, to) — load the event range for replay.
  • deleteUpTo(PersistenceId, seqNr) — called by retention policy after a snapshot is confirmed.
  • highestSequenceNr(PersistenceId) — used to detect conflicts and set the next sequence number.
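To make the contract concrete, here is an in-memory sketch of the four operations. It is not one of the library's stores and does not implement its interface: InMemoryEventLog and SequenceConflict are hypothetical names, streams are keyed by plain strings, and the caller passes the expected sequence number explicitly rather than carrying it in an envelope.

```php
<?php

declare(strict_types=1);

// Hypothetical exception standing in for a sequence conflict.
final class SequenceConflict extends RuntimeException {}

final class InMemoryEventLog
{
    /** @var array<string, array<int, mixed>> stream key => seqNr => event */
    private array $streams = [];

    /** @var array<string, int> highest seqNr ever written, kept even after deletion */
    private array $highest = [];

    public function persist(string $streamKey, int $expectedSeqNr, mixed ...$events): void
    {
        // Optimistic lock: reject the whole batch if another write got in first.
        if ($this->highestSequenceNr($streamKey) !== $expectedSeqNr) {
            throw new SequenceConflict("expected seqNr {$expectedSeqNr} on {$streamKey}");
        }

        $seqNr = $expectedSeqNr;
        foreach ($events as $event) {
            $this->streams[$streamKey][++$seqNr] = $event;
        }
        $this->highest[$streamKey] = $seqNr;
    }

    /** @return array<int, mixed> events keyed by seqNr, inclusive range */
    public function load(string $streamKey, int $from = 1, int $to = PHP_INT_MAX): array
    {
        return array_filter(
            $this->streams[$streamKey] ?? [],
            static fn (int $seqNr): bool => $seqNr >= $from && $seqNr <= $to,
            ARRAY_FILTER_USE_KEY,
        );
    }

    public function deleteUpTo(string $streamKey, int $seqNr): void
    {
        $this->streams[$streamKey] = $this->load($streamKey, $seqNr + 1);
    }

    public function highestSequenceNr(string $streamKey): int
    {
        return $this->highest[$streamKey] ?? 0;
    }
}

$log = new InMemoryEventLog();
$log->persist('Order|order-42', 0, 'placed', 'paid');

$conflict = false;
try {
    $log->persist('Order|order-42', 0, 'stale write'); // seqNr 0 is no longer current
} catch (SequenceConflict) {
    $conflict = true;
}

$log->deleteUpTo('Order|order-42', 1);
```

Tracking the highest sequence number separately from the stored events means deleting old events never causes a sequence number to be reused.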

See stores for the available implementations.

SnapshotStore

The SnapshotStore interface saves and loads SnapshotEnvelope objects:

  • save(PersistenceId, SnapshotEnvelope) — persist a state snapshot with its sequence number.
  • load(PersistenceId) — return the latest snapshot, or null if none exists.
  • delete(PersistenceId, maxSequenceNr) — prune snapshots older than the retention limit.
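The three operations can be modelled in memory as follows. This is a sketch under assumptions rather than a real store: InMemorySnapshots is a hypothetical class, snapshots are plain arrays instead of SnapshotEnvelope objects, and delete() is assumed to remove every snapshot at or below maxSequenceNr.

```php
<?php

declare(strict_types=1);

final class InMemorySnapshots
{
    /** @var array<string, array<int, mixed>> stream key => seqNr => state */
    private array $snapshots = [];

    public function save(string $streamKey, int $seqNr, mixed $state): void
    {
        $this->snapshots[$streamKey][$seqNr] = $state;
        ksort($this->snapshots[$streamKey]); // keep the newest snapshot last
    }

    /** @return array{seqNr:int, state:mixed}|null */
    public function load(string $streamKey): ?array
    {
        $forStream = $this->snapshots[$streamKey] ?? [];
        if ($forStream === []) {
            return null;
        }
        $seqNr = array_key_last($forStream);

        return ['seqNr' => $seqNr, 'state' => $forStream[$seqNr]];
    }

    // Assumed semantics: drop snapshots whose seqNr is at or below the limit.
    public function delete(string $streamKey, int $maxSequenceNr): void
    {
        $this->snapshots[$streamKey] = array_filter(
            $this->snapshots[$streamKey] ?? [],
            static fn (int $seqNr): bool => $seqNr > $maxSequenceNr,
            ARRAY_FILTER_USE_KEY,
        );
    }
}

$store = new InMemorySnapshots();
$store->save('Order|order-42', 50, ['items' => 1]);
$store->save('Order|order-42', 150, ['items' => 3]);
$store->save('Order|order-42', 100, ['items' => 2]);

$latest = $store->load('Order|order-42');
$store->delete('Order|order-42', 100);
$afterPrune = $store->load('Order|order-42');
```

Pruning older snapshots leaves the latest one untouched, so recovery still starts from sequence number 150.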

See snapshots for snapshot strategies and retention policies.

Failure modes

Persistence failures surface as exceptions during actor startup or command processing.

| Symptom | Cause | Recovery |
| --- | --- | --- |
| Actor never starts, RecoveryException logged | EventStore unreachable or corrupted during replay | Verify store connectivity; restart the actor system after the store is healthy |
| WriterConflictException on startup | Two ActorSystem instances wrote to the same event stream | Run a single writer per stream; use ReplayFilter::repairByDiscardOld() for migration scenarios |
| ConcurrentModificationException on persist | Duplicate sequence number (optimistic lock violation) | A single actor should be the only writer, so this indicates an implementation bug; inspect the store |
| Command silently discarded | Effect::unhandled() returned; no match in command handler | Add the missing match branch or return Effect::none() for intentional no-ops |
| Stash never drained | $ctx->unstashAll() not called after recovery | Call $ctx->unstashAll() in the command handler branch that completes the stash condition |

See also