Evgeniy Khyst's Tech Blog
Articles on various software engineering topics
Follow @evgeniy-khist
· 13 min read

Event Sourcing with EventStoreDB

Introduction

EventStoreDB is the database for Event Sourcing.

This repository provides a sample of event sourced system that uses EventStoreDB as event store.

EventStoreDB Logo

See also

Example Domain

This sample uses heavily simplified ride hailing domain model inspired by tech/uklon experience.

  • A rider can place an order for a ride along a route specifying a price.
  • A driver can accept and complete an order.
  • An order can be cancelled before completion.

Domain use case diagram

Domain state diagram

Event Sourcing and CQRS 101

State-Oriented Persistence

State-oriented persistence

Event Sourcing

Event sourcing persists the state of an entity as a sequence of immutable state-changing events.

Event sourcing

Whenever the state of an entity changes, a new event is appended to the list of events.

Event sourcing

Current state of an entity can be restored by replaying all its events.

Event sourcing is best suited for short-living entities with relatively small total number of event (like orders).

Restoring the state of the short-living entity by replaying all its events doesn't have any performance impact. Thus, no optimizations for restoring state are required for short-living entities.

For endlessly stored entities (like users or bank accounts) with thousands of events restoring state by replaying all events is not optimal and snapshotting should be considered.

Snapshotting is an optimization technique where a snapshot of the aggregate's state is also saved, so an application can restore the current state of an aggregate from the snapshot instead of from scratch.

Snapshotting in event souring

An entity in event sourcing is also referenced as an aggregate.

A sequence of events for the same aggregate are also referenced as a stream.

CQRS

CQRS (Command-query responsibility segregation) stands for segregating the responsibility between commands (write requests) and queries (read requests). The write requests and the read requests are processed by different handlers.

A command generates zero or more events or results in an error.

CQRS

CQRS is a self-sufficient architectural pattern and doesn't require event sourcing.

Event sourcing is usually used in conjunction with CQRS. Event store is used as a write database and SQL or NoSQL database as a read database.

CQRS

Events in event sourcing are a part of a bounded context and should not be used "as-is" for integration with other bounded contexts. Integration events representing the current state of an aggregate should be used for communication between bounded contexts instead of a raw event sourcing change events.

Advantages of CQRS

  • Independent scaling of the read and write databases.
  • Optimized data schema for the read database (e.g. the read databases can be denormalized).
  • Simpler queries (e.g. complex JOIN operations can be avoided).

Advantages of Event Sourcing

  • Having a true history of the system (audit and traceability).
  • Ability to put the system in any prior state (e.g. for debugging).
  • Read-side projections can be created as needed (later) from events. It allows responding to future needs and new requirements.

Requirements for Event Store

  • Permanent storage. Store events forever.
  • Optimistic concurrency control. Prevent lost update anomaly (write-write conflicts).
  • Loading current state. Loading all previous events for the particular aggregate ID from an event store.
  • Subscribe to all events by aggregate type. Instead of subscribing to a single event stream that represents an aggregate.
  • Checkpoints. Store the event offset (a position in the stream) after handling it. Subscribe from the last known position instead of the stream start after the application restart.

Solution Architecture

EventStoreDB natively supports appending events, concurrency control, reading events, persistent subscriptions on events.

The important part of the solution is EventStoreDB persistent subscriptions.

Persistent subscriptions aim to deliver events in real-time to connected subscribers and are maintained by the server. Persistent subscriptions keep the last known position from where the subscription starts getting events on the server.

Persistent subscription can be load-balanced and process events in parallel. In order for the server to load-balance subscribers, it uses the concept of consumer groups.

There is Pinned consumer strategy designed to be used with an indexing projection such as the system $by_category projection.

Event stream id is hashed to one of 1024 buckets assigned to individual clients. When a client disconnects it's buckets are assigned to other clients. When a client connects, it is assigned some of the existing buckets. This naively attempts to maintain a balanced workload.

The main aim of Pinned consumer strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. This is not a guarantee, and you should handle the usual ordering and concurrency issues.

EvenStoreDB persistent subscriptions

