Jef Claes

On software and life

30 Jul 2017

Fast projections

Most EventStore client libraries allow you to subscribe to a stream by passing in a callback which is invoked when an event occurs (either a live or historic event).

type Envelope = {
  Id : Int64
  Event : Event
  Historic : bool
}

type EventOccurred = Envelope -> unit

Let’s say we subscribe to a stream of a popular video service, and we want to project a read model that shows how many videos a viewer has watched. We don’t care about the bookmarked videos for now.

type Event =
  | WatchedVideo of WatchedVideo
  | BookmarkedVideo of BookmarkedVideo
and WatchedVideo = { ViewerId : string; VideoId : string; At : DateTime }
and BookmarkedVideo = { ViewerId : string; VideoId : string; At : DateTime }

We’re sitting on top of storage that can execute a single statement and a batch of statements.

The statements supported are limited:

  • Set a checkpoint
  • Increment the view count for a viewer

type Statement =
  | Checkpoint of Int64
  | IncrementViewCount of string * int

type Batch = Statement seq

type Storage = {
  ExecuteStatement : Statement -> unit
  ExecuteBatch : Batch -> unit
  CalculateCost : unit -> decimal
}

The storage engine exposes a method which calculates the cost of executed statements:

  • Executing a single statement costs 1 execution unit
  • Executing a batch also costs 1 execution unit plus 0.1 execution unit per statement in the batch
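To make the cost model concrete, here is a minimal sketch of an in-memory storage that follows the two rules above. The name createInMemoryStorage and its internals are assumptions made for illustration; the post itself does not show how its storage is implemented. The types are repeated so the snippet runs on its own.

```fsharp
open System
open System.Collections.Generic

// Types repeated from the post so the sketch is self-contained.
type Statement =
  | Checkpoint of Int64
  | IncrementViewCount of string * int

type Batch = Statement seq

type Storage = {
  ExecuteStatement : Statement -> unit
  ExecuteBatch : Batch -> unit
  CalculateCost : unit -> decimal
}

// Hypothetical in-memory storage, for illustration only.
let createInMemoryStorage () =
  let viewCounts = Dictionary<string, int>()
  // ref cells, because closures cannot capture `let mutable` locals.
  let checkpoint = ref 0L
  let cost = ref 0m

  let apply statement =
    match statement with
    | Checkpoint id -> checkpoint.Value <- id
    | IncrementViewCount (viewerId, n) ->
      let current =
        match viewCounts.TryGetValue viewerId with
        | true, count -> count
        | _ -> 0
      viewCounts.[viewerId] <- current + n

  // A single statement costs 1 execution unit.
  let executeStatement statement =
    apply statement
    cost.Value <- cost.Value + 1m

  // A batch costs 1 execution unit plus 0.1 unit per statement.
  let executeBatch (batch : Batch) =
    let statements = List.ofSeq batch
    statements |> List.iter apply
    cost.Value <- cost.Value + 1m + 0.1m * decimal statements.Length

  let storage =
    { ExecuteStatement = executeStatement
      ExecuteBatch = executeBatch
      CalculateCost = fun () -> cost.Value }

  storage, viewCounts, checkpoint

// Usage: one single statement (1 unit) and one batch of two statements (1.2 units).
let storage, viewCounts, checkpoint = createInMemoryStorage ()
storage.ExecuteStatement (IncrementViewCount ("1", 1))
[ IncrementViewCount ("1", 2); Checkpoint 3L ] |> Seq.ofList |> storage.ExecuteBatch
printfn "Cost: %M" (storage.CalculateCost ())
```

Keeping the view counts and the checkpoint next to the cost counter makes it easy to check that a cheaper projection still produces the same state.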

The stream

For this exercise the stream contains 3500 historic views, 50 historic bookmarks and 100 live views.

let stream = seq {
  for i in 1L .. 3500L do
    yield { Id = i; Event = WatchedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow }; Historic = true }
  for i in 3501L .. 3550L do
    yield { Id = i; Event = BookmarkedVideo { ViewerId = "1"; VideoId = "1"; At = DateTime.UtcNow }; Historic = true }
  for i in 3551L .. 3650L do
    yield { Id = i; Event = WatchedVideo { ViewerId = "2"; VideoId = "1"; At = DateTime.UtcNow }; Historic = false }
}

First attempt

The first attempt at projecting the stream to state executes a statement for each event we’re interested in and checkpoints after every event (even the ones we’re not interested in).

let ``First try`` (storage : Storage) : EventOccurred =
  fun envelope ->

      match envelope.Event with
      | WatchedVideo x -> IncrementViewCount (x.ViewerId, 1) |> storage.ExecuteStatement
      | _ -> ()

      Checkpoint envelope.Id |> storage.ExecuteStatement

// ## First try ##
// -----------------------------------------
// - Viewcount: seq [[1, 3500]; [2, 100]]
// - Checkpoint: 3650L
// -----------------------------------------
// = Cost: 7250.0M

The cost of this projection is high: 7250 execution units (3600 increments plus 3650 checkpoints), even though there are only 3600 events we’re interested in. We execute a statement for each event we handle and checkpoint immediately after every event, including the ones we don’t handle.

Less checkpointing

It’s not hard to get rid of some of the checkpointing though.

let ``Less checkpointing`` (storage : Storage) : EventOccurred =
  fun envelope ->

    match envelope.Event with
    | WatchedVideo x ->
      IncrementViewCount (x.ViewerId, 1) |> storage.ExecuteStatement
      Checkpoint envelope.Id |> storage.ExecuteStatement
    | _ -> ()

// ## Less checkpointing ##
// -----------------------------------------
// - Viewcount: seq [[1, 3500]; [2, 100]]
// - Checkpoint: 3650L
// -----------------------------------------
// = Cost: 7200.0M

The cost has improved, but only marginally. We saved 50 execution units by no longer checkpointing after events we do not handle. Time for a bigger improvement.

Batching

Instead of handling each event individually, we will buffer them as soon as they come in. When we’re catching up and seeing historic events, we only flush the buffer every 100 events. When we’re caught up, we flush on each event. We want to always make a best attempt at showing fresh data.

When the buffer gets flushed, events are mapped into a sequence of statements, which are sent in batch to the storage engine. The checkpoint is appended to the tail of the batch.

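let ``Batching`` (storage : Storage) : EventOccurred =
  // The buffer is declared outside the callback so it survives between events
  // (assumed declaration; ResizeArray is F#'s alias for List<T>).
  let buffer = ResizeArray<Envelope>()

  fun envelope ->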

    let flushOn = if envelope.Historic then 100 else 1

    let flush () =
      let batch =
        buffer
        |> Seq.map (fun env ->
          match env.Event with
          | WatchedVideo x -> IncrementViewCount (x.ViewerId, 1) |> Some
          | _ -> None
        )
        |> Seq.choose id

      if not (batch |> Seq.isEmpty) then
        [ Checkpoint envelope.Id ]
        |> Seq.append batch
        |> storage.ExecuteBatch

      buffer.Clear()

    match envelope.Event with
    | WatchedVideo _ -> buffer.Add envelope
    | _ -> ()

    if buffer.Count >= flushOn then
      flush()
      
// ## Batching ##
// -----------------------------------------
// - Viewcount: seq [[1, 3500]; [2, 100]]
// - Checkpoint: 3650L
// -----------------------------------------
// = Cost: 508.5M

This approach makes a significant difference. Compared to the first attempt, execution cost has dropped by 93%! Batching of historic events makes replays much faster, but with some extra effort we can take this optimization even further.
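The 508.5 figure can be reproduced by hand from the cost model. The sketch below assumes the stream described earlier (3500 historic views, 100 live views); the helper batchCost is a name introduced here for illustration and is not part of the original script.

```fsharp
// Cost model from the post: 1 unit per batch plus 0.1 unit per statement in it.
let batchCost (statements : int) = 1m + 0.1m * decimal statements

// 3500 historic views flushed every 100 events:
// 35 batches, each holding 100 increments and 1 checkpoint.
let historic = 35m * batchCost 101

// 100 live views flushed one at a time:
// 100 batches, each holding 1 increment and 1 checkpoint.
let live = 100m * batchCost 2

printfn "Historic: %M, live: %M, total: %M" historic live (historic + live)
```

The 50 bookmarks never enter the buffer, so they add nothing to the bill. Note that the live events, although only 100 of them, already account for 120 of the 508.5 units.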

Batching with transformation

It always pays off to understand the guarantees and intricacies of the storage you’re using. Looking closely at the storage interface, we find that we can increment the view count by any number. If we use a local data structure to aggregate the view count up front, we can reduce the number of statements even further.

In practice, we filter for the events we’re interested in, group them by viewer id, count the events in each group and map each count into a single statement per viewer.

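let ``Batching with transformation`` (storage : Storage) : EventOccurred =
  // The buffer is declared outside the callback so it survives between events
  // (assumed declaration; ResizeArray is F#'s alias for List<T>).
  let buffer = ResizeArray<Envelope>()

  fun envelope ->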

    let flushOn = if envelope.Historic then 100 else 1

    let flush () =
      let batch =
        buffer
        |> Seq.map (fun env -> 
          match env.Event with 
          | WatchedVideo x -> Some x 
          | _ -> None
        )
        |> Seq.choose id
        |> Seq.groupBy (fun e -> e.ViewerId)
        |> Seq.map (fun (viewerId, e) -> (viewerId, e |> Seq.length))
        |> Seq.map (fun (viewerId, length) -> IncrementViewCount (viewerId, length))

      if not (batch |> Seq.isEmpty) then
          [ Checkpoint envelope.Id ]
          |> Seq.append batch
          |> storage.ExecuteBatch

      buffer.Clear()

    match envelope.Event with
    | WatchedVideo _ -> buffer.Add envelope
    | _ -> ()

    if buffer.Count >= flushOn then
      flush()
 
// ## Batching with transformation ##
// -----------------------------------------
// - Viewcount: seq [[1, 3500]; [2, 100]]
// - Checkpoint: 3650L
// -----------------------------------------
// = Cost: 162.0M

This further reduces the cost by more than two thirds. The optimization makes the code a bit more elaborate, but not necessarily that much more complex - it’s still a local optimization.
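To see where the 162 units come from, the aggregation step can be tried in isolation. This is a small sketch, assuming a buffer of 100 historic views by the same viewer; it uses Seq.countBy as a shorthand for the group-and-count steps, and the names viewerIds, increments and batchCost are made up for the example.

```fsharp
// A hypothetical buffer of 100 historic views, all by viewer "1".
let viewerIds = List.replicate 100 "1"

// Seq.countBy groups by key and counts each group in one call.
let increments = viewerIds |> Seq.countBy id |> List.ofSeq

// Cost model from the post: 1 unit per batch plus 0.1 unit per statement in it.
let batchCost (statements : int) = 1m + 0.1m * decimal statements

// Each flush now holds one increment per viewer plus the checkpoint.
let statementsPerFlush = increments.Length + 1

// 35 historic flushes and 100 live flushes of 2 statements each.
let total = 35m * batchCost statementsPerFlush + 100m * batchCost 2

printfn "Increments: %A, total cost: %M" increments total
```

With a single viewer per flush, a historic batch shrinks from 101 statements to 2. What remains is dominated by the live events, which are still flushed one by one.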

Conclusion

In three steps, we brought the cost down from 7250 execution units to only 162 units. That makes me a 44x engineer, right?

In general, storage is one of the slowest components of your system. Making your system faster often involves making it do less work. Avoiding waste by batching and doing some more work up front can make a big impact when you want to make your projection faster.

You can find the complete F# script here.