Sunday, October 18, 2015

Bulk SQL projections with F# and type providers

Early summer, I had to set up an integration with an external partner. They required us to provide them daily with a relational dataset stored in SQL Server. Most, if not all, of the data was temporal and append-only by nature; think logins, financial transactions and so on.

Since the required data largely lived in an eventstore on our end, I needed fast bulk projections. Having experimented with a few approaches, I eventually settled on projections written in F#, taking advantage of type providers.

Let's say we have an event for when users watched a video and one for when users shared a video.
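These events could be modeled as a discriminated union; the names and fields below are my own sketch, not necessarily the original definitions.

```fsharp
open System

// Hypothetical event definitions; the field names are assumptions.
type Event =
    | WatchedVideo of UserId : string * VideoId : string * Timestamp : DateTime
    | SharedVideo of UserId : string * VideoId : string * SharedWith : string * Timestamp : DateTime
```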

We want to take streams from our eventstore and project them to a specific state; a stream goes in and state comes out.
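In its simplest form, such a projection is a fold over the stream. A minimal sketch:

```fsharp
// A projection folds a stream of events into a state value:
// a stream goes in, state comes out.
let project (initial : 'State) (apply : 'State -> 'Event -> 'State) (stream : 'Event seq) : 'State =
    Seq.fold apply initial stream
```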

Then we want to take that state, and store it in our SQL Server database.

Some infrastructure that reads a specific stream, runs the projection, stores the state and checkpoints the projection could look like this.
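A sketch of such a runner, assuming the eventstore, checkpoint and state-saving functions are passed in; all names and signatures here are my assumptions, not the original implementation.

```fsharp
open System.Data.SqlClient

// Hypothetical projection runner. The caller supplies functions to read
// the checkpoint, read the stream, project it, and persist the result.
let runProjection
        (readCheckpoint : string -> int)
        (writeCheckpoint : SqlConnection -> SqlTransaction -> string -> int -> unit)
        (readStream : string -> int -> 'Event list)
        (projection : 'Event list -> 'State)
        (save : SqlConnection -> SqlTransaction -> 'State -> unit)
        (connectionString : string)
        (streamName : string) =
    let checkpoint = readCheckpoint streamName
    let events = readStream streamName checkpoint
    let state = projection events

    use connection = new SqlConnection(connectionString)
    connection.Open()
    // Storing the state and writing the checkpoint share one transaction.
    use transaction = connection.BeginTransaction()
    save connection transaction state
    writeCheckpoint connection transaction streamName (checkpoint + List.length events)
    transaction.Commit()
```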

To avoid data corruption, storing the state and writing the checkpoint happens in the same transaction.

With this piece of infrastructure in place, we are close to implementing an example. But before we do that, we first need to install the FSharp.Data.SqlClient package. With this package, the SqlProgrammabilityProvider type provider gives us types for each table in our destination database. In the snippet below, I'll create a typed dataset for the WatchedVideos table and add a row.
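A sketch of that snippet; the connection string and the column names of the WatchedVideos table are assumptions about the destination schema.

```fsharp
open FSharp.Data

// The connection string must be a compile-time literal, and the database
// must exist and be reachable when the code is compiled.
[<Literal>]
let ConnectionString = "Data Source=.;Initial Catalog=Destination;Integrated Security=True"

type Destination = SqlProgrammabilityProvider<ConnectionString>

// A typed dataset for the WatchedVideos table; AddRow is generated from
// the table's columns (names assumed here).
let table = new Destination.dbo.Tables.WatchedVideos()
table.AddRow(UserId = "user-1", VideoId = "video-1", Timestamp = System.DateTime.UtcNow)
```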

I haven't defined this type, nor did I generate it. The SqlProgrammabilityProvider type provider gives you these types for free, based on the metadata it can extract from the destination database. This also means that when you change your table without changing your code, the compiler will have no mercy and immediately point out where you broke your code. In this use case, where you'd rather rebuild your data than migrate it, the feedback loop of changing your database model becomes so short that it allows you to break things with confidence. The only caveat is that the compiler must always be able to access that specific database, or compilation fails. In practice, this means you need to ship your source with a build script that sets up your database locally before you do any work.

Going from a stream to a dataset is quite declarative and straightforward with the help of pattern matching.
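A sketch of such a projection, assuming the Destination type from the type provider setup described earlier and the hypothetical event names used above.

```fsharp
// Pattern match over the stream and fill the typed dataset.
// Event and column names are assumptions.
let projectWatchedVideos events =
    let table = new Destination.dbo.Tables.WatchedVideos()
    events
    |> Seq.iter (function
        | WatchedVideo (userId, videoId, timestamp) ->
            table.AddRow(UserId = userId, VideoId = videoId, Timestamp = timestamp)
        | _ -> ())
    table
```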

Storing the result in an efficient fashion is also simple, since the dataset directly exposes a BulkCopy method.
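For example, assuming the typed dataset from earlier; BulkCopy wraps SqlBulkCopy under the hood and accepts an optional connection and transaction.

```fsharp
open System.Data.SqlClient

// Bulk-insert the dataset inside the transaction that also writes
// the checkpoint.
let save (connection : SqlConnection) (transaction : SqlTransaction) (table : Destination.dbo.Tables.WatchedVideos) =
    table.BulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
```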

When we put this all together, we end up with this composition.
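Reconstructed as a sketch; `readStream`, `projectWatchedVideos` and `save` are hypothetical helper names standing in for the steps described above, not the original code.

```fsharp
// A sketch of the composition; these helper names are assumptions.
let run () =
    readStream "WatchedVideos"   // read the stream from the eventstore
    |> projectWatchedVideos      // pattern match it into a typed dataset
    |> save                      // BulkCopy the dataset and checkpoint in one transaction
```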

Executing this program, we can see the data was persisted as expected.

In the real world, you also want to take care of batching and logging, but that isn't too hard to implement.

Having had this approach in production for some time now, I'm still quite happy with how it turned out. The implementation is fast, and the code is compact and easy to maintain.


  1. Where I work, we're about to finish something very similar. The main difference is that our projection function doesn't talk to SQL directly. Instead it builds a (pure, in-memory) representation of the database request.

    type Request<'a, 'State> =
    | Load of Id * ('State -> Request<'a, 'State>)
    | Save of Id * 'State * Request<'a, 'State>
    | Done of 'a

    Your projection is then a pure function mapping an Event to a Request. You can write a monad bind operation which sequences two requests by ripping Done off the end of the first one and tacking the second request on. This makes batch-processing of events easy - just map the projection function over a list of events and sequence the result. The magic of Computation Expressions means you can make a projection look like an imperative program but in reality it's a pure function which builds an expression.

    The nice thing about that is the decoupling you get between the business logic of the projection and the operational code to talk to the data store. You get complete freedom in what to do with a Request. You could execute it against a SQL database with type providers, compile it to an Elastic Search query, run it against an in-memory dictionary (for tests), or print it to the console.

    The Haskell community calls this pattern "Free Monad + Interpreter".
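    The bind described in the comment above could be sketched like this (`Id` is a placeholder type; this is my illustration, not the commenter's code):

```fsharp
type Id = string  // placeholder

type Request<'a, 'State> =
    | Load of Id * ('State -> Request<'a, 'State>)
    | Save of Id * 'State * Request<'a, 'State>
    | Done of 'a

// Sequence two requests by replacing the Done at the end of the first
// with the request produced by f.
let rec bind (f : 'a -> Request<'b, 'State>) (request : Request<'a, 'State>) : Request<'b, 'State> =
    match request with
    | Load (id, continuation) -> Load (id, fun state -> bind f (continuation state))
    | Save (id, state, next) -> Save (id, state, bind f next)
    | Done value -> f value
```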

    1. Hi Benjamin

      Thanks for the comment, looks great. I definitely see potential for more abstraction and pureness, but I stopped writing code when my use case was satisfied :-)

  2. The blog engine ate my first attempt at a comment.

    I made almost the same structure for partial-input, partial-output parsing.

    type Parser<'i, 'err, 'o> =
    | Await of ('i -> 'i * Parser<'i, 'err, 'o>)
    | Yield of 'o * Parser<'i, 'err, 'o>
    | Stop of 'err

    and there was an interpreter:

    val tee : 'i -> Parser<'i, 'err, 'o> -> Either<'err, 'o * Parser<'i, 'err, 'o>>

    The main problem on the way to making it a monad was that the type contains itself in the negative position (as the result of the function). The parser should return "a new self" each time - I used this a lot. For instance, when the string "123456" is parsed from the text part "123", it reruns itself on the same input as (once (string "123") (once (string "456") (string "123456"))).

    As far as I can tell, you don't use the self-mutating capabilities of your automata.