All parts together look like this

EvenStoreDB event store

Permanent Storage

EventStoreDB stores all data permanently be default.

Optimistic concurrency control

When appending events to a stream you can supply a stream revision to have optimisic concurrency control.

Loading current state

Events can be easily read from a stream.

Subscribe to all events by aggregate type

EventStoreDB has a built-in system projection $by_category linking existing events from streams to a new stream with a $ce- prefix (a category) by splitting a stream id by a separator (- be default).

For example events from the streams with IDs order-123, order-124 and order-125 will be linked to the stream $ce-order.

You can subscribe to a $ce-order stream and receive notifications for changes in all order streams.

Checkpoints

Persistent subscriptions keep the last known position from where the subscription starts getting events on the server.

Snapshotting

Snapshotting is important for long-living or endlessly stored aggregates as we do not want to replay all the events (potentially thousands) to restore an aggregate state every time.

EventStoreDB doesn't provide snapshotting functionality out of the box, but you can easily implement it by yourself.

On every nth event append an aggregate state (snapshot) to a separate stream specifying the revision.

To restore an aggregate state:

  1. first read the latest value of the snapshot stream (backwards),
  2. then read forward from the original stream from the revision the snapshot points to.

For example:

  1. Events are appended to the stream order-123
  2. Every nth event a snapshot is appended to the stream order_snapshot-123 (use _ instead of - in the suffix _snapshot, - is used by the system projection $by_category)
  3. Read backwards a single result form order_snapshot-123 stream
    ReadStreamOptions options = ReadStreamOptions.get()
        .backwards()
        .fromEnd();
    
    ReadResult result = client.readStream("order_snapshot-123", 1 /*maxCount*/, options)
        .get();
    
    List<ResolvedEvent> events = result.getEvents();
    
  4. Read forwards from nth revision form order-123 stream
    ReadStreamOptions options = ReadStreamOptions.get()
        .forwards()
        .fromRevision(n);
    
    ReadResult result = client.readStream("order-123", options)
        .get();
    
    List<ResolvedEvent> events = result.getEvents();
    

Clustering

EventStoreDB allows you to run more than one node as a cluster to achieve high availability.

EventStoreDB clusters follow a "shared nothing" philosophy, meaning that clustering requires no shared disks for clustering to work. Instead, several database nodes store your data to ensure it isn't lost in case of a drive failure or a node crashing.

EventStoreDB uses a quorum-based replication model, in which a majority of nodes in the cluster must acknowledge that they committed a write to disk before acknowledging the write to the client. This means that to be able to tolerate the failure of n nodes, the cluster must be of size (2n + 1). A cluster of tree nodes can continue to accept writes if one node is unavailable. A cluster of five nodes can continue to accept writes if two nodes are unavailable, and so forth.

The cluster cannot be dynamically scaled. If you need to change the number of cluster nodes, the cluster size setting must be changed on all nodes before the new node can join.

Sharding

A shard is a horizontal partition of data in a database. Each shard is held on a separate database server instance, to spread load.

The distribution of records/documents among the cluster's shards is determined by the shard key.

EventStoreDB doesn't support sharding out of the box, but you can easily implement it by yourself.

Finding all records/documents matching query predicates is hard when a table/collection is partitioned and stored in different shards. If a query doesn't have a predicate for a shard key then all shards have to be queried for partial results and then combined.

EventStoreDB support only very simple operations: append event to a stream and read events from a stream. In other words, save and find events by a stream name.

The two common options for sharding are partition by location (e.g. city, country, region) or by date (e.g. year). For example:

  • Or store streams from the USA in the dedicated US EventStoreDB instance and streams from the EU in the dedicated EU EventStoreDB instance.
  • Store the recent streams (created in the current year) in the operational EventStoreDB instance and the older streams in the "archive" EventStoreDB instance.

Sharding with EventStoreDB can be implemented as follows:

  1. For each shard set up a separate EventStoreDB instance.
  2. For each EventStoreDB instance instantiate a client.
  3. Include a shard key into the stream name to determine what EventStoreDB client to use to reach to corresponding shard, e.g. order_2020-123, order_2021-124, or order_UA-123, order_GB-124.
