Skip to content

Read Events

The readEvents function reads events from EventSourcingDB for a given subject. It returns an async generator that yields raw EventSourcingDB events, which can be converted to Nimbus events using the event mapping utilities.

For full details on reading events, including reading from multiple subjects, reading in reverse order, reading specific ranges, and reading from the last event of a given type, see the Reading Events section in the EventSourcingDB documentation.

Example Application

The examples on this page reference the eventsourcing-demo application.

You can find the full example on GitHub: eventsourcing-demo

Basic Usage

typescript
import {
    eventSourcingDBEventToNimbusEvent,
    readEvents,
} from "@nimbus/eventsourcingdb";

for await (const eventSourcingDBEvent of readEvents("/users/123", {
    recursive: false,
})) {
    const event = eventSourcingDBEventToNimbusEvent(eventSourcingDBEvent);
    console.log(event);
}

Function Parameters

ParameterTypeDescription
subjectstringThe subject to read events for
optionsReadEventsOptionsOptions to control which events are read
signalAbortSignalOptional abort signal to cancel the read

Rebuilding Aggregate State

A common pattern in event sourcing is to rebuild an aggregate's state by replaying all of its events. This is used in command handlers to load the current state before making decisions:

typescript
import {
    eventSourcingDBEventToNimbusEvent,
    readEvents,
    writeEvents,
} from "@nimbus/eventsourcingdb";
import { isSubjectOnEventId } from "eventsourcingdb";

const acceptUserInvitationCommandHandler = async (command) => {
    let state: UserState = { id: command.data.id };

    // Replay all events to rebuild the current state
    for await (
        const eventSourcingDBEvent of readEvents(
            `/users/${command.data.id}`,
            { recursive: false },
        )
    ) {
        const event = eventSourcingDBEventToNimbusEvent(eventSourcingDBEvent);
        state = applyEventToUserState(state, event);
    }

    // Use the rebuilt state to make decisions
    const events = acceptUserInvitation(state, command);

    // Write new events with optimistic concurrency
    await writeEvents(events, [
        isSubjectOnEventId(
            events[0].subject,
            command.data.expectedRevision,
        ),
    ]);
};

Cancellation

Use an AbortSignal to cancel an in-progress read:

typescript
import { readEvents } from "@nimbus/eventsourcingdb";

const controller = new AbortController();

// Cancel after 5 seconds
setTimeout(() => controller.abort(), 5000);

for await (const event of readEvents(
    "/users",
    { recursive: true },
    controller.signal,
)) {
    console.log(event);
}

OpenTelemetry Tracing

Every call to readEvents is automatically wrapped in an OpenTelemetry span named eventsourcingdb.readEvents. The following metrics are recorded:

MetricTypeLabelsDescription
eventsourcingdb_operation_totalCounteroperation, statusTotal number of read operations
eventsourcingdb_operation_duration_secondsHistogramoperationDuration of read operations in seconds