Sunday, October 27, 2013

Event projections

In my first two posts on event sourcing, I implemented an event sourced aggregate from scratch. Once an aggregate was able to record and replay events, I looked at persisting them in an event store. Logically, the next question is: how do I query my aggregates - how do I get my state out?

In traditional systems, write and read models are not separated; they are one and the same. Event sourced systems, on the other hand, have a write model - event streams - and a separate read model. The read model is built from events committed to the write model; events are projected into one or more read models.


An interface for a projection could look like this.
public interface IProjection {
    void Handle(EventStream eventStream);                     
}  
A projection takes in an event stream, and projects it to some read model.

A read model can be anything: a cache, a document store, a key-value store, a relational database, a file, or even some evil global state.
public class EvilStatisticsReadModel {
    public static int WithdrawalAmountExceededCount { get; set; }

    public static int AmountDepositedCount { get; set; }
}
In this model, we want to maintain statistics of events that happened. For that to happen, we need to define a projection of our event stream.
public class ProjectionsToEvilStatisticsReadModel : IProjection {
    public void Handle(EventStream eventStream) {
        foreach (var @event in eventStream)
            When((dynamic)@event);
    }

    public void When(WithdrawalAmountExceeded @event) {
        EvilStatisticsReadModel.WithdrawalAmountExceededCount++;
    }

    public void When(AmountDeposited @event) {
        EvilStatisticsReadModel.AmountDepositedCount++;
    }    
}
If we now let this projection handle an event stream, our read model will be kept up-to-date.
[TestMethod]
public void ReadModelIsKeptUpToDateWhileProjectingTheEventStream() {
    var events = new List<IEvent>() {
        new WithdrawalAmountExceeded(new Amount(3000)),
        new AmountDeposited(new Amount(300)),
        new AmountDeposited(new Amount(500)),
        new AmountWithdrawn(new Amount(100))
    };
    var stream = new EventStream(events);

    new ProjectionsToEvilStatisticsReadModel().Handle(stream);

    Assert.AreEqual(1, EvilStatisticsReadModel.WithdrawalAmountExceededCount);
    Assert.AreEqual(2, EvilStatisticsReadModel.AmountDepositedCount);    
}
One could argue that all of this is too much - not worth the effort. Where you previously just persisted the structure of an aggregate and could query that same structure, you now have to persist events first, and then write projections that maintain separate read models which can be queried.

You have to look beyond that though. Those who have done any serious work on a traditional stack have felt the pain of migrations, complex queries that take up three pages, obscure stored procedures that run for hours, optimizing while having to consider a handful of different use cases, finding the balance between write and read performance, database servers that can't handle the load during special events, expensive licenses and so on. While these first few concerns are mostly technical, personally I'm often overwhelmed by how many concepts these designs force you to keep in your head all at once.

Separating reads from writes using event sourcing might bring some relief. Reducing cognitive overload by separating responsibilities into smaller, more granular bits might be the only argument you need. However, there's a lot more. Running an event store should be low-maintenance; it's an append-only data model storing simple serialized DTOs with some metadata - forget about big migrations (not completely though), schemas, indexes and so on. Even if you project into a relational database, being able to re-run projections should make migration scripts and versioning avoidable. An event can be projected into multiple read models, allowing you to optimize per use case, without having to take other use cases into account. Since it should be easy to rebuild read models, they can be stored in cheap and volatile storage - think key-value stores, in-memory and so on - allowing for crazy fast reads.
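To sketch what re-running a projection could look like - this is my own illustration, and I'm leaving open how you enumerate all streams, since the event store interface from last week only exposes GetStream(aggregateId) - a rebuild is little more than replaying every stream through a fresh projection.
public class ReadModelRebuilder {
    private readonly IProjection _projection;

    public ReadModelRebuilder(IProjection projection) {
        _projection = projection;
    }

    // Rebuilding boils down to replaying every stream through the projection.
    // How the streams are enumerated is left open; an extra method on the
    // event store or an external list of aggregate ids would do.
    public void Rebuild(IEnumerable<EventStream> allStreams) {
        foreach (var eventStream in allStreams)
            _projection.Handle(eventStream);
    }
}
Point it at an empty read model and you end up with the same state again - no migration script in sight.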

Letting go of the single-model dogma seems to enable so much more, giving you a whole new set of possibilities. Another extremely useful use case that suddenly becomes a lot easier to support is business intelligence; when business experts think of new ways to look at the past, you just create a new projection and project events from day one. Getting statistics of how your users are using your system doesn't sound that hard now, does it?

One of the obvious drawbacks, next to writing a bit more (admittedly boring) code, is that storage costs will increase - you are now persisting the same data in multiple representations. But storage is cheap, right? Maybe money isn't an issue, but what about performance? It's slower to do three writes instead of one, right? For a lot of scenarios this won't be much of an issue, but if it is, there is a lot of room for optimization when doing projections: parallelization, eventual consistency and so on.
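To make that a bit more concrete - again just a sketch of my own, not something from this post - projections could each handle a stream on their own task, accepting that the read models become eventually consistent.
public class ParallelProjector {
    private readonly IEnumerable<IProjection> _projections;

    public ParallelProjector(IEnumerable<IProjection> projections) {
        _projections = projections;
    }

    // Each projection handles the stream on its own task, so a slow read
    // model no longer holds up the others; the read models lag slightly
    // behind the write model instead.
    public Task Project(EventStream eventStream) {
        var tasks = _projections.Select(
            projection => Task.Run(() => projection.Handle(eventStream)));

        return Task.WhenAll(tasks);
    }
}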

Next week: event source all the things? 

Sunday, October 20, 2013

An event store

Last week, I implemented an event sourced aggregate from scratch. There I learned that there isn't much to a naively implemented event sourced aggregate; it should be able to initialize itself from a stream of events, and it should be able to record all the events it raises.
public interface IEventSourcedAggregate : IAggregate {
    void Initialize(EventStream eventStream);

    EventStream RecordedEvents();
}
The question I want to answer today is: how do I persist those event sourced aggregates?

In traditional systems, aggregate persistence is not a trivial topic. Relational databases in particular have a reputation for making things hard on us. Even though tools such as ORMs have tried to make the gap between the relational and the object-oriented model as small as possible, there is still a lot of friction associated with the notorious impedance mismatch.
The last two years I have done some work using one of the popular NoSQL variants: a document store. In this paradigm, each aggregate materializes into a single document. Structure, constraints and referential integrity are not enforced by the database, but by code. The advantage of relaxing consistency at the database level is that it becomes easier to scale beyond a single machine, and that developers feel more empowered. Giving in on consistency guarantees is not acceptable for every system though. Again, pick the right tool for the job.
What both paradigms have in common is that they focus on structure instead of behaviour.

Event sourced systems, on the other hand, don't care about the structure of an aggregate, but about the events that caused the aggregate to be in its current state. Only having to store events - which are represented as DTOs - makes persistence and tooling much easier compared to traditional systems.

There are three things a minimalistic event store should be able to do:
  1. Store a new event stream 
  2. Append to an existing event stream
  3. Retrieve an existing event stream
An interface for that could look like this.
public interface IEventStore {
    void CreateOrAppend(Guid aggregateId, EventStream eventStream);

    EventStream GetStream(Guid aggregateId);
}
Notice that there is no update or delete - events happen, we can't jump in a time machine and alter the past. This allows us to get by with an append-only data model. Can you imagine how much easier to implement, optimize and distribute this must be compared to traditional models?

As an exercise, I took the interface I just defined and implemented a durable, non-transactional, non-scalable (up to 4294967295 streams), single-user event store that persists event streams in raw text files. Each record on disk represents a serialized event with a tiny bit of metadata. 
public class FileEventStore : IEventStore {    
    private const string Dir = @"C:\EventStore";            

    public void CreateOrAppend(Guid aggregateId, EventStream eventStream) {
        EnsureDirectoryExists();

        var path = EventStoreFilePath.From(Dir, aggregateId).Value;

        using (var stream = new FileStream(
            path, FileMode.Append, FileAccess.Write, FileShare.None))
        {
            using (var streamWriter = new StreamWriter(stream))
            {
                streamWriter.AutoFlush = false;
                foreach (var @event in eventStream)
                    streamWriter.WriteLine(
                        new Record(aggregateId, @event).Serialized());
            }
        }
    }
    
    public EventStream GetStream(Guid aggregateId) {           
        var path = EventStoreFilePath.From(Dir, aggregateId).Value;

        if (!File.Exists(path))
            return null;

        var lines = File.ReadAllLines(path);
        var events = lines
            .Select(x => Record.Deserialize(x))
            .Select(x => x.Event)
            .ToList();

        if (events.Any())
            return new EventStream(events);

        return null;
    }

    private void EnsureDirectoryExists()
    {
        if (!Directory.Exists(Dir))
            Directory.CreateDirectory(Dir);
    }
}
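The EventStoreFilePath and Record helpers aren't shown in the post; here's a minimal sketch of what they might look like. The naming scheme and the serialization format (Json.NET, tab-separated) are purely assumptions on my part.
public class EventStoreFilePath {
    private EventStoreFilePath(string value) {
        Value = value;
    }

    public string Value { get; private set; }

    // One file per aggregate, named after its id (assumption).
    public static EventStoreFilePath From(string directory, Guid aggregateId) {
        return new EventStoreFilePath(Path.Combine(directory, aggregateId + ".txt"));
    }
}

public class Record {
    public Record(Guid aggregateId, IEvent @event) {
        AggregateId = aggregateId;
        Event = @event;
    }

    public Guid AggregateId { get; private set; }
    public IEvent Event { get; private set; }

    // One line per record: the aggregate id, the CLR type of the event and
    // the event itself serialized as JSON, separated by tabs (assumption).
    public string Serialized() {
        return string.Join("\t",
            AggregateId,
            Event.GetType().AssemblyQualifiedName,
            JsonConvert.SerializeObject(Event));
    }

    public static Record Deserialize(string line) {
        var parts = line.Split(new[] { '\t' }, 3);
        var @event = (IEvent)JsonConvert.DeserializeObject(
            parts[2], Type.GetType(parts[1]));

        return new Record(Guid.Parse(parts[0]), @event);
    }
}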
A long-ish test proves that I can create a stream, append to it and read it again without losing any data.
[TestMethod]
public void EventStoreCanCreateAppendAndRetrieveEventStreams() 
{
    var eventStore = new FileEventStore();

    var aggregateId = Guid.NewGuid();
    var account = new Account(aggregateId);
    account.Deposit(new Amount(3000));
    account.Withdraw(new Amount(400));    
    
    Assert.AreEqual(2, account.RecordedEvents().Count());
    Assert.AreEqual(new Amount(2600), account.Amount);

    eventStore.CreateOrAppend(aggregateId, account.RecordedEvents());
    var eventStream = eventStore.GetStream(aggregateId);

    Assert.AreEqual(2, eventStream.Count());

    var anotherAccount = new Account(aggregateId);
    anotherAccount.Initialize(eventStream);

    Assert.AreEqual(new Amount(2600), anotherAccount.Amount);

    anotherAccount.Withdraw(new Amount(200));

    Assert.AreEqual(new Amount(2400), anotherAccount.Amount);
    Assert.AreEqual(1, anotherAccount.RecordedEvents().Count());

    eventStore.CreateOrAppend(aggregateId, anotherAccount.RecordedEvents());

    var finalEventStream = eventStore.GetStream(aggregateId);
    Assert.AreEqual(3, finalEventStream.Count());
}
This produced the following artifact on disk.


While this implementation is far from ideal - dangerous, really - it does show that implementing a minimalistic event store is doable - especially if you can build on top of existing data stores.
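For contrast, a throwaway in-memory implementation of the same interface - handy for tests, and assuming EventStream implements IEnumerable<IEvent> - could look like this.
public class InMemoryEventStore : IEventStore {
    private readonly Dictionary<Guid, List<IEvent>> _streams =
        new Dictionary<Guid, List<IEvent>>();

    public void CreateOrAppend(Guid aggregateId, EventStream eventStream) {
        if (!_streams.ContainsKey(aggregateId))
            _streams[aggregateId] = new List<IEvent>();

        // Assumes EventStream implements IEnumerable<IEvent>.
        foreach (var @event in eventStream)
            _streams[aggregateId].Add(@event);
    }

    public EventStream GetStream(Guid aggregateId) {
        if (!_streams.ContainsKey(aggregateId))
            return null;

        return new EventStream(new List<IEvent>(_streams[aggregateId]));
    }
}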

Doable, but not trivial. Greg Young - having actually implemented an event store, on the CLR too - recently shared some invaluable insights into what it takes to build a real-world event store.
I have always said an event store is a fun project because you can go anywhere from an afternoon to years on an implementation. 
I think there is a misunderstanding how people normally use an event stream for event sourcing. They read from it. Then they write to it. They expect optimistic concurrency from another thread having read from then written to the same stream. This is currently not handled. This could be handled as simply as checking the expected previous event but this wouldn't work because the file could be scavenged in between. The way this is generally worked around is a monotonically increasing sequence that gets assigned to an event. This would be relatively trivial to add. 
The next issue is that I can only read the stream from the beginning to the end or vice versa. If I have a stream with 20m records in it and I have read 14m of them and the power goes out; when I come back up I want to start from 14m (stream.Position = previous; is a Seek() and 14m can be very expensive if you happen to be working with files the OS has not cached for you). This is a hugely expensive operation to redo and the position I could have saved won't help me as the file could get compacted in between. To allow arbitrary access to the stream is a bit more difficult. The naive way would be to use something like a sorted dictionary or dictionary of lists as an index but you will very quickly run out of memory. B+Trees/LSM are quite useful here. 
Even with the current index (stream name to current position) there is a fairly large problem as it gets large. With 5m+ streams you will start seeing large pauses from the serializing out the dictionary. At around 50m your process will blow up due to 1gb object size limit in CLR
Similar to the index issue is that with a dictionary of all keys being stored in memory and taking large numbers of writes per second it is quite likely you will run out of memory if people are using small streams (say I have 10000 sensors and I do a stream every 5 seconds for their data to partition). Performance will also drastically decrease as you use more memory due to GC.
A more sinister problem is the scavenge / compaction. It stops the writer. When I have 100mb of events this may be a short pause. When I have 50gb of events this pause may very well turn into minutes. 
There is also the problem of needing N * N/? disk space in order to do a scavenge (you need both files on disk). With write speeds of 10MB/second it obviously wouldn't take long to make these kinds of huge files especially in a day where we consider a few TB to be small. The general way of handling this is the file gets broken into chunks then each chunk can be scavenged independently (while still allowing reads off it). Chunks can for instance be combined as well as they get smaller (or empty). 
Another point to bring up is someone wanting to write N events together in a transactional fashion to a stream. This sounds like a trivial addition but its less than trivial to implement (especially with some of the other things discussed here). As was mentioned in a previous thread a transaction starts by definition when there is more than one thing to do. 
There are decades worth of previous art in this space. It might be worth some time looking through it. LSM trees are a good starting point as is some of the older material on various ways of implementing transaction logs.
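Greg's first point, optimistic concurrency, would already change the shape of the interface. A sketch of my interpretation - not his API - adds an expected version to the append.
public interface IVersionedEventStore {
    // Appends only when the stream is still at expectedVersion; if another
    // writer appended in the meantime, a concurrency exception is thrown
    // and the caller can re-read, re-validate and retry.
    void Append(Guid aggregateId, int expectedVersion, EventStream eventStream);

    EventStream GetStream(Guid aggregateId);
}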
Playing with Greg's event store is something that has been on my list for a long time.

What is your experience with implementing an event store?

Next week: but how do we query our aggregates now?

Sunday, October 13, 2013

An event sourced aggregate

Last week I shared my theoretical understanding of event sourcing. Today, I want to make an attempt at making that theory tangible by implementing an event sourced aggregate.

In traditional systems, we only persist the current state of an object.


In event sourced systems, we don't persist the current state of an object, but the sequence of events that caused the object to be in the current state.


If we want an aggregate to be event sourced, it should be able to rebuild itself from a stream of events, and it should be able to record all the events it raises.
public interface IEventSourcedAggregate : IAggregate
{
    void Initialize(EventStream eventStream);

    EventStream RecordedEvents();
}
Let's implement the example aggregate we used last week: an account. An account owner can deposit an amount into and withdraw an amount from his account. There is a maximum amount policy for withdrawals though.
public class Account : IEventSourcedAggregate {
    private readonly Guid _id;

    public Account(Guid id) {
        _id = id;
    }

    public Guid Id { get { return _id; } }

    public void Initialize(EventStream eventStream) { 
        throw new NotImplementedException();
    }

    public EventStream RecordedEvents() { 
        throw new NotImplementedException(); 
    }
    
    public void Deposit(Amount amount) { }
    
    public void Withdraw(Amount amount) { }
}
Next to the Initialize and RecordedEvents methods, our aggregate's facade hasn't changed. We still have a Deposit and a Withdraw operation, like we would have in a traditional aggregate. How those two methods get implemented differs though.

When we deposit or withdraw an amount, we want to - instead of changing the state directly - apply events. When an event gets applied, its handler is invoked first, and the event is then recorded.
public void Deposit(Amount amount) {
    Apply(new AmountDeposited(amount));
}

public void Withdraw(Amount amount) {
    if (amount.IsOver(AmountPolicy.Maximum))     {
        Apply(new WithdrawalAmountExceeded(amount));

        return;
    }

    Apply(new AmountWithdrawn(amount));
}

// Keeps track of every event the aggregate applies.
private readonly EventRecorder _eventRecorder = new EventRecorder();

private void Apply(IEvent @event) {
    When((dynamic)@event);
    _eventRecorder.Record(@event);
}
An event recorder is a small object that keeps track of recorded events.
public class EventRecorder
{
    private readonly List<IEvent> _events = new List<IEvent>();

    public void Record(IEvent @event) {
        _events.Add(@event);
    }

    public EventStream RecordedEvents() {
        return new EventStream(_events);
    }
}
This object will be used to have our aggregate return a stream of recorded events.
public EventStream RecordedEvents() {
    return _eventRecorder.RecordedEvents();
}
We can now also implement initializing the aggregate from a stream of events.
public void Initialize(EventStream eventStream) {
    foreach (var @event in eventStream)
        When((dynamic)@event);
}
Here too, event handlers are invoked by using the dynamic run-time to find the best overload.

It's the event handlers that will change the aggregate's state. In this example, they can be implemented like this.
private void When(AmountWithdrawn @event) {
    _amount = _amount.Substract(@event.Amount);
}

private void When(AmountDeposited @event) {
    _amount = _amount.Add(@event.Amount);
}

private void When(WithdrawalAmountExceeded @event) { }
A test verifies that when I invoke operations on the aggregate, all the events are recorded, and the state has changed. When I use those recorded events to rebuild the same aggregate, we end up with the same state.
[TestMethod]
public void ICanReplayTheEventsAndHaveTheStateRebuilt() {
    var account = new Account(Guid.NewGuid());

    account.Deposit(new Amount(2500));
    account.Withdraw(new Amount(100));
    account.Withdraw(new Amount(200));

    Assert.AreEqual(3, account.RecordedEvents().Count());
    Assert.AreEqual(new Amount(2200), account.Amount);

    var events = account.RecordedEvents();

    var secondAccount = new Account(Guid.NewGuid());
    secondAccount.Initialize(events);

    Assert.AreEqual(new Amount(2200), secondAccount.Amount);
    Assert.AreEqual(0, secondAccount.RecordedEvents().Count());
}
And this is all there is to an event sourced aggregate.

For this exercise I tried to keep the number of concepts low. Many will have noticed that extracting a few concepts would benefit re-use and explicitness.

Also, using the DLR to invoke the correct event handlers might be frowned upon: it's not the most performant approach, each event must have a handler, and when a handler is missing the exception is not pretty. Experienced readers will also have noticed that concepts such as versioning and snapshots are not implemented yet. I hope limiting the number of concepts and indirections made this blog post easier to read.
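One possible alternative - my own sketch, not part of this post - is to register a handler per event type up front and dispatch on the event's runtime type, avoiding the DLR and making a missing handler explicit.
public class EventHandlerRegistry {
    private readonly Dictionary<Type, Action<IEvent>> _handlers =
        new Dictionary<Type, Action<IEvent>>();

    public void Register<TEvent>(Action<TEvent> handler) where TEvent : IEvent {
        _handlers[typeof(TEvent)] = @event => handler((TEvent)@event);
    }

    public void Handle(IEvent @event) {
        Action<IEvent> handler;
        if (!_handlers.TryGetValue(@event.GetType(), out handler))
            throw new InvalidOperationException(
                "No handler registered for " + @event.GetType().Name);

        handler(@event);
    }
}
The aggregate would then register its handlers in its constructor - for example registry.Register<AmountDeposited>(e => _amount = _amount.Add(e.Amount)); - and Apply would call registry.Handle(@event) instead of casting to dynamic.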

Any thoughts on this implementation?

Next week: where do I persist these events?

Sunday, October 6, 2013

My understanding of event sourcing

I've been studying event sourcing from a distance for a little over a year now, reading online material and going through some of the excellent open-source code. Unfortunately, there would be no value in introducing it into my current project - it would even be a terrible idea - so I decided to satisfy my inquisitiveness by consolidating and sharing my understanding of the concept.

Domain events

An event is something that happened in the past.

Events are described as verbs in the past tense. For example: amount withdrawn, amount deposited, maximum withdrawal amount exceeded. Listen for them when talking to your domain experts; events are as much a part of the ubiquitous language as commands, aggregates, value objects and so on.

Once you've captured a few events, you will notice how these concepts have always implicitly been there, but by making them explicit you introduce a whole new set of power tools to work with.
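To make that concrete - a sketch of my own, borrowing the IEvent and Amount types from the follow-up posts - events are typically nothing more than small, immutable DTOs named in the past tense.
public class AmountDeposited : IEvent {
    public AmountDeposited(Amount amount) {
        Amount = amount;
    }

    public Amount Amount { get; private set; }
}

public class WithdrawalAmountExceeded : IEvent {
    public WithdrawalAmountExceeded(Amount amount) {
        Amount = amount;
    }

    public Amount Amount { get; private set; }
}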

Event sourcing

Having defined domain events one more time, we can now look at event sourcing. By the name alone, it should be obvious that events are going to play the lead role.

In traditional systems, we only persist the current state of an object. In event sourced systems, we don't persist the current state of an object, but the sequence of events that caused the object to be in the current state.

In traditional systems, every time a change happens, we retrieve the old state, mutate it, and store the result as our current state. In this example, only the last column would be persisted.

Old amount | Command        | Current amount
           | CreateAccount  | $0
$0         | Deposit $2000  | $2000
$2000      | Withdraw $100  | $1900
$1900      | Withdraw $500  | $1400
$1400      | Withdraw $2000 | $1400
$1400      | Withdraw $300  | $1100

In event sourced systems on the other hand, we store the changes that happened - the second column, not the current state. To arrive at the current state again, we take all these events - and replay them.

Command        | Event                               | Current amount
CreateAccount  | AccountCreated                      | $0
Deposit $2000  | Deposited $2000                     | $2000
Withdraw $100  | Withdrawn $100                      | $1900
Withdraw $500  | Withdrawn $500                      | $1400
Withdraw $2000 | Maximum withdrawal amount exceeded! | $1400
Withdraw $300  | Withdrawn $300                      | $1100

Notice how we already gain better insights into what's happening by seeing an explicit maximum amount exceeded event.

Next time: what does this look like in code?

Feel free to complement and correct my understanding of event sourcing.