public class OrderEventStore {

  private final EventStoreDBClient clientUSA;
  private final EventStoreDBClient clientEU;

  // ...

  public void append(Event event) {
    // EventData eventData = ...
    // AppendToStreamOptions options = ...
    String country = event.getCountry();
    getClient(country)
        .appendToStream(toStreamName(country, event.getAggregateId()), options, eventData)
        .get();
  }

  public List<Event> readEvents(UUID aggregateId, String country) {
    ReadResult result = getClient(country).readStream(toStreamName(country, aggregateId)).get();
    List<Event> events = new ArrayList<>();
    for (ResolvedEvent resolvedEvent : result.getEvents()) {
      // Event event = ... 
      events.add(event);
    }
    return events;
  }

  public void subscribe(String country, Consumer<Event> consumer) {
    SubscriptionListener listener = new SubscriptionListener() {
      @Override
      public void onEvent(Subscription subscription, ResolvedEvent resolvedEvent) {
        // Event event = ...
        consumer.accept(event);
      }
    };
    // The streams $ce-order_COUNTRY are created by the system projection $by_category
    client.subscribeToStream("$ce-order_" + country, listener);
  }

  private String toStreamName(String country, UUID aggregateId) {
    return "order_" + country + "-" + aggregateId;
  }

  private EventStoreDBClient getClient(String country) {
    return "USA".equals(country) ? clientUSA : clientEU;
  }
}

Drawbacks

Pinned consumer strategy decreases the likelihood of concurrency and ordering issues while maintaining load balancing. This is not a guarantee, and you should handle the usual ordering and concurrency issues.

Integration events are delivered with at-least-once delivery guarantee. The exactly-once delivery guarantee is hard to achieve due to a dual-write. A dual-write describes a situation when you need to atomically update the database and publish messages and two-phase commit (2PC) is not an option.

Consumers of integration events should be idempotent and filter duplicates and unordered events.

For assured ordering use catch-up subscriptions (simple subscriptions) instead of persistent subscriptions. Catch-up subscriptions serve the purpose of receiving events from a stream for a single subscriber. Subscribers for catch-up subscriptions get events in order and, therefore, are able to process events sequentially. There is nothing on the server that gets stored for such a subscriber. You have to keep the last known position for the catch-up subscription yourself.

A catch-up subscription listener must be extracted into a separate microservice and run in a single replica (.spec.replicas=1 in Kubernetes). This microservice must not be updated using RollingUpdate Deployment strategy. Recreate Deployment strategy must be used instead (.spec.strategy.type=Recreate in Kubernetes) when all existing Pods are killed before new ones are created.

Why EventStoreDB?

EventStoreDB was specifically designed as an event store for event sourcing. It supports only mandatory operations on events and thus is simple to use and high-performance.

How to Run the Sample?

  1. Download & install OpenJDK 11 (LTS) at AdoptOpenJDK.

  2. Download and install Docker and Docker Compose.

  3. Build Java project and Docker image

    ./gradlew clean build jibDockerBuild -i
    
  4. Run Kafka, ksqlDB and event-sourcing-app

    docker-compose up -d --scale event-sourcing-app=2
    # wait a few minutes
    
  5. Follow the logs of the application

    docker-compose logs -f event-sourcing-app
    
  6. Install curl and jq.

  7. Run test.sh script and see the output.

