
Durable Checkpoints

Agent workflows are persisted as parent-chained snapshots. A workflow can pause without holding a live thread, resume through REST or programmatic replay, or fork to explore alternative branches.

```xml
<dependency>
    <groupId>org.atmosphere</groupId>
    <artifactId>atmosphere-checkpoint</artifactId>
    <version>LATEST</version> <!-- replace with the current release from Maven Central -->
</dependency>
```
```java
// myWorkflowState, updatedState, alternativeState: your application's values of type S
CheckpointStore store = new InMemoryCheckpointStore();
store.start();

// Save a root snapshot
var root = WorkflowSnapshot.root("coord-42", myWorkflowState);
store.save(root);

// Derive a child snapshot on each workflow step
var next = root.deriveWith(updatedState);
store.save(next);

// Resume: load the latest snapshot for a coordination
// (takes the last element of list(), so this relies on results being ordered oldest-first)
var latest = store.list(CheckpointQuery.forCoordination("coord-42"))
        .stream()
        .reduce((a, b) -> b)
        .orElseThrow();

// Fork to explore an alternative branch
var branch = store.fork(latest.id(), alternativeState);
```
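The quick start relies on two properties: a saved snapshot is never modified, and a fork is just a new child of an existing snapshot. The self-contained sketch below models both in plain Java so the semantics are visible without the library. `Snap`, `ChainStore`, and their methods are hypothetical stand-ins, not the atmosphere-checkpoint API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

// Hypothetical model of a parent-chained snapshot (not the library's WorkflowSnapshot).
record Snap<S>(String id, String parentId, String coordinationId, S state) {

    // A root has no parent.
    static <T> Snap<T> root(String coordinationId, T state) {
        return new Snap<>(UUID.randomUUID().toString(), null, coordinationId, state);
    }

    // A child keeps the coordination id and points back at this snapshot.
    Snap<S> deriveWith(S newState) {
        return new Snap<>(UUID.randomUUID().toString(), id, coordinationId, newState);
    }
}

// Hypothetical append-only store: entries are added, never changed in place.
class ChainStore<S> {
    private final List<Snap<S>> saved = new ArrayList<>();

    void save(Snap<S> snap) { saved.add(snap); }

    Optional<Snap<S>> load(String id) {
        return saved.stream().filter(s -> s.id().equals(id)).findFirst();
    }

    // Fork: save a new child of an existing snapshot; existing entries are untouched.
    Snap<S> fork(String fromId, S alternativeState) {
        Snap<S> branch = load(fromId).orElseThrow().deriveWith(alternativeState);
        save(branch);
        return branch;
    }

    // "Latest" here means the last one saved for the coordination (insertion order).
    Optional<Snap<S>> latest(String coordinationId) {
        Snap<S> last = null;
        for (Snap<S> s : saved) {
            if (s.coordinationId().equals(coordinationId)) last = s;
        }
        return Optional.ofNullable(last);
    }

    int size() { return saved.size(); }
}

public class ForkDemo {
    public static void main(String[] args) {
        ChainStore<String> store = new ChainStore<>();
        Snap<String> root = Snap.root("coord-42", "plan");
        store.save(root);
        Snap<String> next = root.deriveWith("research");
        store.save(next);

        // Branch from the same parent as "next" to try a different step.
        Snap<String> branch = store.fork(root.id(), "draft");

        System.out.println(next.parentId().equals(root.id()));   // true
        System.out.println(branch.parentId().equals(root.id())); // true
        System.out.println(store.size());                        // 3
        System.out.println(store.latest("coord-42").orElseThrow().state()); // draft
    }
}
```

Because nothing is overwritten, both "research" and "draft" remain reachable as siblings under the same root.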

Application code owns the workflow state type S and its serialization. The in-memory store holds state objects as-is, without serializing them; persistent stores accept a caller-supplied serializer.
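Since serialization belongs to the application, a persistent store needs an encode/decode pair for S. The sketch below assumes a hypothetical `StateCodec<S>` interface (the serializer type that real persistent stores accept may look different) and a simple UTF-8 text encoding for an example `ReviewState` record.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical codec shape; the real serializer contract may differ.
interface StateCodec<S> {
    byte[] encode(S state);
    S decode(byte[] bytes);
}

// Example application-owned workflow state.
record ReviewState(String step, int attempts) {}

// Encodes ReviewState as "step|attempts" in UTF-8.
class ReviewStateCodec implements StateCodec<ReviewState> {
    @Override
    public byte[] encode(ReviewState s) {
        return (s.step() + "|" + s.attempts()).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public ReviewState decode(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8);
        // Split on the last '|', so step may itself contain '|'.
        int sep = text.lastIndexOf('|');
        return new ReviewState(text.substring(0, sep), Integer.parseInt(text.substring(sep + 1)));
    }
}

public class CodecDemo {
    public static void main(String[] args) {
        StateCodec<ReviewState> codec = new ReviewStateCodec();
        ReviewState original = new ReviewState("summarize", 2);
        ReviewState restored = codec.decode(codec.encode(original));
        System.out.println(restored.equals(original)); // true
    }
}
```

A round trip that yields an equal value is the property a persistent store needs; the encoding format itself is up to the application.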

| Type | Purpose |
|---|---|
| `CheckpointStore` | SPI: `save`, `load`, `fork`, `list`, `delete`, `deleteCoordination` |
| `WorkflowSnapshot<S>` | Immutable record of workflow state + parent link + metadata |
| `CheckpointId` | Opaque typed identifier (UUID-backed by default) |
| `CheckpointQuery` | Filter for `list()`: by coordinationId, agentName, time range, limit |
| `CheckpointEvent` | Sealed `Saved`/`Loaded`/`Forked`/`Deleted` lifecycle events |
| `CheckpointListener` | Callback for lifecycle events |
| `InMemoryCheckpointStore` | Default in-memory implementation, thread-safe, with eviction |
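The table lists the filters CheckpointQuery supports but not how they combine. The sketch below assumes each set field narrows the result (logical AND), the time range is inclusive at the start and exclusive at the end, results are ordered oldest-first, and `limit` caps the count. `Meta` and `Query` are hypothetical stand-ins, not the library types.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

// Hypothetical snapshot metadata used for filtering.
record Meta(String id, String coordinationId, String agentName, Instant createdAt) {}

// Hypothetical query: null fields mean "no constraint"; limit <= 0 means unlimited.
record Query(String coordinationId, String agentName, Instant from, Instant to, int limit) {

    boolean matches(Meta m) {
        return (coordinationId == null || coordinationId.equals(m.coordinationId()))
            && (agentName == null || agentName.equals(m.agentName()))
            && (from == null || !m.createdAt().isBefore(from)) // inclusive lower bound
            && (to == null || m.createdAt().isBefore(to));     // exclusive upper bound
    }

    List<Meta> apply(List<Meta> all) {
        return all.stream()
            .filter(this::matches)
            .sorted(Comparator.comparing(Meta::createdAt)) // oldest first
            .limit(limit > 0 ? limit : Long.MAX_VALUE)
            .toList();
    }
}

public class QueryDemo {
    public static void main(String[] args) {
        Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
        List<Meta> all = List.of(
            new Meta("a", "coord-42", "planner", t0),
            new Meta("b", "coord-42", "writer", t0.plusSeconds(10)),
            new Meta("c", "coord-7", "planner", t0.plusSeconds(20)));

        Query byAgent = new Query(null, "planner", null, null, 0);
        System.out.println(byAgent.apply(all).size()); // 2
    }
}
```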

If the atmosphere-coordinator module is on the classpath, decorate its journal so that a snapshot is persisted on every coordination event, or only on selected ones:

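```java
var store = new InMemoryCheckpointStore();
var journal = new CheckpointingCoordinationJournal<>(
        new InMemoryCoordinationJournal(),                    // journal being decorated
        store,                                                // where snapshots are saved
        CheckpointingCoordinationJournal.onAgentBoundaries(), // which events trigger a snapshot
        CoordinationStateExtractors.event());                 // what state each snapshot holds
journal.start();
```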

Snapshots form a chain per coordination: the first snapshot is a root, and each subsequent one references its predecessor. Supply a custom CoordinationStateExtractor<S> to capture domain state rather than the raw event.
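Conceptually, the checkpointing journal is a decorator: it forwards each event to the wrapped journal and, when a trigger predicate accepts the event, asks an extractor for state and records a snapshot. The sketch below models that flow with hypothetical `Event`, `Journal`, and `CheckpointingJournal` types and made-up event type strings; it illustrates the pattern under those assumptions and is not the module's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical coordination event and journal contract.
record Event(String coordinationId, String type, String agent) {}

interface Journal {
    void append(Event e);
}

class ListJournal implements Journal {
    final List<Event> events = new ArrayList<>();
    public void append(Event e) { events.add(e); }
}

// Decorator: delegate first, then snapshot if the trigger accepts the event.
class CheckpointingJournal<S> implements Journal {
    private final Journal delegate;
    private final Predicate<Event> trigger;      // which events produce a snapshot
    private final Function<Event, S> extractor;  // what state the snapshot captures
    final List<S> snapshots = new ArrayList<>(); // stands in for a checkpoint store

    CheckpointingJournal(Journal delegate, Predicate<Event> trigger, Function<Event, S> extractor) {
        this.delegate = delegate;
        this.trigger = trigger;
        this.extractor = extractor;
    }

    @Override
    public void append(Event e) {
        delegate.append(e);
        if (trigger.test(e)) {
            snapshots.add(extractor.apply(e));
        }
    }
}

public class JournalDemo {
    public static void main(String[] args) {
        ListJournal inner = new ListJournal();
        // Assumed analogue of an "agent boundaries" trigger: start/complete events only.
        Predicate<Event> agentBoundaries =
            e -> e.type().equals("AGENT_STARTED") || e.type().equals("AGENT_COMPLETED");
        // Domain-state extractor: keep only the agent name instead of the raw event.
        var journal = new CheckpointingJournal<String>(inner, agentBoundaries, Event::agent);

        journal.append(new Event("coord-42", "AGENT_STARTED", "planner"));
        journal.append(new Event("coord-42", "TOKEN", "planner"));
        journal.append(new Event("coord-42", "AGENT_COMPLETED", "planner"));

        System.out.println(inner.events.size()); // 3
        System.out.println(journal.snapshots);   // [planner, planner]
    }
}
```

Every event still reaches the wrapped journal; the trigger only decides which ones also produce a snapshot.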

```java
// metrics is assumed to be a Micrometer MeterRegistry; any metrics API works here
store.addListener(event -> {
    if (event instanceof CheckpointEvent.Saved saved) {
        metrics.counter("checkpoints.saved",
                "coordination", saved.coordinationId()).increment();
    }
});
```

Listeners must be thread-safe. Exceptions thrown from a listener are logged and swallowed; they never abort the store operation.
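The listener contract (callbacks may run concurrently, and a throwing listener must not abort the operation) can be illustrated with a small dispatcher. This is a sketch of one way to honor that contract, using a hypothetical `ListenerBus` class; it uses a CopyOnWriteArrayList so listeners can be registered while events are being delivered, and java.util.logging for the log line.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical dispatcher: a failing listener never aborts the caller or other listeners.
class ListenerBus<E> {
    private static final Logger LOG = Logger.getLogger(ListenerBus.class.getName());
    private final List<Consumer<E>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<E> listener) { listeners.add(listener); }

    void fire(E event) {
        for (Consumer<E> l : listeners) { // iterates a snapshot of the list
            try {
                l.accept(event);
            } catch (RuntimeException ex) {
                LOG.log(Level.WARNING, "Listener failed; continuing", ex); // log and swallow
            }
        }
    }
}

public class ListenerDemo {
    public static void main(String[] args) {
        ListenerBus<String> bus = new ListenerBus<>();
        AtomicInteger delivered = new AtomicInteger(); // thread-safe counter
        bus.addListener(e -> { throw new IllegalStateException("boom"); });
        bus.addListener(e -> delivered.incrementAndGet());

        bus.fire("Saved"); // first listener throws; second still runs
        System.out.println(delivered.get()); // 1
    }
}
```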

Atmosphere has two durability primitives with distinct scopes:

| Axis | SessionStore | CheckpointStore |
|---|---|---|
| Layer | Transport / connection | Coordinator / workflow |
| Unit persisted | `DurableSession` (rooms, broadcasters, metadata) | `WorkflowSnapshot<S>` (state, parent, agent) |
| Mutation | Overwrite-on-update | Append-only + fork |
| Triggered by | Connect/disconnect | `CoordinationEvent` |
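The mutation row is the key difference. A minimal sketch, using a plain HashMap for the overwrite model and a hypothetical `Node` record for the append-only model (neither is the SessionStore or CheckpointStore API), shows that only the append-only chain can reconstruct how a workflow got where it is: following parent links recovers the full path, including a forked branch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical snapshot node: parentId is null for the root.
record Node(String id, String parentId, String state) {}

public class MutationModels {
    // Overwrite-on-update: one value per key; the previous value is lost.
    static final Map<String, String> sessions = new HashMap<>();

    // Append-only: every snapshot is kept and linked to its parent.
    static final Map<String, Node> snapshots = new HashMap<>();

    // Walk parent links back to the root, then reverse to read root-first.
    static List<String> history(String id) {
        List<String> states = new ArrayList<>();
        Node current = snapshots.get(id);
        while (current != null) {
            states.add(current.state());
            current = current.parentId() == null ? null : snapshots.get(current.parentId());
        }
        Collections.reverse(states);
        return states;
    }

    public static void main(String[] args) {
        sessions.put("sess-1", "room-a");
        sessions.put("sess-1", "room-b");           // replaces room-a
        System.out.println(sessions.get("sess-1")); // room-b

        snapshots.put("s1", new Node("s1", null, "plan"));
        snapshots.put("s2", new Node("s2", "s1", "research"));
        snapshots.put("s3", new Node("s3", "s2", "draft"));
        snapshots.put("s2b", new Node("s2b", "s1", "alt-research")); // fork off s1
        System.out.println(history("s3"));  // [plan, research, draft]
        System.out.println(history("s2b")); // [plan, alt-research]
    }
}
```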

They compose: carry a CheckpointId in DurableSession.metadata so that a streaming client reconnecting through durable-sessions can restore both its transport state and its workflow position in a single flow. This is the streaming-native resumable-workflow pattern.
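A sketch of that composition, assuming session metadata behaves like a string map and that the application picks the key under which the checkpoint id is stored. `Session`, `CHECKPOINT_KEY`, and the map-backed checkpoint lookup are hypothetical; the real DurableSession and CheckpointStore APIs may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a durable session: only the metadata map matters here.
record Session(String id, Map<String, String> metadata) {}

public class ResumeDemo {
    // Application-chosen metadata key (an assumption, not a library constant).
    static final String CHECKPOINT_KEY = "checkpointId";

    // Hypothetical checkpoint lookup: checkpoint id -> workflow position.
    static final Map<String, String> checkpoints = new HashMap<>();

    // After each saved snapshot, record its id on the session.
    static void onCheckpointSaved(Session s, String checkpointId) {
        s.metadata().put(CHECKPOINT_KEY, checkpointId);
    }

    // On reconnect, the restored session carries the id, which locates the workflow state.
    static Optional<String> resume(Session restored) {
        return Optional.ofNullable(restored.metadata().get(CHECKPOINT_KEY))
            .map(checkpoints::get);
    }

    public static void main(String[] args) {
        Session session = new Session("sess-1", new HashMap<>());
        checkpoints.put("cp-2", "step: draft");
        onCheckpointSaved(session, "cp-2");

        // Simulate the session being persisted and rehydrated after a disconnect.
        Session restored = new Session(session.id(), new HashMap<>(session.metadata()));
        System.out.println(resume(restored).orElse("start fresh")); // step: draft
    }
}
```

An empty result means the session never reached a checkpoint, so the client starts the workflow from the beginning.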

See the atmosphere-checkpoint module README for the complete design rationale and composition pattern.