Simple Event-Sourcing with EF Core and SQL Server

Introduction

In this article, I continue the architectural exploration that began with my earlier post, ‘Implementing a Clean Architecture in ASP.NET Core 6’. This time the focus is on a rudimentary event-sourcing mechanism built with Entity Framework Core and SQL Server.

This article walks through my chosen implementation and assumes you already have a basic grasp of event sourcing, how it relates to event-driven architectures, and how it fits with patterns like Domain-Driven Design (DDD) and Command Query Responsibility Segregation (CQRS). If you want to brush up on these concepts or read about the merits and challenges of event-sourcing, I’ve assembled a list of resources at the end of this article.

Core Abstractions

In event-sourcing, we often deal with large streams of events and need ways to efficiently recreate system state. Our three core interfaces handle these challenges: IEventStore manages event persistence and retrieval, IEventStoreSnapshotProvider uses snapshots for faster state reconstruction, and IRetroactiveEventsService deals with changes to past events.

Event persistence

IEventStore is central to our event-sourced system, serving as the interface for storing and retrieving domain events tied to aggregates. It manages the persistence of domain events for an aggregate and provides capabilities to fetch domain events based on a version range.

Snapshots

To optimize performance in scenarios with large event logs, IEventStoreSnapshotProvider offers snapshot functionality. Snapshots capture an aggregate’s state at a specific point in time, reducing the need to replay every event for aggregate rehydration. This interface deals with both the retrieval of an aggregate’s state from a snapshot and the persistence of an aggregate’s current state as a snapshot.

Retroactive events

For instances where we need to make adjustments to historical event streams, IRetroactiveEventsService comes into play. Whether for corrections, recalculations, or backdated operations, this interface facilitates the introduction of retroactive events into an existing event stream and returns the modified stream.
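To make the three abstractions more concrete, here is a sketch of what they might look like. The member signatures are inferred from how the concrete classes later in this article are called, so treat them as an approximation rather than the exact definitions in the published package. IDomainEvent<TAggregateId> and IAggregateRoot<TId> are reduced to the members this article actually uses.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Minimal stand-ins for the domain abstractions (assumed shapes, reduced to
// the members that appear in this article).
public interface IDomainEvent<TAggregateId>
{
    Guid EventId { get; }
    TAggregateId AggregateId { get; }
    int AggregateVersion { get; }
}

public interface IAggregateRoot<TId>
{
    TId Id { get; }
    int Version { get; }
    bool IsDeleted { get; }
    void ApplyEvent(IDomainEvent<TId> domainEvent, int version);
    IEnumerable<IDomainEvent<TId>> GetUncommittedEvents();
    void ClearUncommittedEvents();
}

// Stores and retrieves the domain events of an aggregate.
public interface IEventStore
{
    Task<IReadOnlyCollection<IDomainEvent<TAggregateId>>> LoadAsync<TAggregateId>(
        TAggregateId aggregateRootId, string aggregateName, int fromVersion, int toVersion);

    Task SaveAsync<TAggregateId>(
        string aggregateName, int expectedVersion, IEnumerable<IDomainEvent<TAggregateId>> domainEvents);

    Task SaveAsync<TAggregateId>(
        string aggregateName, int expectedVersion, IDomainEvent<TAggregateId> domainEvent);
}

// Saves and restores the state of an aggregate at a point in time.
public interface IEventStoreSnapshotProvider
{
    Task<T> GetAggregateFromSnapshotAsync<T, TAggregateId>(TAggregateId aggregateId, string aggregateName)
        where T : class, IAggregateRoot<TAggregateId>;

    Task SaveSnapshotAsync<T, TId>(T aggregate, Guid lastEventId)
        where T : class, IAggregateRoot<TId>;
}

// Returns a new stream with retroactive events applied; the input is left untouched.
public interface IRetroactiveEventsService
{
    IReadOnlyCollection<IDomainEvent<TId>> ApplyRetroactiveEventsToStream<T, TId>(
        IReadOnlyCollection<IDomainEvent<TId>> eventStream)
        where T : class, IAggregateRoot<TId>;
}
```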

The Event Store

In this section, we’ll delve into the actual implementation details of our event store using Entity Framework. Our primary class, EFEventStore, provides concrete implementations for the essential operations defined in our IEventStore interface. With Entity Framework as its backbone, this class essentially persists and retrieves domain events to and from our data store.

Context and DbSet

At its core, EFEventStore relies on the EventStoreDbContext context. The context contains only one DbSet, the EventEntity DbSet, which represents the table where the domain events are stored.

        private readonly EventStoreDbContext _context;
        private readonly DbSet<EventEntity> _events;

        public EFEventStore(EventStoreDbContext context) {
            _context = context;
            _events = _context.Set<EventEntity>();
        }

Loading Domain Events

The LoadAsync method fetches a sequence of domain events for a specified aggregate, within a given version range. Each fetched event is then transformed from its storage format into its actual domain event type using the DomainEventHelper.ConstructDomainEvent method.

public async Task<IReadOnlyCollection<IDomainEvent<TAggregateId>>> LoadAsync<TAggregateId>(TAggregateId aggregateRootId, string aggregateName, int fromVersion, int toVersion)
{
    Guard.Against.Negative(fromVersion, nameof(fromVersion));
    Guard.Against.Negative(toVersion, nameof(toVersion));
    if (fromVersion > toVersion)
    {
        throw new ArgumentException($"{nameof(fromVersion)} cannot be greater than {nameof(toVersion)}");
    }

    // Convert the id once, outside the query, so it is sent as a plain string parameter
    string aggregateId = aggregateRootId.ToString();

    // Run the query asynchronously instead of enumerating the IQueryable synchronously
    List<EventEntity> events = await _events
        .Where(e => e.AggregateId == aggregateId && e.AggregateName == aggregateName && e.Version >= fromVersion && e.Version <= toVersion)
        .OrderBy(e => e.Version)
        .ToListAsync();

    // Rebuild each stored event as its concrete domain event type
    var domainEvents = new List<IDomainEvent<TAggregateId>>();
    foreach (var @event in events)
    {
        var domainEvent = DomainEventHelper.ConstructDomainEvent<TAggregateId>(@event.Data, @event.AssemblyTypeName);
        domainEvents.Add(domainEvent);
    }

    return domainEvents.AsReadOnly();
}
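DomainEventHelper.ConstructDomainEvent is not shown in this article. The idea behind it can be sketched as a round trip: store the JSON payload together with the assembly-qualified type name, then resolve the type by name and deserialize into it. The snippet below is only an illustration under a few assumptions: it uses System.Text.Json so that it runs without extra packages (the article’s code uses Newtonsoft.Json), and ItemRenamed and DomainEventRoundTrip are hypothetical names.

```csharp
using System;
using System.Text.Json;

// Hypothetical event type used only for this illustration.
public record ItemRenamed(Guid EventId, Guid AggregateId, int AggregateVersion, string NewName);

public static class DomainEventRoundTrip
{
    // Serialize with the runtime type so every property of the concrete event is written,
    // and capture the type name that is needed later to read the payload back.
    public static (string Data, string AssemblyTypeName) ToStorage(object domainEvent)
    {
        Type type = domainEvent.GetType();
        return (JsonSerializer.Serialize(domainEvent, type), type.AssemblyQualifiedName);
    }

    // Resolve the CLR type from its stored name, then deserialize the payload into it.
    public static object FromStorage(string data, string assemblyTypeName)
    {
        Type type = Type.GetType(assemblyTypeName)
            ?? throw new InvalidOperationException($"Cannot resolve event type '{assemblyTypeName}'.");
        return JsonSerializer.Deserialize(data, type)
            ?? throw new InvalidOperationException("Stored event payload deserialized to null.");
    }
}
```

Passing an ItemRenamed instance through ToStorage and then FromStorage should give back an equal record. One consequence of this approach is worth keeping in mind: because the stored name includes the namespace and assembly, renaming or moving an event class means old rows can no longer be resolved unless you add some form of type mapping.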

Saving Domain Events

EFEventStore provides two SaveAsync methods: one for saving a collection of domain events and another for saving a single event. Both methods create corresponding EventEntity instances for each domain event and add them to the EventEntity DbSet. The changes are then persisted to the database using the SaveChangesAsync method.

public async Task SaveAsync<TAggregateId>(string aggregateName, int expectedVersion, IEnumerable<IDomainEvent<TAggregateId>> domainEvents)
{
    foreach (var domainEvent in domainEvents)
    {
        EventEntity eventEntity = ConstructEventEntity(domainEvent, expectedVersion, aggregateName);
        // Add only starts tracking the entity; nothing is written until SaveChangesAsync
        _events.Add(eventEntity);
    }
    // All events of the batch are persisted in a single SaveChangesAsync call
    await _context.SaveChangesAsync();
}

public async Task SaveAsync<TAggregateId>(string aggregateName, int expectedVersion, IDomainEvent<TAggregateId> domainEvent)
{
    EventEntity eventEntity = ConstructEventEntity(domainEvent, expectedVersion, aggregateName);
    _events.Add(eventEntity);
    await _context.SaveChangesAsync();
}

Event Entity Construction

To assist in saving domain events, the private method ConstructEventEntity is used. This method constructs an EventEntity instance from a given domain event. It also guards against potential concurrency issues by rejecting any incoming domain event whose version is greater than the expected version. The domain event is serialized to a JSON string for storage, and additional metadata, like the event’s type, is captured for future deserialization.

private EventEntity ConstructEventEntity<TAggregateId>(IDomainEvent<TAggregateId> domainEvent, int expectedVersion, string aggregateName)
{
    if (domainEvent.AggregateVersion > expectedVersion)
        throw new EventStoreException($"Concurrency issue detected when saving events. Event found with version {domainEvent.AggregateVersion} which is larger than maximum expected version {expectedVersion}");
    
    Type domainEventType = domainEvent.GetType();
    return new EventEntity()
    {
        Id = domainEvent.EventId,
        AggregateId = domainEvent.AggregateId.ToString(),
        AggregateName = aggregateName,
        Name = domainEventType.Name,
        AssemblyTypeName = domainEventType.AssemblyQualifiedName,
        Data = JsonConvert.SerializeObject(domainEvent),
        Version = domainEvent.AggregateVersion,
        CreatedAt = DateTime.UtcNow //duplicate. save or lose??
    };
}

The Snapshot Provider

As our system evolves and the number of events increases, snapshots become essential for performance optimization. The EFEventStoreSnapshotProvider class gives us the capability to save the current state of an aggregate, allowing us to avoid reapplying a large number of events. Through this service, we can persist these snapshots and retrieve them when needed. The full implementation is shown below:

using Microsoft.EntityFrameworkCore;
using Newtonsoft.Json;
using CH.EventStore.Abstractions;
using CH.Domain.Abstractions;
using CH.EventStore.EntityFramework.Entities;
using System.Threading.Tasks;
using System;
using System.Linq;

namespace CH.EventStore.EntityFramework
{
    /// <summary>
    /// Implementation of <see cref="IEventStoreSnapshotProvider"/> using Entity Framework
    /// </summary>
    internal class EFEventStoreSnapshotProvider : IEventStoreSnapshotProvider
    {
        private readonly EventStoreDbContext _context;
        private readonly DbSet<AggregateSnapshotEntity> _snapshots;
        private readonly JsonSerializerSettings _jsonSerializerSettings = new JsonSerializerSettings { ContractResolver = new PrivateSetterContractResolver() };

        public EFEventStoreSnapshotProvider(EventStoreDbContext context)
        {
            _context = context;
            _snapshots = context.Set<AggregateSnapshotEntity>();
        }

        public async Task<T> GetAggregateFromSnapshotAsync<T, TAggregateId>(TAggregateId aggregateId, string aggregateName) where T : class, IAggregateRoot<TAggregateId>
        {
            AggregateSnapshotEntity entity = await GetLatestSnapshotAsync(aggregateId, aggregateName);
            if (entity == null)
                return default;
            T aggregate = JsonConvert.DeserializeObject<T>(entity.Data, _jsonSerializerSettings);
            aggregate.ClearUncommittedEvents(); //to remove constructor creation event
            return aggregate;
        }

        public async Task SaveSnapshotAsync<T, TId>(T aggregate, Guid lastEventId) where T : class, IAggregateRoot<TId>
        {
            AggregateSnapshotEntity newSnapshot = new AggregateSnapshotEntity()
            {
                Data = JsonConvert.SerializeObject(aggregate),
                AggregateId = aggregate.Id.ToString(),
                LastAggregateVersion = aggregate.Version,
                AggregateName = typeof(T).Name,
                LastEventId = lastEventId
            };
            _snapshots.Add(newSnapshot);
            await _context.SaveChangesAsync();
        }

        private Task<AggregateSnapshotEntity> GetLatestSnapshotAsync<TAggregateId>(TAggregateId aggregateId, string aggregateName)
        {
            return _snapshots.Where(snap => snap.AggregateId == aggregateId.ToString() && snap.AggregateName == aggregateName).OrderByDescending(a => a.LastAggregateVersion).FirstOrDefaultAsync();
        }
    }
}
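The PrivateSetterContractResolver used above is not shown in this article. Aggregates usually expose their state through properties with private setters, which Newtonsoft.Json skips during deserialization by default. A common way to write such a resolver is sketched below; this is an assumption about what the class does, not the author’s actual code, and SampleAggregate is a hypothetical type included only to demonstrate the effect. The snippet needs the Newtonsoft.Json package.

```csharp
using System.Reflection;
using Newtonsoft.Json;
using Newtonsoft.Json.Serialization;

// Marks properties with a non-public setter as writable so that
// JsonConvert.DeserializeObject can restore them.
public class PrivateSetterContractResolver : DefaultContractResolver
{
    protected override JsonProperty CreateProperty(MemberInfo member, MemberSerialization memberSerialization)
    {
        JsonProperty jsonProperty = base.CreateProperty(member, memberSerialization);

        if (!jsonProperty.Writable && member is PropertyInfo propertyInfo)
        {
            // GetSetMethod(nonPublic: true) also returns private and protected setters.
            jsonProperty.Writable = propertyInfo.GetSetMethod(nonPublic: true) != null;
        }

        return jsonProperty;
    }
}

// Hypothetical aggregate-like class with encapsulated state.
public class SampleAggregate
{
    public int Version { get; private set; }
    public string Name { get; private set; }
}
```

With this resolver in the serializer settings, deserializing a payload such as {"Version":7,"Name":"demo"} into SampleAggregate should populate both properties; without it, they would keep their default values.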

The Retroactive Events Service

The EFRetroactiveEventsService is responsible for the critical task of applying retroactive events to an event stream. Using this service, we can correct or enhance an existing event stream by interleaving or replacing events that may have occurred out-of-order, were rejected, or later identified as incorrect.

Branch points

In event-sourced systems, once an event is stored, it is immutable; it cannot be changed. However, there are times when you might discover that an event was incorrect, or perhaps you wish to insert additional events in a specific position within the event stream. That’s where branch points come into play.

The BranchPointEntity class is a representation of such moments in the event stream where alterations are needed.

public class BranchPointEntity : DataEntityBase<int>
{
    /// <summary>
    /// Branch point indicative name
    /// </summary>
    public string Name { get; set; }

    /// <summary>
    /// FK for the Event Entity
    /// </summary>
    public Guid EventId { get; set; }

    /// <summary>
    /// The type of the branch point
    /// </summary>
    public BranchPointTypeEnum Type { get; set; }

    /// <summary>
    /// Navigation property of the event
    /// </summary>
    public virtual EventEntity Event { get; set; }

    public virtual ICollection<RetroactiveEventEntity> RetroactiveEvents { get; set; }
}

The properties of this BranchPointEntity are described below:

  • Name: This name helps in identifying or describing the purpose or reason for the branch point.
  • EventId: This is the foreign key that relates the branch point to a specific event in the event stream. It pinpoints the location in the event stream where retroactive changes need to be made.
  • Type (BranchPointTypeEnum): This enumeration value represents the nature of the branch point. There can be several types, such as:
    • OutOfOrder: Indicates that events should be interleaved within the existing stream.
    • Incorrect: Indicates that events should be added as a replacement of the linked event defined in the branch point.
    • Rejected: Indicates that the event is not supposed to be part of the event stream.
  • Event (Navigation Property): This is the linked event in the event stream associated with the branch point.
  • RetroactiveEvents: It’s a collection of events that should be applied when this branch point is hit during the processing of the event stream. These are the events that will replace, precede, or follow the original event based on the branch point’s type.

Applying retroactive events to stream

The ApplyRetroactiveEventsToStream method is the central piece of our retroactive-events implementation. It takes an event stream and returns a potentially modified stream with the branch points applied. Let’s walk through what happens in more detail:

  1. Fetch Branch Points: We start by querying all the branch points that match events in the provided event stream.
  2. Sequential Stream Processing: For each event in the input stream (processed in ascending order of their aggregate version):
    • We check if there’s an associated branch point for the event.
    • If a branch point exists, based on the type of the branch point (OutOfOrder, Incorrect, Rejected), we apply the necessary retroactive events from the branch point into the stream. This could mean adding new events, replacing existing ones, or even skipping certain events.
    • If no branch point exists for the event, the event is added to the new stream unchanged.
  3. Return New Stream: After processing all events, the method returns a potentially modified event stream.

It is important in event-sourcing implementations, especially in the context of retroactive changes, to remain faithful to two principles: immutability and maintaining the order of events. As you’ll notice in the implementation, the original event stream remains unchanged and a potential new stream with retroactive changes is created and returned. Also, it’s important to process the input stream in the correct order (i.e., ascending order of aggregate version) to ensure the retroactive events are injected at the right places.
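The steps above can be sketched with plain in-memory types. This is a simplified illustration, not the EFRetroactiveEventsService implementation: StreamEvent, BranchPoint, and RetroactiveStream are hypothetical stand-ins for the stored entities, the sketch assumes at most one branch point per event, and for OutOfOrder it assumes the retroactive events are placed right after the linked event. It also leaves aside how versions are assigned to the inserted events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public enum BranchPointTypeEnum { OutOfOrder, Incorrect, Rejected }

// Simplified in-memory stand-ins (hypothetical) for a stored event and a branch point.
public record StreamEvent(Guid EventId, int AggregateVersion, string Name);
public record BranchPoint(Guid EventId, BranchPointTypeEnum Type, IReadOnlyList<StreamEvent> RetroactiveEvents);

public static class RetroactiveStream
{
    public static IReadOnlyList<StreamEvent> Apply(
        IEnumerable<StreamEvent> stream, IEnumerable<BranchPoint> branchPoints)
    {
        // Step 1: index the branch points by the event they are attached to.
        var byEventId = branchPoints.ToDictionary(bp => bp.EventId);
        var result = new List<StreamEvent>();

        // Step 2: walk the stream in ascending order of aggregate version.
        foreach (var @event in stream.OrderBy(e => e.AggregateVersion))
        {
            if (!byEventId.TryGetValue(@event.EventId, out var branchPoint))
            {
                result.Add(@event); // no branch point: keep the event unchanged
                continue;
            }

            switch (branchPoint.Type)
            {
                case BranchPointTypeEnum.OutOfOrder:
                    // Assumption: interleave the retroactive events after the linked event.
                    result.Add(@event);
                    result.AddRange(branchPoint.RetroactiveEvents);
                    break;
                case BranchPointTypeEnum.Incorrect:
                    // Replace the linked event with the retroactive events.
                    result.AddRange(branchPoint.RetroactiveEvents);
                    break;
                case BranchPointTypeEnum.Rejected:
                    // Skip the linked event entirely.
                    break;
            }
        }

        // Step 3: return a new list; the input stream is never mutated.
        return result;
    }
}
```

Because Apply builds a fresh list and only reads from its inputs, the stored events stay immutable, and sorting by AggregateVersion before the loop ensures the retroactive events land at the intended position even if the caller passes the stream in a different order.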

Putting it all together

We’ve been discussing different parts of this architecture. Let’s take a moment to explore the synthesis of these elements, captured in our custom event-sourcing repository, ESRepository<T, TId>.

The ESRepository<T, TId> class serves as a tailored repository for event-sourced entities. It’s built upon the concept of aggregate roots, represented by the IAggregateRoot<TId> abstraction. For those who’ve been following along, you’ll recall that we touched on the IAggregateRoot abstraction in a previous post. It forms the cornerstone of Domain-Driven Design (DDD) and the event sourcing patterns, encapsulating the business logic and ensuring changes are made in a consistent and valid way.


internal class ESRepository<T, TId> : IESRepository<T, TId> where T : class, IAggregateRoot<TId>
{
    private readonly ILogger<ESRepository<T, TId>> _logger;
    private readonly IEventStore _eventStore;
    private readonly IEventStoreSnapshotProvider _snapshotService;
    private readonly IRetroactiveEventsService _retroEventsService;
    private const int SNAPSHOT_FREQUENCY = 50;

    public ESRepository(ILogger<ESRepository<T, TId>> logger, IEventStore eventStore, IEventStoreSnapshotProvider snapshotProvider, IRetroactiveEventsService retroEventsService)
    {
        _logger = logger;
        _eventStore = eventStore;
        _snapshotService = snapshotProvider;
        _retroEventsService = retroEventsService;
    }

    public async Task<T> GetByIdAsync(TId id)
    {
        try
        {
            T aggregate = CreateEmptyAggregate();
            string aggregateName = typeof(T).Name;
            int fromVersion = 0;

            T snapshotAggregate = await _snapshotService.GetAggregateFromSnapshotAsync<T, TId>(id, aggregateName);
            if (snapshotAggregate != default)
            {
                aggregate = snapshotAggregate;
                fromVersion = snapshotAggregate.Version + 1;
            }

            var eventsForAggregate = await _eventStore.LoadAsync<TId>(id, aggregateName, fromVersion, int.MaxValue);

            // If neither events nor a snapshot exist, the aggregate does not exist
            if (!eventsForAggregate.Any() && snapshotAggregate == default)
                throw new EventStoreAggregateNotFoundException($"Aggregate {aggregateName} with id {id} not found");

            eventsForAggregate = _retroEventsService.ApplyRetroactiveEventsToStream<T, TId>(eventsForAggregate);

            foreach (var @event in eventsForAggregate)
            {
                aggregate.ApplyEvent(@event, @event.AggregateVersion);
            }

            if (aggregate.IsDeleted)
                throw new EventStoreAggregateNotFoundException($"Aggregate {aggregateName} with id {id} not found");

            return aggregate;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred while retrieving from event source repository.");
            throw;
        }
    }

    public async Task SaveAsync(T aggregate)
    {
        try
        {
            IAggregateRoot<TId> aggregatePersistence = aggregate;
            string aggregateName = typeof(T).Name;
            var uncommittedEvents = aggregate.GetUncommittedEvents();

            if (!uncommittedEvents.Any())
            {
                return;
            }

            Guid lastEventId = uncommittedEvents.Last().EventId;

            await _eventStore.SaveAsync(aggregateName, aggregate.Version, uncommittedEvents);
            if (ShouldSnapshot(aggregate.Version, uncommittedEvents.Count()))
            {
                await _snapshotService.SaveSnapshotAsync<T, TId>(aggregate, lastEventId);
            }
            aggregatePersistence.ClearUncommittedEvents();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error occurred while saving aggregate.");
            throw;
        }
    }

    private T CreateEmptyAggregate()
    {
        return (T)typeof(T).GetConstructor(BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Public, null, Array.Empty<Type>(), Array.Empty<ParameterModifier>()).Invoke(Array.Empty<object>());
    }

    private bool ShouldSnapshot(int aggregateVersion, int numberOfeventsToCommit)
    {
        //Every N events we save a snapshot
        return ((aggregateVersion >= SNAPSHOT_FREQUENCY) &&
            (
                (numberOfeventsToCommit >= SNAPSHOT_FREQUENCY) ||
                (aggregateVersion % SNAPSHOT_FREQUENCY < numberOfeventsToCommit) ||
                (aggregateVersion % SNAPSHOT_FREQUENCY == 0)
            )
        );
    }
}

This repository provides a single interface for working with event-sourced entities by orchestrating the Event Store, the Snapshot Provider, and the Retroactive Events Service. It rebuilds an aggregate’s current state by replaying events, starting from the latest snapshot when one exists. It also persists new domain events and decides when to take a snapshot based on the aggregate’s version and the number of events being committed.
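The condition in ShouldSnapshot is easier to reason about when pulled out into a standalone function. The sketch below copies the same expression into a hypothetical SnapshotPolicy helper. It assumes, as the SaveAsync code suggests, that aggregateVersion already includes the events being committed.

```csharp
public static class SnapshotPolicy
{
    public const int SnapshotFrequency = 50;

    // Same condition as ShouldSnapshot in ESRepository: once the aggregate has reached
    // the first multiple of the frequency, snapshot whenever the batch being committed
    // lands on or crosses a multiple of the frequency.
    public static bool ShouldSnapshot(int aggregateVersion, int numberOfEventsToCommit) =>
        aggregateVersion >= SnapshotFrequency &&
        (numberOfEventsToCommit >= SnapshotFrequency ||
         aggregateVersion % SnapshotFrequency < numberOfEventsToCommit ||
         aggregateVersion % SnapshotFrequency == 0);
}
```

A few cases show the behaviour. Saving one event that brings the aggregate to version 50 triggers a snapshot, while saving one event that brings it to version 51 does not. Saving three events that end at version 52 does trigger one, because the batch covers versions 50 to 52 and therefore crosses the boundary. A batch of ten events ending at version 49 does not, since the aggregate has not yet reached 50.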

Wrapping up

Event sourcing is a powerful pattern that can offer real value to applications, especially those with complex domain logic. In this article, we’ve covered the components that make up a simple event-sourcing architecture: the event store, the snapshot provider, the retroactive events service, and the repository that ties them together.

To keep things modular and easy to integrate, the abstractions have been packaged into one NuGet package, while the Entity Framework Core implementations live in a separate package. For those who want to dig into the inner workings, or perhaps even contribute, the source code is available in the repository.

Resources

Follow the links below if you want to learn more about event-sourcing, its pros and cons, and when or why you should (or shouldn’t) use it.

Martin Fowler on Event Sourcing: A comprehensive introduction to the event sourcing pattern from one of the industry’s leading figures.

Event Sourcing Pattern – Microsoft Azure: Detailed insights into the pattern, especially in the context of cloud solutions.

Greg Young – The art of destroying software: A must-watch talk for anyone diving into event sourcing, where Greg Young elaborates on the importance of being able to rebuild your system at any point in time.

Event Store: An open-source, functional database that exclusively uses the event sourcing pattern. It provides a good reference for those looking to see event sourcing in action.

Domain-Driven Design: While not exclusively about event sourcing, Eric Evans’ book on Domain-Driven Design touches on patterns and practices that align closely with event sourcing. A recommended read for anyone delving into domain modeling.

Event Sourcing Made Simple: A blog post that breaks down the complexities of event sourcing into digestible bits.

Event Sourcing Pattern – Microservices.io: A concise breakdown of the event sourcing pattern in the context of microservices architecture. The page offers clear diagrams and explanations of key concepts.