The test.sh script has the following instructions:

  1. Place new order.
    ORDER_ID=$(curl -s -X POST http://localhost:8080/orders/ -d '{"riderId":"63770803-38f4-4594-aec2-4c74918f7165","price":"123.45","route":[{"address":"Київ, вулиця Полярна, 17А","lat":50.51980052414157,"lon":30.467197278948536},{"address":"Київ, вулиця Новокостянтинівська, 18В","lat":50.48509161169076,"lon":30.485170724431292}]}' -H 'Content-Type: application/json' | jq -r .orderId)
    sleep 1s
    
  2. Get the placed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "dfc9cc1f-ad69-4977-a271-595b5c9a7fcd",
      "revision": 0,
      "status": "PLACED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "placedDate": "2021-04-25T12:18:30.891914Z"
    }
    
  3. Accept the order.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"ACCEPTED","driverId":"2c068a1a-9263-433f-a70b-067d51b98378","revision":0}' -H 'Content-Type: application/json'
    sleep 1s
    
  4. Get the accepted order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "dfc9cc1f-ad69-4977-a271-595b5c9a7fcd",
      "revision": 1,
      "status": "ACCEPTED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-25T12:18:30.891914Z",
      "acceptedDate": "2021-04-25T12:18:32.421801Z"
    }
    
  5. Try to cancel an outdated version of the order to simulate lost update.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","revision":0}' -H 'Content-Type: application/json' | jq
    
    {
      "error": "Actual revision 1 doesn't match expected revision 0"
    }
    
  6. Try to cancel a version of the order 'from the future' to simulate unordering.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","revision":2}' -H 'Content-Type: application/json' | jq
    
    {
      "error": "Actual revision 1 doesn't match expected revision 2"
    }
    
  7. Complete the order.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"COMPLETED","revision":1}' -H 'Content-Type: application/json'
    sleep 1s
    
  8. Get the completed order.
    curl -s -X GET http://localhost:8080/orders/$ORDER_ID | jq
    
    {
      "id": "dfc9cc1f-ad69-4977-a271-595b5c9a7fcd",
      "revision": 2,
      "status": "COMPLETED",
      "riderId": "63770803-38f4-4594-aec2-4c74918f7165",
      "price": 123.45,
      "route": [
        {
          "address": "Київ, вулиця Полярна, 17А",
          "lat": 50.51980052414157,
          "lon": 30.467197278948536
        },
        {
          "address": "Київ, вулиця Новокостянтинівська, 18В",
          "lat": 50.48509161169076,
          "lon": 30.485170724431292
        }
      ],
      "driverId": "2c068a1a-9263-433f-a70b-067d51b98378",
      "placedDate": "2021-04-25T12:18:30.891914Z",
      "acceptedDate": "2021-04-25T12:18:32.421801Z",
      "completedDate": "2021-04-25T12:18:33.671775Z"
    }
    
  9. Try to cancel a completed order to simulate business rule violation.
    curl -s -X PATCH http://localhost:8080/orders/$ORDER_ID -d '{"status":"CANCELLED","revision":2}' -H 'Content-Type: application/json' | jq
    
    {
      "error": "Order in status COMPLETED can't be cancelled"
    }
    
  10. Print integration events.
    docker exec -it kafka /bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic order-integration-events --from-beginning --property print.key=true --timeout-ms 3000
    
    dfc9cc1f-ad69-4977-a271-595b5c9a7fcd	{"order_id":"dfc9cc1f-ad69-4977-a271-595b5c9a7fcd","event_type":"OrderPlacedEvent","event_timestamp":1619353110891,"revision":0,"status":"PLACED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}]}
    dfc9cc1f-ad69-4977-a271-595b5c9a7fcd	{"order_id":"dfc9cc1f-ad69-4977-a271-595b5c9a7fcd","event_type":"OrderAcceptedEvent","event_timestamp":1619353112421,"revision":1,"status":"ACCEPTED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}
    dfc9cc1f-ad69-4977-a271-595b5c9a7fcd	{"order_id":"dfc9cc1f-ad69-4977-a271-595b5c9a7fcd","event_type":"OrderCompletedEvent","event_timestamp":1619353113671,"revision":2,"status":"COMPLETED","rider_id":"63770803-38f4-4594-aec2-4c74918f7165","price":123.45,"route":[{"ADDRESS":"Київ, вулиця Полярна, 17А","LAT":50.51980052414157,"LON":30.467197278948536},{"ADDRESS":"Київ, вулиця Новокостянтинівська, 18В","LAT":50.48509161169076,"LON":30.485170724431292}],"driver_id":"2c068a1a-9263-433f-a70b-067d51b98378"}
    

If you find this article helpful, please star it on GitHub

Star